Allow CompactionSegmentIterator to have custom priority (#16737)

Changes:
- Break `NewestSegmentFirstIterator` into two parts:
  - `DataSourceCompactibleSegmentIterator` - contains all the code from `NewestSegmentFirstIterator`
  but handles a single datasource and allows a priority to be specified
  - `PriorityBasedCompactionSegmentIterator` - holds a separate iterator for each datasource and
  combines their results into a single queue for use by a compaction search policy
- Update `NewestSegmentFirstPolicy` to use the above new classes
- Clean up `CompactionStatistics` and `AutoCompactionSnapshot`
- Clean up `CompactSegments`
- Remove unused methods from `Tasks`
- Remove unneeded `TasksTest`
- Move tests from `NewestSegmentFirstIteratorTest` to `CompactionStatusTest`
  and `DataSourceCompactibleSegmentIteratorTest`
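
The classes above split the old single-iterator design into per-datasource iterators plus a combining priority queue. As a rough sketch of how the pieces fit together after this change (mirroring the new `NewestSegmentFirstPolicy#createIterator`; the config, timeline, and skip-interval maps are placeholders supplied by the coordinator run):

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.compact.CompactionSegmentIterator;
import org.apache.druid.server.coordinator.compact.PriorityBasedCompactionSegmentIterator;
import org.apache.druid.server.coordinator.compact.SegmentsToCompact;
import org.apache.druid.timeline.SegmentTimeline;
import org.joda.time.Interval;

import java.util.Comparator;
import java.util.List;
import java.util.Map;

class NewestFirstIteratorSketch
{
  CompactionSegmentIterator createIterator(
      Map<String, DataSourceCompactionConfig> compactionConfigs,
      Map<String, SegmentTimeline> dataSources,
      Map<String, List<Interval>> skipIntervals,
      ObjectMapper objectMapper
  )
  {
    // "Newest first" priority: order queue entries by umbrella interval, latest start first.
    // Another policy could pass any other Comparator<SegmentsToCompact> here.
    final Comparator<SegmentsToCompact> newestFirst =
        (o1, o2) -> Comparators.intervalsByStartThenEnd()
                               .compare(o2.getUmbrellaInterval(), o1.getUmbrellaInterval());

    // Internally creates one DataSourceCompactibleSegmentIterator per datasource and
    // merges their candidates into a single priority queue.
    return new PriorityBasedCompactionSegmentIterator(
        compactionConfigs,
        dataSources,
        skipIntervals,
        newestFirst,
        objectMapper
    );
  }
}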
Authored by Kashif Faraz on 2024-07-16 07:24:49 -07:00; committed by GitHub
Parent: 6cf6838eb9
Commit: 01d67ae543
18 changed files with 587 additions and 935 deletions

File: NewestSegmentFirstPolicyBenchmark.java

@@ -141,7 +141,7 @@ public class NewestSegmentFirstPolicyBenchmark
   @Benchmark
   public void measureNewestSegmentFirstPolicy(Blackhole blackhole)
   {
-    final CompactionSegmentIterator iterator = policy.reset(compactionConfigs, dataSources, Collections.emptyMap());
+    final CompactionSegmentIterator iterator = policy.createIterator(compactionConfigs, dataSources, Collections.emptyMap());
     for (int i = 0; i < numCompactionTaskSlots && iterator.hasNext(); i++) {
       blackhole.consume(iterator.next());
     }

File: Tasks.java

@@ -21,16 +21,9 @@ package org.apache.druid.indexing.common.task;
 import org.apache.curator.shaded.com.google.common.base.Verify;
 import org.apache.druid.indexing.common.TaskLockType;
-import org.apache.druid.java.util.common.JodaUtils;
-import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 import org.apache.druid.server.coordinator.duty.CompactSegments;
-import org.joda.time.Interval;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.SortedSet;
-import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;

 public class Tasks
@@ -63,44 +56,19 @@ public class Tasks
    * Context flag denoting if maximum possible values should be used to estimate
    * on-heap memory usage while indexing. Refer to OnHeapIncrementalIndex for
    * more details.
-   *
+   * <p>
    * The value of this flag is true by default which corresponds to the old method
    * of estimation.
    */
  public static final String USE_MAX_MEMORY_ESTIMATES = "useMaxMemoryEstimates";

  /**
-   * This context is used in compaction. When it is set in the context, the segments created by the task
-   * will fill 'lastCompactionState' in its metadata. This will be used to track what segments are compacted or not.
-   * See {@link org.apache.druid.timeline.DataSegment} and {@link
-   * org.apache.druid.server.coordinator.compact.NewestSegmentFirstIterator} for more details.
+   * Context flag to denote if segments published to metadata by a task should
+   * have the {@code lastCompactionState} field set.
    */
  public static final String STORE_COMPACTION_STATE_KEY = "storeCompactionState";

  static {
    Verify.verify(STORE_COMPACTION_STATE_KEY.equals(CompactSegments.STORE_COMPACTION_STATE_KEY));
  }
-
-  public static SortedSet<Interval> computeCondensedIntervals(SortedSet<Interval> intervals)
-  {
-    final SortedSet<Interval> condensedIntervals = new TreeSet<>(Comparators.intervalsByStartThenEnd());
-    List<Interval> toBeAccumulated = new ArrayList<>();
-    for (Interval interval : intervals) {
-      if (toBeAccumulated.size() == 0) {
-        toBeAccumulated.add(interval);
-      } else {
-        if (toBeAccumulated.get(toBeAccumulated.size() - 1).abuts(interval)) {
-          toBeAccumulated.add(interval);
-        } else {
-          condensedIntervals.add(JodaUtils.umbrellaInterval(toBeAccumulated));
-          toBeAccumulated.clear();
-          toBeAccumulated.add(interval);
-        }
-      }
-    }
-
-    if (toBeAccumulated.size() > 0) {
-      condensedIntervals.add(JodaUtils.umbrellaInterval(toBeAccumulated));
-    }
-
-    return condensedIntervals;
-  }
 }

File: TasksTest.java (deleted)

@ -1,94 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.Comparators;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.util.Iterator;
import java.util.SortedSet;
import java.util.TreeSet;
public class TasksTest
{
@Test
public void testComputeCondensedIntervals()
{
final SortedSet<Interval> inputIntervals = new TreeSet<>(Comparators.intervalsByStartThenEnd());
for (int m = 1; m < 13; m++) {
for (int d = 1; d < 10; d++) {
inputIntervals.add(getInterval(m, d, m, d + 1));
}
for (int d = 12; d < 20; d++) {
inputIntervals.add(getInterval(m, d, m, d + 1));
}
inputIntervals.add(getInterval(m, 22, m, 23));
for (int d = 25; d < 28; d++) {
inputIntervals.add(getInterval(m, d, m, d + 1));
}
if (m == 1 || m == 3 || m == 5 || m == 7 || m == 8 || m == 10) {
inputIntervals.add(getInterval(m, 31, m + 1, 1));
}
}
inputIntervals.add(Intervals.of("2017-12-31/2018-01-01"));
final SortedSet<Interval> condensedIntervals = Tasks.computeCondensedIntervals(inputIntervals);
final Iterator<Interval> condensedIntervalIterator = condensedIntervals.iterator();
Assert.assertTrue(condensedIntervalIterator.hasNext());
Interval condensedInterval = condensedIntervalIterator.next();
final SortedSet<Interval> checkedIntervals = new TreeSet<>(Comparators.intervalsByStartThenEnd());
for (Interval inputInterval : inputIntervals) {
if (!condensedInterval.contains(inputInterval)) {
if (condensedIntervalIterator.hasNext()) {
condensedInterval = condensedIntervalIterator.next();
Assert.assertTrue(condensedInterval.contains(inputInterval));
}
}
checkedIntervals.add(inputInterval);
}
Assert.assertFalse(condensedIntervalIterator.hasNext());
Assert.assertEquals(inputIntervals, checkedIntervals);
}
private static Interval getInterval(int startMonth, int startDay, int endMonth, int endDay)
{
return Intervals.of(
StringUtils.format(
"2017-%02d-%02d/2017-%02d-%02d",
startMonth,
startDay,
endMonth,
endDay
)
);
}
}

File: ClientCompactionTaskQueryTuningConfig.java

@@ -28,6 +28,7 @@ import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.incremental.AppendableIndexSpec;
 import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
+import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
 import org.joda.time.Duration;
@@ -79,6 +80,17 @@ public class ClientCompactionTaskQueryTuningConfig
   @Nullable
   private final AppendableIndexSpec appendableIndexSpec;

+  public static ClientCompactionTaskQueryTuningConfig from(
+      DataSourceCompactionConfig compactionConfig
+  )
+  {
+    if (compactionConfig == null) {
+      return from(null, null, null);
+    } else {
+      return from(compactionConfig.getTuningConfig(), compactionConfig.getMaxRowsPerSegment(), null);
+    }
+  }
+
   public static ClientCompactionTaskQueryTuningConfig from(
       @Nullable UserCompactionTaskQueryTuningConfig userCompactionTaskQueryTuningConfig,
       @Nullable Integer maxRowsPerSegment,

File: AutoCompactionSnapshot.java

@@ -22,6 +22,7 @@ package org.apache.druid.server.coordinator;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.server.coordinator.compact.CompactionStatistics;

 import javax.validation.constraints.NotNull;
 import java.util.Objects;
@@ -193,15 +194,9 @@ public class AutoCompactionSnapshot
     private final String dataSource;
     private final AutoCompactionScheduleStatus scheduleStatus;

-    private long bytesAwaitingCompaction;
-    private long bytesCompacted;
-    private long bytesSkipped;
-    private long segmentCountAwaitingCompaction;
-    private long segmentCountCompacted;
-    private long segmentCountSkipped;
-    private long intervalCountAwaitingCompaction;
-    private long intervalCountCompacted;
-    private long intervalCountSkipped;
+    private final CompactionStatistics compactedStats = new CompactionStatistics();
+    private final CompactionStatistics skippedStats = new CompactionStatistics();
+    private final CompactionStatistics waitingStats = new CompactionStatistics();

     private Builder(
         @NotNull String dataSource,
@@ -217,69 +212,21 @@ public class AutoCompactionSnapshot
       this.dataSource = dataSource;
       this.scheduleStatus = scheduleStatus;
-      this.bytesAwaitingCompaction = 0;
-      this.bytesCompacted = 0;
-      this.bytesSkipped = 0;
-      this.segmentCountAwaitingCompaction = 0;
-      this.segmentCountCompacted = 0;
-      this.segmentCountSkipped = 0;
-      this.intervalCountAwaitingCompaction = 0;
-      this.intervalCountCompacted = 0;
-      this.intervalCountSkipped = 0;
     }

-    public Builder incrementBytesAwaitingCompaction(long incrementValue)
+    public void incrementWaitingStats(CompactionStatistics entry)
     {
-      this.bytesAwaitingCompaction = this.bytesAwaitingCompaction + incrementValue;
-      return this;
+      waitingStats.increment(entry);
     }

-    public Builder incrementBytesCompacted(long incrementValue)
+    public void incrementCompactedStats(CompactionStatistics entry)
     {
-      this.bytesCompacted = this.bytesCompacted + incrementValue;
-      return this;
+      compactedStats.increment(entry);
     }

-    public Builder incrementSegmentCountAwaitingCompaction(long incrementValue)
+    public void incrementSkippedStats(CompactionStatistics entry)
     {
-      this.segmentCountAwaitingCompaction = this.segmentCountAwaitingCompaction + incrementValue;
-      return this;
-    }
-
-    public Builder incrementSegmentCountCompacted(long incrementValue)
-    {
-      this.segmentCountCompacted = this.segmentCountCompacted + incrementValue;
-      return this;
-    }
-
-    public Builder incrementIntervalCountAwaitingCompaction(long incrementValue)
-    {
-      this.intervalCountAwaitingCompaction = this.intervalCountAwaitingCompaction + incrementValue;
-      return this;
-    }
-
-    public Builder incrementIntervalCountCompacted(long incrementValue)
-    {
-      this.intervalCountCompacted = this.intervalCountCompacted + incrementValue;
-      return this;
-    }
-
-    public Builder incrementBytesSkipped(long incrementValue)
-    {
-      this.bytesSkipped = this.bytesSkipped + incrementValue;
-      return this;
-    }
-
-    public Builder incrementSegmentCountSkipped(long incrementValue)
-    {
-      this.segmentCountSkipped = this.segmentCountSkipped + incrementValue;
-      return this;
-    }
-
-    public Builder incrementIntervalCountSkipped(long incrementValue)
-    {
-      this.intervalCountSkipped = this.intervalCountSkipped + incrementValue;
-      return this;
+      skippedStats.increment(entry);
     }

     public AutoCompactionSnapshot build()
@@ -287,15 +234,15 @@ public class AutoCompactionSnapshot
       return new AutoCompactionSnapshot(
           dataSource,
           scheduleStatus,
-          bytesAwaitingCompaction,
-          bytesCompacted,
-          bytesSkipped,
-          segmentCountAwaitingCompaction,
-          segmentCountCompacted,
-          segmentCountSkipped,
-          intervalCountAwaitingCompaction,
-          intervalCountCompacted,
-          intervalCountSkipped
+          waitingStats.getTotalBytes(),
+          compactedStats.getTotalBytes(),
+          skippedStats.getTotalBytes(),
+          waitingStats.getNumSegments(),
+          compactedStats.getNumSegments(),
+          skippedStats.getNumSegments(),
+          waitingStats.getNumIntervals(),
+          compactedStats.getNumIntervals(),
+          skippedStats.getNumIntervals()
       );
     }
   }

File: CompactionSegmentSearchPolicy.java

@@ -33,9 +33,9 @@ import java.util.Map;
 public interface CompactionSegmentSearchPolicy
 {
   /**
-   * Reset the current states of this policy. This method should be called whenever iterating starts.
+   * Creates an iterator that returns compactible segments.
    */
-  CompactionSegmentIterator reset(
+  CompactionSegmentIterator createIterator(
       Map<String, DataSourceCompactionConfig> compactionConfigs,
       Map<String, SegmentTimeline> dataSources,
       Map<String, List<Interval>> skipIntervals

File: CompactionStatistics.java

@@ -28,9 +28,13 @@ public class CompactionStatistics
   private long numSegments;
   private long numIntervals;

-  public static CompactionStatistics create()
+  public static CompactionStatistics create(long bytes, long numSegments, long numIntervals)
   {
-    return new CompactionStatistics();
+    final CompactionStatistics stats = new CompactionStatistics();
+    stats.totalBytes = bytes;
+    stats.numIntervals = numIntervals;
+    stats.numSegments = numSegments;
+    return stats;
   }

   public long getTotalBytes()
@@ -48,10 +52,10 @@ public class CompactionStatistics
     return numIntervals;
   }

-  public void addFrom(SegmentsToCompact segments)
+  public void increment(CompactionStatistics other)
   {
-    totalBytes += segments.getTotalBytes();
-    numIntervals += segments.getNumIntervals();
-    numSegments += segments.size();
+    totalBytes += other.getTotalBytes();
+    numIntervals += other.getNumIntervals();
+    numSegments += other.getNumSegments();
   }
 }

File: CompactionStatus.java

@@ -167,12 +167,7 @@ public class CompactionStatus
     this.objectMapper = objectMapper;
     this.lastCompactionState = candidateSegments.getFirst().getLastCompactionState();
     this.compactionConfig = compactionConfig;
-    this.tuningConfig = ClientCompactionTaskQueryTuningConfig.from(
-        compactionConfig.getTuningConfig(),
-        compactionConfig.getMaxRowsPerSegment(),
-        null
-    );
+    this.tuningConfig = ClientCompactionTaskQueryTuningConfig.from(compactionConfig);
     this.configuredGranularitySpec = compactionConfig.getGranularitySpec();
     if (lastCompactionState == null) {
       this.existingGranularitySpec = null;

File: DataSourceCompactibleSegmentIterator.java (renamed from NewestSegmentFirstIterator.java)

@ -23,9 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils; import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.granularity.Granularity;
@ -59,44 +57,46 @@ import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
* This class iterates all segments of the dataSources configured for compaction from the newest to the oldest. * Iterator over compactible segments of a datasource in order of specified priority.
*/ */
public class NewestSegmentFirstIterator implements CompactionSegmentIterator public class DataSourceCompactibleSegmentIterator implements Iterator<SegmentsToCompact>
{ {
private static final Logger log = new Logger(NewestSegmentFirstIterator.class); private static final Logger log = new Logger(DataSourceCompactibleSegmentIterator.class);
private final String dataSource;
private final ObjectMapper objectMapper; private final ObjectMapper objectMapper;
private final Map<String, DataSourceCompactionConfig> compactionConfigs; private final DataSourceCompactionConfig config;
private final Map<String, CompactionStatistics> compactedSegmentStats = new HashMap<>(); private final CompactionStatistics compactedSegmentStats = new CompactionStatistics();
private final Map<String, CompactionStatistics> skippedSegmentStats = new HashMap<>(); private final CompactionStatistics skippedSegmentStats = new CompactionStatistics();
private final Map<String, CompactibleSegmentIterator> timelineIterators;
// This is needed for datasource that has segmentGranularity configured // This is needed for datasource that has segmentGranularity configured
// If configured segmentGranularity in config is finer than current segmentGranularity, the same set of segments // If configured segmentGranularity in config is finer than current segmentGranularity, the same set of segments
// can belong to multiple intervals in the timeline. We keep track of the compacted intervals between each // can belong to multiple intervals in the timeline. We keep track of the compacted intervals between each
// run of the compaction job and skip any interval that was already previously compacted. // run of the compaction job and skip any interval that was already previously compacted.
private final Map<String, Set<Interval>> intervalCompactedForDatasource = new HashMap<>(); private final Set<Interval> compactedIntervals = new HashSet<>();
private final PriorityQueue<SegmentsToCompact> queue = new PriorityQueue<>( private final PriorityQueue<SegmentsToCompact> queue;
(o1, o2) -> Comparators.intervalsByStartThenEnd().compare(o2.getUmbrellaInterval(), o1.getUmbrellaInterval())
);
NewestSegmentFirstIterator( public DataSourceCompactibleSegmentIterator(
ObjectMapper objectMapper, DataSourceCompactionConfig config,
Map<String, DataSourceCompactionConfig> compactionConfigs, SegmentTimeline timeline,
Map<String, SegmentTimeline> dataSources, List<Interval> skipIntervals,
Map<String, List<Interval>> skipIntervals Comparator<SegmentsToCompact> segmentPriority,
ObjectMapper objectMapper
) )
{ {
this.objectMapper = objectMapper; this.objectMapper = objectMapper;
this.compactionConfigs = compactionConfigs; this.config = config;
this.timelineIterators = Maps.newHashMapWithExpectedSize(dataSources.size()); this.dataSource = config.getDataSource();
this.queue = new PriorityQueue<>(segmentPriority);
populateQueue(timeline, skipIntervals);
}
dataSources.forEach((dataSource, timeline) -> { private void populateQueue(SegmentTimeline timeline, List<Interval> skipIntervals)
final DataSourceCompactionConfig config = compactionConfigs.get(dataSource); {
if (timeline != null) {
Granularity configuredSegmentGranularity = null; Granularity configuredSegmentGranularity = null;
if (config != null && !timeline.isEmpty()) { if (!timeline.isEmpty()) {
SegmentTimeline originalTimeline = null; SegmentTimeline originalTimeline = null;
if (config.getGranularitySpec() != null && config.getGranularitySpec().getSegmentGranularity() != null) { if (config.getGranularitySpec() != null && config.getGranularitySpec().getSegmentGranularity() != null) {
String temporaryVersion = DateTimes.nowUtc().toString(); String temporaryVersion = DateTimes.nowUtc().toString();
@ -154,33 +154,25 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
timeline, timeline,
config.getSkipOffsetFromLatest(), config.getSkipOffsetFromLatest(),
configuredSegmentGranularity, configuredSegmentGranularity,
skipIntervals.get(dataSource) skipIntervals
); );
if (!searchIntervals.isEmpty()) { if (!searchIntervals.isEmpty()) {
timelineIterators.put( findAndEnqueueSegmentsToCompact(
dataSource,
new CompactibleSegmentIterator(timeline, searchIntervals, originalTimeline) new CompactibleSegmentIterator(timeline, searchIntervals, originalTimeline)
); );
} else {
log.warn("Skipping compaction for datasource[%s] as it has no compactible segments.", dataSource);
} }
} }
});
compactionConfigs.forEach((dataSourceName, config) -> {
if (config == null) {
throw new ISE("Unknown dataSource[%s]", dataSourceName);
} }
updateQueue(dataSourceName, config);
});
} }
@Override public CompactionStatistics totalCompactedStatistics()
public Map<String, CompactionStatistics> totalCompactedStatistics()
{ {
return compactedSegmentStats; return compactedSegmentStats;
} }
@Override public CompactionStatistics totalSkippedStatistics()
public Map<String, CompactionStatistics> totalSkippedStatistics()
{ {
return skippedSegmentStats; return skippedSegmentStats;
} }
@ -206,25 +198,9 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
final List<DataSegment> resultSegments = entry.getSegments(); final List<DataSegment> resultSegments = entry.getSegments();
Preconditions.checkState(!resultSegments.isEmpty(), "Queue entry must not be empty"); Preconditions.checkState(!resultSegments.isEmpty(), "Queue entry must not be empty");
final String dataSource = resultSegments.get(0).getDataSource();
updateQueue(dataSource, compactionConfigs.get(dataSource));
return entry; return entry;
} }
/**
* Find the next segments to compact for the given dataSource and add them to the queue.
* {@link #timelineIterators} is updated according to the found segments. That is, the found segments are removed from
* the timeline of the given dataSource.
*/
private void updateQueue(String dataSourceName, DataSourceCompactionConfig config)
{
final SegmentsToCompact segmentsToCompact = findSegmentsToCompact(dataSourceName, config);
if (!segmentsToCompact.isEmpty()) {
queue.add(segmentsToCompact);
}
}
/** /**
* Iterates compactible segments in a {@link SegmentTimeline}. * Iterates compactible segments in a {@link SegmentTimeline}.
*/ */
@ -315,27 +291,12 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
} }
/** /**
* Finds segments to compact together for the given datasource. * Finds segments to compact together for the given datasource and adds them to
* * the priority queue.
* @return An empty {@link SegmentsToCompact} if there are no eligible candidates.
*/ */
private SegmentsToCompact findSegmentsToCompact( private void findAndEnqueueSegmentsToCompact(CompactibleSegmentIterator compactibleSegmentIterator)
final String dataSourceName,
final DataSourceCompactionConfig config
)
{ {
final CompactibleSegmentIterator compactibleSegmentIterator
= timelineIterators.get(dataSourceName);
if (compactibleSegmentIterator == null) {
log.warn(
"Skipping compaction for datasource[%s] as there is no compactible segment in its timeline.",
dataSourceName
);
return SegmentsToCompact.empty();
}
final long inputSegmentSize = config.getInputSegmentSizeBytes(); final long inputSegmentSize = config.getInputSegmentSizeBytes();
while (compactibleSegmentIterator.hasNext()) { while (compactibleSegmentIterator.hasNext()) {
List<DataSegment> segments = compactibleSegmentIterator.next(); List<DataSegment> segments = compactibleSegmentIterator.next();
@ -352,47 +313,33 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
if (!compactionStatus.isComplete()) { if (!compactionStatus.isComplete()) {
log.debug( log.debug(
"Datasource[%s], interval[%s] has [%d] segments that need to be compacted because [%s].", "Datasource[%s], interval[%s] has [%d] segments that need to be compacted because [%s].",
dataSourceName, interval, candidates.size(), compactionStatus.getReasonToCompact() dataSource, interval, candidates.size(), compactionStatus.getReasonToCompact()
); );
} }
if (compactionStatus.isComplete()) { if (compactionStatus.isComplete()) {
addSegmentStatsTo(compactedSegmentStats, dataSourceName, candidates); compactedSegmentStats.increment(candidates.getStats());
} else if (candidates.getTotalBytes() > inputSegmentSize) { } else if (candidates.getTotalBytes() > inputSegmentSize) {
addSegmentStatsTo(skippedSegmentStats, dataSourceName, candidates); skippedSegmentStats.increment(candidates.getStats());
log.warn( log.warn(
"Skipping compaction for datasource[%s], interval[%s] as total segment size[%d]" "Skipping compaction for datasource[%s], interval[%s] as total segment size[%d]"
+ " is larger than allowed inputSegmentSize[%d].", + " is larger than allowed inputSegmentSize[%d].",
dataSourceName, interval, candidates.getTotalBytes(), inputSegmentSize dataSource, interval, candidates.getTotalBytes(), inputSegmentSize
); );
} else if (config.getGranularitySpec() != null } else if (config.getGranularitySpec() != null
&& config.getGranularitySpec().getSegmentGranularity() != null) { && config.getGranularitySpec().getSegmentGranularity() != null) {
Set<Interval> compactedIntervals = intervalCompactedForDatasource
.computeIfAbsent(dataSourceName, k -> new HashSet<>());
if (compactedIntervals.contains(interval)) { if (compactedIntervals.contains(interval)) {
// Skip these candidate segments as we have already compacted this interval // Skip these candidate segments as we have already compacted this interval
} else { } else {
compactedIntervals.add(interval); compactedIntervals.add(interval);
return candidates; queue.add(candidates);
} }
} else { } else {
return candidates; queue.add(candidates);
} }
} }
log.debug("No more segments to compact for datasource[%s].", dataSourceName); log.debug("No more segments to compact for datasource[%s].", dataSource);
return SegmentsToCompact.empty();
}
private void addSegmentStatsTo(
Map<String, CompactionStatistics> statisticsMap,
String dataSourceName,
SegmentsToCompact segments
)
{
statisticsMap.computeIfAbsent(dataSourceName, v -> CompactionStatistics.create())
.addFrom(segments);
} }
/** /**
@ -428,7 +375,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
final List<DataSegment> segments = new ArrayList<>( final List<DataSegment> segments = new ArrayList<>(
timeline.findNonOvershadowedObjectsInInterval(skipInterval, Partitions.ONLY_COMPLETE) timeline.findNonOvershadowedObjectsInInterval(skipInterval, Partitions.ONLY_COMPLETE)
); );
addSegmentStatsTo(skippedSegmentStats, dataSourceName, SegmentsToCompact.from(segments)); skippedSegmentStats.increment(SegmentsToCompact.from(segments).getStats());
} }
final Interval totalInterval = new Interval(first.getInterval().getStart(), last.getInterval().getEnd()); final Interval totalInterval = new Interval(first.getInterval().getStart(), last.getInterval().getEnd());

File: NewestSegmentFirstPolicy.java

@@ -21,6 +21,7 @@ package org.apache.druid.server.coordinator.compact;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.inject.Inject;
+import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 import org.apache.druid.timeline.SegmentTimeline;
 import org.joda.time.Interval;
@@ -29,7 +30,7 @@ import java.util.List;
 import java.util.Map;

 /**
- * This policy searches segments for compaction from the newest one to oldest one.
+ * This policy searches segments for compaction from newest to oldest.
  */
 public class NewestSegmentFirstPolicy implements CompactionSegmentSearchPolicy
 {
@@ -42,12 +43,20 @@ public class NewestSegmentFirstPolicy implements CompactionSegmentSearchPolicy
   }

   @Override
-  public CompactionSegmentIterator reset(
+  public CompactionSegmentIterator createIterator(
       Map<String, DataSourceCompactionConfig> compactionConfigs,
       Map<String, SegmentTimeline> dataSources,
       Map<String, List<Interval>> skipIntervals
   )
   {
-    return new NewestSegmentFirstIterator(objectMapper, compactionConfigs, dataSources, skipIntervals);
+    return new PriorityBasedCompactionSegmentIterator(
+        compactionConfigs,
+        dataSources,
+        skipIntervals,
+        (o1, o2) -> Comparators.intervalsByStartThenEnd()
+                               .compare(o2.getUmbrellaInterval(), o1.getUmbrellaInterval()),
+        objectMapper
+    );
   }
 }

File: PriorityBasedCompactionSegmentIterator.java (new file)

@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator.compact;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.Interval;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;
/**
* Implementation of {@link CompactionSegmentIterator} that returns segments in
* order of their priority.
*/
public class PriorityBasedCompactionSegmentIterator implements CompactionSegmentIterator
{
private static final Logger log = new Logger(PriorityBasedCompactionSegmentIterator.class);
private final PriorityQueue<SegmentsToCompact> queue;
private final Map<String, DataSourceCompactibleSegmentIterator> datasourceIterators;
public PriorityBasedCompactionSegmentIterator(
Map<String, DataSourceCompactionConfig> compactionConfigs,
Map<String, SegmentTimeline> datasourceToTimeline,
Map<String, List<Interval>> skipIntervals,
Comparator<SegmentsToCompact> segmentPriority,
ObjectMapper objectMapper
)
{
this.queue = new PriorityQueue<>(segmentPriority);
this.datasourceIterators = Maps.newHashMapWithExpectedSize(datasourceToTimeline.size());
compactionConfigs.forEach((datasource, config) -> {
if (config == null) {
throw DruidException.defensive("Invalid null compaction config for dataSource[%s].", datasource);
}
final SegmentTimeline timeline = datasourceToTimeline.get(datasource);
if (timeline == null) {
log.warn("Skipping compaction for datasource[%s] as it has no timeline.", datasource);
return;
}
datasourceIterators.put(
datasource,
new DataSourceCompactibleSegmentIterator(
compactionConfigs.get(datasource),
timeline,
skipIntervals.getOrDefault(datasource, Collections.emptyList()),
segmentPriority,
objectMapper
)
);
addNextItemForDatasourceToQueue(datasource);
});
}
@Override
public Map<String, CompactionStatistics> totalCompactedStatistics()
{
return CollectionUtils.mapValues(
datasourceIterators,
DataSourceCompactibleSegmentIterator::totalCompactedStatistics
);
}
@Override
public Map<String, CompactionStatistics> totalSkippedStatistics()
{
return CollectionUtils.mapValues(
datasourceIterators,
DataSourceCompactibleSegmentIterator::totalSkippedStatistics
);
}
@Override
public boolean hasNext()
{
return !queue.isEmpty();
}
@Override
public SegmentsToCompact next()
{
if (!hasNext()) {
throw new NoSuchElementException();
}
final SegmentsToCompact entry = queue.poll();
if (entry == null) {
throw new NoSuchElementException();
}
Preconditions.checkState(!entry.isEmpty(), "Queue entry must not be empty");
addNextItemForDatasourceToQueue(entry.getFirst().getDataSource());
return entry;
}
private void addNextItemForDatasourceToQueue(String dataSourceName)
{
final DataSourceCompactibleSegmentIterator iterator = datasourceIterators.get(dataSourceName);
if (iterator.hasNext()) {
final SegmentsToCompact segmentsToCompact = iterator.next();
if (!segmentsToCompact.isEmpty()) {
queue.add(segmentsToCompact);
}
}
}
}

File: SegmentsToCompact.java

@@ -107,9 +107,9 @@ public class SegmentsToCompact
     return umbrellaInterval;
   }

-  public long getNumIntervals()
+  public CompactionStatistics getStats()
   {
-    return numIntervals;
+    return CompactionStatistics.create(totalBytes, size(), numIntervals);
   }

   @Override

File: CompactSegments.java

@@ -54,7 +54,6 @@ import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
 import org.apache.druid.server.coordinator.compact.CompactionSegmentIterator;
 import org.apache.druid.server.coordinator.compact.CompactionSegmentSearchPolicy;
-import org.apache.druid.server.coordinator.compact.CompactionStatistics;
 import org.apache.druid.server.coordinator.compact.SegmentsToCompact;
 import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
 import org.apache.druid.server.coordinator.stats.Dimension;
@@ -87,6 +86,7 @@ public class CompactSegments implements CoordinatorCustomDuty
   private static final Logger LOG = new Logger(CompactSegments.class);
+  private static final String TASK_ID_PREFIX = "coordinator-issued";

   private static final Predicate<TaskStatusPlus> IS_COMPACTION_TASK =
       status -> null != status && COMPACTION_TASK_TYPE.equals(status.getType());
@@ -196,7 +196,7 @@ public class CompactSegments implements CoordinatorCustomDuty
     // Get iterator over segments to compact and submit compaction tasks
     Map<String, SegmentTimeline> dataSources = params.getUsedSegmentsTimelinesPerDataSource();
     final CompactionSegmentIterator iterator =
-        policy.reset(compactionConfigs, dataSources, intervalsToSkipCompaction);
+        policy.createIterator(compactionConfigs, dataSources, intervalsToSkipCompaction);

     final int compactionTaskCapacity = getCompactionTaskCapacity(dynamicConfig);
     final int availableCompactionTaskSlots
@@ -215,7 +215,7 @@ public class CompactSegments implements CoordinatorCustomDuty
     stats.add(Stats.Compaction.MAX_SLOTS, compactionTaskCapacity);
     stats.add(Stats.Compaction.AVAILABLE_SLOTS, availableCompactionTaskSlots);
     stats.add(Stats.Compaction.SUBMITTED_TASKS, numSubmittedCompactionTasks);
-    addCompactionSnapshotStats(currentRunAutoCompactionSnapshotBuilders, iterator, stats);
+    updateCompactionSnapshotStats(currentRunAutoCompactionSnapshotBuilders, iterator, stats);
     return params;
   }
@@ -392,28 +392,19 @@ public class CompactSegments implements CoordinatorCustomDuty
     while (iterator.hasNext() && totalTaskSlotsAssigned < numAvailableCompactionTaskSlots) {
       final SegmentsToCompact entry = iterator.next();
-      final List<DataSegment> segmentsToCompact = entry.getSegments();
-      if (segmentsToCompact.isEmpty()) {
+      if (entry.isEmpty()) {
         throw new ISE("segmentsToCompact is empty?");
       }

-      final String dataSourceName = segmentsToCompact.get(0).getDataSource();
+      final String dataSourceName = entry.getFirst().getDataSource();

       // As these segments will be compacted, we will aggregate the statistic to the Compacted statistics
-      AutoCompactionSnapshot.Builder snapshotBuilder = currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
-          dataSourceName,
-          AutoCompactionSnapshot::builder
-      );
-      snapshotBuilder
-          .incrementBytesCompacted(
-              segmentsToCompact.stream().mapToLong(DataSegment::getSize).sum()
-          )
-          .incrementIntervalCountCompacted(
-              segmentsToCompact.stream().map(DataSegment::getInterval).distinct().count()
-          )
-          .incrementSegmentCountCompacted(segmentsToCompact.size());
+      currentRunAutoCompactionSnapshotBuilders
+          .computeIfAbsent(dataSourceName, AutoCompactionSnapshot::builder)
+          .incrementCompactedStats(entry.getStats());

       final DataSourceCompactionConfig config = compactionConfigs.get(dataSourceName);
+      final List<DataSegment> segmentsToCompact = entry.getSegments();

       // Create granularitySpec to send to compaction task
       ClientCompactionTaskGranularitySpec granularitySpec;
@@ -514,7 +505,6 @@ public class CompactSegments implements CoordinatorCustomDuty
       }

       final String taskId = compactSegments(
-          "coordinator-issued",
           segmentsToCompact,
           config.getTaskPriority(),
           ClientCompactionTaskQueryTuningConfig.from(
@@ -536,7 +526,6 @@ public class CompactSegments implements CoordinatorCustomDuty
           taskId, segmentsToCompact.size(), dataSourceName, entry.getUmbrellaInterval()
       );
       LOG.debugSegments(segmentsToCompact, "Compacting segments");
-      // Count the compaction task itself + its sub tasks
       numSubmittedTasks++;
       totalTaskSlotsAssigned += slotsRequiredForCurrentTask;
     }
@@ -554,7 +543,7 @@ public class CompactSegments implements CoordinatorCustomDuty
     return newContext;
   }

-  private void addCompactionSnapshotStats(
+  private void updateCompactionSnapshotStats(
       Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders,
       CompactionSegmentIterator iterator,
       CoordinatorRunStats stats
@@ -563,77 +552,45 @@ public class CompactSegments implements CoordinatorCustomDuty
     // Mark all the segments remaining in the iterator as "awaiting compaction"
     while (iterator.hasNext()) {
       final SegmentsToCompact entry = iterator.next();
-      final List<DataSegment> segmentsToCompact = entry.getSegments();
-      if (!segmentsToCompact.isEmpty()) {
-        final String dataSourceName = segmentsToCompact.get(0).getDataSource();
-        AutoCompactionSnapshot.Builder snapshotBuilder = currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
-            dataSourceName,
-            AutoCompactionSnapshot::builder
-        );
-        snapshotBuilder
-            .incrementBytesAwaitingCompaction(
-                segmentsToCompact.stream().mapToLong(DataSegment::getSize).sum()
-            )
-            .incrementIntervalCountAwaitingCompaction(
-                segmentsToCompact.stream().map(DataSegment::getInterval).distinct().count()
-            )
-            .incrementSegmentCountAwaitingCompaction(segmentsToCompact.size());
+      if (!entry.isEmpty()) {
+        final String dataSourceName = entry.getFirst().getDataSource();
+        currentRunAutoCompactionSnapshotBuilders
+            .computeIfAbsent(dataSourceName, AutoCompactionSnapshot::builder)
+            .incrementWaitingStats(entry.getStats());
       }
     }

     // Statistics of all segments considered compacted after this run
-    Map<String, CompactionStatistics> allCompactedStatistics = iterator.totalCompactedStatistics();
-    for (Map.Entry<String, CompactionStatistics> compactionStatisticsEntry : allCompactedStatistics.entrySet()) {
-      final String dataSource = compactionStatisticsEntry.getKey();
-      final CompactionStatistics dataSourceCompactedStatistics = compactionStatisticsEntry.getValue();
-      AutoCompactionSnapshot.Builder builder = currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
-          dataSource,
-          AutoCompactionSnapshot::builder
-      );
-      builder.incrementBytesCompacted(dataSourceCompactedStatistics.getTotalBytes());
-      builder.incrementSegmentCountCompacted(dataSourceCompactedStatistics.getNumSegments());
-      builder.incrementIntervalCountCompacted(dataSourceCompactedStatistics.getNumIntervals());
-    }
+    iterator.totalCompactedStatistics().forEach((dataSource, compactedStats) -> {
+      currentRunAutoCompactionSnapshotBuilders
+          .computeIfAbsent(dataSource, AutoCompactionSnapshot::builder)
+          .incrementCompactedStats(compactedStats);
+    });

     // Statistics of all segments considered skipped after this run
-    Map<String, CompactionStatistics> allSkippedStatistics = iterator.totalSkippedStatistics();
-    for (Map.Entry<String, CompactionStatistics> compactionStatisticsEntry : allSkippedStatistics.entrySet()) {
-      final String dataSource = compactionStatisticsEntry.getKey();
-      final CompactionStatistics dataSourceSkippedStatistics = compactionStatisticsEntry.getValue();
-      AutoCompactionSnapshot.Builder builder = currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
-          dataSource,
-          AutoCompactionSnapshot::builder
-      );
-      builder.incrementBytesSkipped(dataSourceSkippedStatistics.getTotalBytes())
-             .incrementSegmentCountSkipped(dataSourceSkippedStatistics.getNumSegments())
-             .incrementIntervalCountSkipped(dataSourceSkippedStatistics.getNumIntervals());
-    }
+    iterator.totalSkippedStatistics().forEach((dataSource, dataSourceSkippedStatistics) -> {
+      currentRunAutoCompactionSnapshotBuilders
+          .computeIfAbsent(dataSource, AutoCompactionSnapshot::builder)
+          .incrementSkippedStats(dataSourceSkippedStatistics);
+    });

     final Map<String, AutoCompactionSnapshot> currentAutoCompactionSnapshotPerDataSource = new HashMap<>();
-    for (Map.Entry<String, AutoCompactionSnapshot.Builder> autoCompactionSnapshotBuilderEntry
-        : currentRunAutoCompactionSnapshotBuilders.entrySet()) {
-      final String dataSource = autoCompactionSnapshotBuilderEntry.getKey();
-      final AutoCompactionSnapshot.Builder builder = autoCompactionSnapshotBuilderEntry.getValue();
-
-      // Build the complete snapshot for the datasource
-      AutoCompactionSnapshot autoCompactionSnapshot = builder.build();
+    currentRunAutoCompactionSnapshotBuilders.forEach((dataSource, builder) -> {
+      final AutoCompactionSnapshot autoCompactionSnapshot = builder.build();
       currentAutoCompactionSnapshotPerDataSource.put(dataSource, autoCompactionSnapshot);
-
-      // Use the complete snapshot to emit metrics
-      addStatsForDatasource(dataSource, autoCompactionSnapshot, stats);
-    }
+      collectSnapshotStats(autoCompactionSnapshot, stats);
+    });

     // Atomic update of autoCompactionSnapshotPerDataSource with the latest from this coordinator run
     autoCompactionSnapshotPerDataSource.set(currentAutoCompactionSnapshotPerDataSource);
   }

-  private void addStatsForDatasource(
-      String dataSource,
+  private void collectSnapshotStats(
       AutoCompactionSnapshot autoCompactionSnapshot,
       CoordinatorRunStats stats
   )
   {
-    final RowKey rowKey = RowKey.of(Dimension.DATASOURCE, dataSource);
+    final RowKey rowKey = RowKey.of(Dimension.DATASOURCE, autoCompactionSnapshot.getDataSource());

     stats.add(Stats.Compaction.PENDING_BYTES, rowKey, autoCompactionSnapshot.getBytesAwaitingCompaction());
     stats.add(Stats.Compaction.PENDING_SEGMENTS, rowKey, autoCompactionSnapshot.getSegmentCountAwaitingCompaction());
@@ -668,7 +625,6 @@ public class CompactSegments implements CoordinatorCustomDuty
   }

   private String compactSegments(
-      String idPrefix,
       List<DataSegment> segments,
       int compactionTaskPriority,
       @Nullable ClientCompactionTaskQueryTuningConfig tuningConfig,
@@ -692,7 +648,7 @@ public class CompactSegments implements CoordinatorCustomDuty
     context = context == null ? new HashMap<>() : context;
     context.put("priority", compactionTaskPriority);

-    final String taskId = IdUtils.newTaskId(idPrefix, ClientCompactionTaskQuery.TYPE, dataSource, null);
+    final String taskId = IdUtils.newTaskId(TASK_ID_PREFIX, ClientCompactionTaskQuery.TYPE, dataSource, null);
     final Granularity segmentGranularity = granularitySpec == null ? null : granularitySpec.getSegmentGranularity();
     final ClientTaskQuery taskPayload = new ClientCompactionTaskQuery(
         taskId,

File: AutoCompactionSnapshotTest.java

@@ -19,6 +19,7 @@
 package org.apache.druid.server.coordinator;

+import org.apache.druid.server.coordinator.compact.CompactionStatistics;
 import org.junit.Assert;
 import org.junit.Test;
@@ -30,17 +31,11 @@ public class AutoCompactionSnapshotTest
     final String expectedDataSource = "data";
     final AutoCompactionSnapshot.Builder builder = AutoCompactionSnapshot.builder(expectedDataSource);

-    // Increment every stats twice
+    // Increment every stat twice
     for (int i = 0; i < 2; i++) {
-      builder.incrementIntervalCountSkipped(13)
-             .incrementBytesSkipped(13)
-             .incrementSegmentCountSkipped(13)
-             .incrementIntervalCountCompacted(13)
-             .incrementBytesCompacted(13)
-             .incrementSegmentCountCompacted(13)
-             .incrementIntervalCountAwaitingCompaction(13)
-             .incrementBytesAwaitingCompaction(13)
-             .incrementSegmentCountAwaitingCompaction(13);
+      builder.incrementSkippedStats(CompactionStatistics.create(13, 13, 13));
+      builder.incrementWaitingStats(CompactionStatistics.create(13, 13, 13));
+      builder.incrementCompactedStats(CompactionStatistics.create(13, 13, 13));
     }

     final AutoCompactionSnapshot actual = builder.build();

File: CompactionStatusTest.java (new file)

@ -0,0 +1,178 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator.compact;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
public class CompactionStatusTest
{
private static final String DS_WIKI = "wiki";
@Test
public void testFindPartitionsSpecWhenGivenIsNull()
{
final ClientCompactionTaskQueryTuningConfig tuningConfig
= ClientCompactionTaskQueryTuningConfig.from(null);
Assert.assertEquals(
new DynamicPartitionsSpec(null, Long.MAX_VALUE),
CompactionStatus.findPartitionsSpecFromConfig(tuningConfig)
);
}
@Test
public void testFindPartitionsSpecWhenGivenIsDynamicWithNullMaxTotalRows()
{
final PartitionsSpec partitionsSpec = new DynamicPartitionsSpec(null, null);
final ClientCompactionTaskQueryTuningConfig tuningConfig
= ClientCompactionTaskQueryTuningConfig.from(createCompactionConfig(partitionsSpec));
Assert.assertEquals(
new DynamicPartitionsSpec(null, Long.MAX_VALUE),
CompactionStatus.findPartitionsSpecFromConfig(tuningConfig)
);
}
@Test
public void testFindPartitionsSpecWhenGivenIsDynamicWithMaxTotalRows()
{
final PartitionsSpec partitionsSpec = new DynamicPartitionsSpec(null, 1000L);
final ClientCompactionTaskQueryTuningConfig tuningConfig
= ClientCompactionTaskQueryTuningConfig.from(createCompactionConfig(partitionsSpec));
Assert.assertEquals(
partitionsSpec,
CompactionStatus.findPartitionsSpecFromConfig(tuningConfig)
);
}
@Test
public void testFindPartitionsSpecWhenGivenIsDynamicWithMaxRowsPerSegment()
{
final PartitionsSpec partitionsSpec = new DynamicPartitionsSpec(100, 1000L);
final ClientCompactionTaskQueryTuningConfig tuningConfig
= ClientCompactionTaskQueryTuningConfig.from(createCompactionConfig(partitionsSpec));
Assert.assertEquals(
partitionsSpec,
CompactionStatus.findPartitionsSpecFromConfig(tuningConfig)
);
}
@Test
public void testFindPartitionsSpecFromConfigWithDeprecatedMaxRowsPerSegmentAndMaxTotalRowsReturnGivenValues()
{
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
"datasource",
null,
null,
100,
null,
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
1000L,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
),
null,
null,
null,
null,
null,
null,
null
);
Assert.assertEquals(
new DynamicPartitionsSpec(100, 1000L),
CompactionStatus.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config)
)
);
}
@Test
public void testFindPartitionsSpecWhenGivenIsHashed()
{
final PartitionsSpec partitionsSpec =
new HashedPartitionsSpec(null, 100, Collections.singletonList("dim"));
final ClientCompactionTaskQueryTuningConfig tuningConfig
= ClientCompactionTaskQueryTuningConfig.from(createCompactionConfig(partitionsSpec));
Assert.assertEquals(
partitionsSpec,
CompactionStatus.findPartitionsSpecFromConfig(tuningConfig)
);
}
@Test
public void testFindPartitionsSpecWhenGivenIsRange()
{
final PartitionsSpec partitionsSpec =
new DimensionRangePartitionsSpec(null, 10000, Collections.singletonList("dim"), false);
final ClientCompactionTaskQueryTuningConfig tuningConfig
= ClientCompactionTaskQueryTuningConfig.from(createCompactionConfig(partitionsSpec));
Assert.assertEquals(
partitionsSpec,
CompactionStatus.findPartitionsSpecFromConfig(tuningConfig)
);
}
private static DataSourceCompactionConfig createCompactionConfig(
PartitionsSpec partitionsSpec
)
{
return new DataSourceCompactionConfig(
DS_WIKI,
null, null, null, null, createTuningConfig(partitionsSpec),
null, null, null, null, null, null, null
);
}
private static UserCompactionTaskQueryTuningConfig createTuningConfig(
PartitionsSpec partitionsSpec
)
{
return new UserCompactionTaskQueryTuningConfig(
null,
null, null, null, null, partitionsSpec, null, null, null,
null, null, null, null, null, null, null, null, null, null
);
}
}

File: DataSourceCompactibleSegmentIteratorTest.java (new file)

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator.compact;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
public class DataSourceCompactibleSegmentIteratorTest
{
@Test
public void testFilterSkipIntervals()
{
final Interval totalInterval = Intervals.of("2018-01-01/2019-01-01");
final List<Interval> expectedSkipIntervals = ImmutableList.of(
Intervals.of("2018-01-15/2018-03-02"),
Intervals.of("2018-07-23/2018-10-01"),
Intervals.of("2018-10-02/2018-12-25"),
Intervals.of("2018-12-31/2019-01-01")
);
final List<Interval> skipIntervals = DataSourceCompactibleSegmentIterator.filterSkipIntervals(
totalInterval,
Lists.newArrayList(
Intervals.of("2017-12-01/2018-01-15"),
Intervals.of("2018-03-02/2018-07-23"),
Intervals.of("2018-10-01/2018-10-02"),
Intervals.of("2018-12-25/2018-12-31")
)
);
Assert.assertEquals(expectedSkipIntervals, skipIntervals);
}
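
  // filterSkipIntervals returns the complement of the given skip intervals within the
  // total search interval, i.e. the sub-intervals still eligible for compaction. Above,
  // the gaps between the four skip intervals inside 2018-01-01/2019-01-01 are exactly
  // the four expected intervals.
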
@Test
public void testAddSkipIntervalFromLatestAndSort()
{
final List<Interval> expectedIntervals = ImmutableList.of(
Intervals.of("2018-12-24/2018-12-25"),
Intervals.of("2018-12-29/2019-01-01")
);
final List<Interval> fullSkipIntervals = DataSourceCompactibleSegmentIterator.sortAndAddSkipIntervalFromLatest(
DateTimes.of("2019-01-01"),
new Period(72, 0, 0, 0),
null,
ImmutableList.of(
Intervals.of("2018-12-30/2018-12-31"),
Intervals.of("2018-12-24/2018-12-25")
)
);
Assert.assertEquals(expectedIntervals, fullSkipIntervals);
}
}
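
The second test exercises sortAndAddSkipIntervalFromLatest, which derives one more skip interval from the latest data timestamp and the skipOffsetFromLatest period, then sorts the result and merges overlaps with the caller-supplied intervals. A rough sketch of the equivalent interval arithmetic (plain Joda-Time, assumed rather than the production code path):

  DateTime latest = DateTimes.of("2019-01-01");
  Period skipOffset = new Period(72, 0, 0, 0);   // 72 hours
  Interval fromLatest = new Interval(latest.minus(skipOffset), latest);
  // fromLatest = 2018-12-29/2019-01-01; it absorbs the overlapping
  // 2018-12-30/2018-12-31 input, leaving the two expected intervals above.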

View File

@ -1,477 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator.compact;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Test;

import java.util.List;

public class NewestSegmentFirstIteratorTest
{
@Test
public void testFilterSkipIntervals()
{
final Interval totalInterval = Intervals.of("2018-01-01/2019-01-01");
final List<Interval> expectedSkipIntervals = ImmutableList.of(
Intervals.of("2018-01-15/2018-03-02"),
Intervals.of("2018-07-23/2018-10-01"),
Intervals.of("2018-10-02/2018-12-25"),
Intervals.of("2018-12-31/2019-01-01")
);
final List<Interval> skipIntervals = NewestSegmentFirstIterator.filterSkipIntervals(
totalInterval,
Lists.newArrayList(
Intervals.of("2017-12-01/2018-01-15"),
Intervals.of("2018-03-02/2018-07-23"),
Intervals.of("2018-10-01/2018-10-02"),
Intervals.of("2018-12-25/2018-12-31")
)
);
Assert.assertEquals(expectedSkipIntervals, skipIntervals);
}
@Test
public void testAddSkipIntervalFromLatestAndSort()
{
final List<Interval> expectedIntervals = ImmutableList.of(
Intervals.of("2018-12-24/2018-12-25"),
Intervals.of("2018-12-29/2019-01-01")
);
final List<Interval> fullSkipIntervals = NewestSegmentFirstIterator.sortAndAddSkipIntervalFromLatest(
DateTimes.of("2019-01-01"),
new Period(72, 0, 0, 0),
null,
ImmutableList.of(
Intervals.of("2018-12-30/2018-12-31"),
Intervals.of("2018-12-24/2018-12-25")
)
);
Assert.assertEquals(expectedIntervals, fullSkipIntervals);
}
@Test
public void testFindPartitionsSpecFromConfigWithNullTuningConfigReturnDynamicPartitinosSpecWithMaxTotalRowsOfLongMax()
{
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
"datasource",
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
);
Assert.assertEquals(
new DynamicPartitionsSpec(null, Long.MAX_VALUE),
CompactionStatus.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
)
);
}
@Test
public void testFindPartitionsSpecFromConfigWithNullMaxTotalRowsReturnLongMaxValue()
{
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
"datasource",
null,
null,
null,
null,
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
null,
null,
new DynamicPartitionsSpec(null, null),
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
),
null,
null,
null,
null,
null,
null,
null
);
Assert.assertEquals(
new DynamicPartitionsSpec(null, Long.MAX_VALUE),
CompactionStatus.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
)
);
}
@Test
public void testFindPartitionsSpecFromConfigWithNonNullMaxTotalRowsReturnGivenValue()
{
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
"datasource",
null,
null,
null,
null,
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
null,
null,
new DynamicPartitionsSpec(null, 1000L),
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
),
null,
null,
null,
null,
null,
null,
null
);
Assert.assertEquals(
new DynamicPartitionsSpec(null, 1000L),
CompactionStatus.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
)
);
}
@Test
public void testFindPartitionsSpecFromConfigWithNonNullMaxRowsPerSegmentReturnGivenValue()
{
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
"datasource",
null,
null,
null,
null,
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
null,
null,
new DynamicPartitionsSpec(100, 1000L),
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
),
null,
null,
null,
null,
null,
null,
null
);
Assert.assertEquals(
new DynamicPartitionsSpec(100, 1000L),
CompactionStatus.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
)
);
}
@Test
public void testFindPartitionsSpecFromConfigWithDeprecatedMaxRowsPerSegmentAndMaxTotalRowsReturnGivenValues()
{
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
"datasource",
null,
null,
100,
null,
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
1000L,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
),
null,
null,
null,
null,
null,
null,
null
);
Assert.assertEquals(
new DynamicPartitionsSpec(100, 1000L),
CompactionStatus.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
)
);
}
@Test
public void testFindPartitionsSpecFromConfigWithDeprecatedMaxRowsPerSegmentAndPartitionsSpecIgnoreDeprecatedOne()
{
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
"datasource",
null,
null,
100,
null,
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
null,
null,
new DynamicPartitionsSpec(null, null),
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
),
null,
null,
null,
null,
null,
null,
null
);
Assert.assertEquals(
new DynamicPartitionsSpec(null, Long.MAX_VALUE),
CompactionStatus.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
)
);
}
@Test
public void testFindPartitionsSpecFromConfigWithDeprecatedMaxTotalRowsAndPartitionsSpecIgnoreDeprecatedOne()
{
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
"datasource",
null,
null,
null,
null,
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
1000L,
null,
new DynamicPartitionsSpec(null, null),
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
),
null,
null,
null,
null,
null,
null,
null
);
Assert.assertEquals(
new DynamicPartitionsSpec(null, Long.MAX_VALUE),
CompactionStatus.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
)
);
}
@Test
public void testFindPartitionsSpecFromConfigWithHashPartitionsSpec()
{
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
"datasource",
null,
null,
null,
null,
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
null,
null,
new HashedPartitionsSpec(null, 10, ImmutableList.of("dim")),
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
),
null,
null,
null,
null,
null,
null,
null
);
Assert.assertEquals(
new HashedPartitionsSpec(null, 10, ImmutableList.of("dim")),
CompactionStatus.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
)
);
}
@Test
public void testFindPartitionsSpecFromConfigWithRangePartitionsSpec()
{
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
"datasource",
null,
null,
null,
null,
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
null,
null,
new SingleDimensionPartitionsSpec(10000, null, "dim", false),
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
),
null,
null,
null,
null,
null,
null,
null
);
Assert.assertEquals(
new SingleDimensionPartitionsSpec(10000, null, "dim", false),
CompactionStatus.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
)
);
}
}
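
The removed tests above also document the factory-method change that runs through the rest of this commit: the old three-argument ClientCompactionTaskQueryTuningConfig.from overload, which took the deprecated maxRowsPerSegment explicitly, is replaced by a single-argument form that reads those fields from the DataSourceCompactionConfig itself. Side by side:

  // Before (removed):
  ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null);
  // After:
  ClientCompactionTaskQueryTuningConfig.from(config);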

View File

@ -88,7 +88,7 @@ public class NewestSegmentFirstPolicyTest
public void testLargeOffsetAndSmallSegmentInterval() public void testLargeOffsetAndSmallSegmentInterval()
{ {
final Period segmentPeriod = new Period("PT1H"); final Period segmentPeriod = new Period("PT1H");
final CompactionSegmentIterator iterator = policy.reset( final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P2D"), null)), ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P2D"), null)),
ImmutableMap.of( ImmutableMap.of(
DATA_SOURCE, DATA_SOURCE,
@ -113,7 +113,7 @@ public class NewestSegmentFirstPolicyTest
public void testSmallOffsetAndLargeSegmentInterval() public void testSmallOffsetAndLargeSegmentInterval()
{ {
final Period segmentPeriod = new Period("PT1H"); final Period segmentPeriod = new Period("PT1H");
final CompactionSegmentIterator iterator = policy.reset( final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1M"), null)), ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1M"), null)),
ImmutableMap.of( ImmutableMap.of(
DATA_SOURCE, DATA_SOURCE,
@ -146,7 +146,7 @@ public class NewestSegmentFirstPolicyTest
public void testLargeGapInData() public void testLargeGapInData()
{ {
final Period segmentPeriod = new Period("PT1H"); final Period segmentPeriod = new Period("PT1H");
final CompactionSegmentIterator iterator = policy.reset( final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1H1M"), null)), ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1H1M"), null)),
ImmutableMap.of( ImmutableMap.of(
DATA_SOURCE, DATA_SOURCE,
@ -179,7 +179,7 @@ public class NewestSegmentFirstPolicyTest
@Test @Test
public void testHugeShard() public void testHugeShard()
{ {
final CompactionSegmentIterator iterator = policy.reset( final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P1D"), null)), ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P1D"), null)),
ImmutableMap.of( ImmutableMap.of(
DATA_SOURCE, DATA_SOURCE,
@ -229,7 +229,7 @@ public class NewestSegmentFirstPolicyTest
@Test @Test
public void testManySegmentsPerShard() public void testManySegmentsPerShard()
{ {
final CompactionSegmentIterator iterator = policy.reset( final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(800000, new Period("P1D"), null)), ImmutableMap.of(DATA_SOURCE, createCompactionConfig(800000, new Period("P1D"), null)),
ImmutableMap.of( ImmutableMap.of(
DATA_SOURCE, DATA_SOURCE,
@ -287,7 +287,7 @@ public class NewestSegmentFirstPolicyTest
{ {
final String unknownDataSource = "unknown"; final String unknownDataSource = "unknown";
final Period segmentPeriod = new Period("PT1H"); final Period segmentPeriod = new Period("PT1H");
final CompactionSegmentIterator iterator = policy.reset( final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of( ImmutableMap.of(
unknownDataSource, unknownDataSource,
createCompactionConfig(10000, new Period("P2D"), null), createCompactionConfig(10000, new Period("P2D"), null),
@ -337,7 +337,7 @@ public class NewestSegmentFirstPolicyTest
2 2
) )
); );
final CompactionSegmentIterator iterator = policy.reset( final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(inputSegmentSizeBytes, new Period("P0D"), null)), ImmutableMap.of(DATA_SOURCE, createCompactionConfig(inputSegmentSizeBytes, new Period("P0D"), null)),
ImmutableMap.of(DATA_SOURCE, timeline), ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap() Collections.emptyMap()
@ -374,7 +374,7 @@ public class NewestSegmentFirstPolicyTest
) )
); );
final CompactionSegmentIterator iterator = policy.reset( final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), null)), ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), null)),
ImmutableMap.of(DATA_SOURCE, timeline), ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap() Collections.emptyMap()
@ -395,7 +395,7 @@ public class NewestSegmentFirstPolicyTest
) )
); );
final CompactionSegmentIterator iterator = policy.reset( final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), null)), ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), null)),
ImmutableMap.of(DATA_SOURCE, timeline), ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap() Collections.emptyMap()
@ -412,7 +412,7 @@ public class NewestSegmentFirstPolicyTest
new SegmentGenerateSpec(Intervals.of("2017-10-14T00:00:00/2017-10-15T00:00:00"), new Period("P1D")) new SegmentGenerateSpec(Intervals.of("2017-10-14T00:00:00/2017-10-15T00:00:00"), new Period("P1D"))
); );
final CompactionSegmentIterator iterator = policy.reset( final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.DAY, null, null))), ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.DAY, null, null))),
ImmutableMap.of(DATA_SOURCE, timeline), ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap() Collections.emptyMap()
@ -445,7 +445,7 @@ public class NewestSegmentFirstPolicyTest
new SegmentGenerateSpec(Intervals.of("2017-10-14T00:00:00/2017-10-15T00:00:00"), new Period("PT5H")) new SegmentGenerateSpec(Intervals.of("2017-10-14T00:00:00/2017-10-15T00:00:00"), new Period("PT5H"))
); );
final CompactionSegmentIterator iterator = policy.reset( final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null))), ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null))),
ImmutableMap.of(DATA_SOURCE, timeline), ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap() Collections.emptyMap()
@ -471,7 +471,7 @@ public class NewestSegmentFirstPolicyTest
new SegmentGenerateSpec(Intervals.of("2017-10-14T00:00:00/2017-10-15T00:00:00"), new Period("PT5H")) new SegmentGenerateSpec(Intervals.of("2017-10-14T00:00:00/2017-10-15T00:00:00"), new Period("PT5H"))
); );
final CompactionSegmentIterator iterator = policy.reset( final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.MINUTE, null, null))), ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.MINUTE, null, null))),
ImmutableMap.of(DATA_SOURCE, timeline), ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap() Collections.emptyMap()
@ -496,7 +496,7 @@ public class NewestSegmentFirstPolicyTest
public void testWithSkipIntervals() public void testWithSkipIntervals()
{ {
final Period segmentPeriod = new Period("PT1H"); final Period segmentPeriod = new Period("PT1H");
final CompactionSegmentIterator iterator = policy.reset( final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P1D"), null)), ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P1D"), null)),
ImmutableMap.of( ImmutableMap.of(
DATA_SOURCE, DATA_SOURCE,
@ -536,7 +536,7 @@ public class NewestSegmentFirstPolicyTest
public void testHoleInSearchInterval() public void testHoleInSearchInterval()
{ {
final Period segmentPeriod = new Period("PT1H"); final Period segmentPeriod = new Period("PT1H");
final CompactionSegmentIterator iterator = policy.reset( final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1H"), null)), ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1H"), null)),
ImmutableMap.of( ImmutableMap.of(
DATA_SOURCE, DATA_SOURCE,
@ -586,7 +586,7 @@ public class NewestSegmentFirstPolicyTest
new SegmentGenerateSpec(Intervals.of("2017-10-01T00:00:00/2017-12-31T00:00:00"), new Period("P1D")) new SegmentGenerateSpec(Intervals.of("2017-10-01T00:00:00/2017-12-31T00:00:00"), new Period("P1D"))
); );
final CompactionSegmentIterator iterator = policy.reset( final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null))), ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null))),
ImmutableMap.of(DATA_SOURCE, timeline), ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap() Collections.emptyMap()
@ -635,7 +635,7 @@ public class NewestSegmentFirstPolicyTest
new SegmentGenerateSpec(Intervals.of("2020-02-08/2020-02-15"), new Period("P7D")) new SegmentGenerateSpec(Intervals.of("2020-02-08/2020-02-15"), new Period("P7D"))
); );
final CompactionSegmentIterator iterator = policy.reset( final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null))), ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null))),
ImmutableMap.of(DATA_SOURCE, timeline), ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap() Collections.emptyMap()
@ -670,7 +670,7 @@ public class NewestSegmentFirstPolicyTest
new SegmentGenerateSpec(Intervals.of("2017-12-01T00:00:00/2017-12-02T00:00:00"), new Period("P1D")) new SegmentGenerateSpec(Intervals.of("2017-12-01T00:00:00/2017-12-02T00:00:00"), new Period("P1D"))
); );
final CompactionSegmentIterator iterator = policy.reset( final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MINUTE, null, null))), ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MINUTE, null, null))),
ImmutableMap.of(DATA_SOURCE, timeline), ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap() Collections.emptyMap()
@ -696,7 +696,7 @@ public class NewestSegmentFirstPolicyTest
new SegmentGenerateSpec(Intervals.of("2017-10-01T01:00:00/2017-10-01T02:00:00"), new Period("PT1H"), "1994-04-30T00:00:00.000Z", null) new SegmentGenerateSpec(Intervals.of("2017-10-01T01:00:00/2017-10-01T02:00:00"), new Period("PT1H"), "1994-04-30T00:00:00.000Z", null)
); );
final CompactionSegmentIterator iterator = policy.reset( final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null))), ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null))),
ImmutableMap.of(DATA_SOURCE, timeline), ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap() Collections.emptyMap()
@ -721,7 +721,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config // Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper); Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
// Same partitionsSpec as what is set in the auto compaction config // Same partitionsSpec as what is set in the auto compaction config
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null)); PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
final SegmentTimeline timeline = createTimeline( final SegmentTimeline timeline = createTimeline(
@ -740,7 +740,7 @@ public class NewestSegmentFirstPolicyTest
); );
// Auto compaction config sets segmentGranularity=DAY // Auto compaction config sets segmentGranularity=DAY
final CompactionSegmentIterator iterator = policy.reset( final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.DAY, null, null))), ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.DAY, null, null))),
ImmutableMap.of(DATA_SOURCE, timeline), ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap() Collections.emptyMap()
@ -754,7 +754,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config // Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper); Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
// Same partitionsSpec as what is set in the auto compaction config // Same partitionsSpec as what is set in the auto compaction config
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null)); PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
final SegmentTimeline timeline = createTimeline( final SegmentTimeline timeline = createTimeline(
@ -773,7 +773,7 @@ public class NewestSegmentFirstPolicyTest
); );
// Auto compaction config sets segmentGranularity=DAY // Auto compaction config sets segmentGranularity=DAY
final CompactionSegmentIterator iterator = policy.reset( final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.DAY, null, null))), ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.DAY, null, null))),
ImmutableMap.of(DATA_SOURCE, timeline), ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap() Collections.emptyMap()
@ -787,7 +787,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config // Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper); Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
// Same partitionsSpec as what is set in the auto compaction config // Same partitionsSpec as what is set in the auto compaction config
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null)); PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
final SegmentTimeline timeline = createTimeline( final SegmentTimeline timeline = createTimeline(
@ -806,7 +806,7 @@ public class NewestSegmentFirstPolicyTest
); );
// Auto compaction config sets segmentGranularity=YEAR // Auto compaction config sets segmentGranularity=YEAR
final CompactionSegmentIterator iterator = policy.reset( final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null))), ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null))),
ImmutableMap.of(DATA_SOURCE, timeline), ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap() Collections.emptyMap()
@ -830,7 +830,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config // Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper); Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
// Same partitionsSpec as what is set in the auto compaction config // Same partitionsSpec as what is set in the auto compaction config
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null)); PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
final SegmentTimeline timeline = createTimeline( final SegmentTimeline timeline = createTimeline(
@ -849,7 +849,7 @@ public class NewestSegmentFirstPolicyTest
); );
// Auto compaction config sets segmentGranularity=YEAR // Auto compaction config sets segmentGranularity=YEAR
final CompactionSegmentIterator iterator = policy.reset( final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null))), ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null))),
ImmutableMap.of(DATA_SOURCE, timeline), ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap() Collections.emptyMap()
@ -873,7 +873,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config // Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper); Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
// Same partitionsSpec as what is set in the auto compaction config // Same partitionsSpec as what is set in the auto compaction config
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null)); PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
final SegmentTimeline timeline = createTimeline( final SegmentTimeline timeline = createTimeline(
@ -887,7 +887,7 @@ public class NewestSegmentFirstPolicyTest
// Duration of new segmentGranularity is the same as before (P1D), // Duration of new segmentGranularity is the same as before (P1D),
// but we changed the timezone from UTC to Bangkok in the auto compaction spec // but we changed the timezone from UTC to Bangkok in the auto compaction spec
final CompactionSegmentIterator iterator = policy.reset( final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, ImmutableMap.of(DATA_SOURCE,
createCompactionConfig( createCompactionConfig(
130000, 130000,
@ -925,7 +925,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config // Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper); Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
// Same partitionsSpec as what is set in the auto compaction config // Same partitionsSpec as what is set in the auto compaction config
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null)); PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
final SegmentTimeline timeline = createTimeline( final SegmentTimeline timeline = createTimeline(
@ -938,7 +938,7 @@ public class NewestSegmentFirstPolicyTest
); );
// Duration of new segmentGranularity is the same as before (P1D), but we changed the origin in the autocompaction spec // Duration of new segmentGranularity is the same as before (P1D), but we changed the origin in the autocompaction spec
final CompactionSegmentIterator iterator = policy.reset( final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, ImmutableMap.of(DATA_SOURCE,
createCompactionConfig( createCompactionConfig(
130000, 130000,
@ -976,7 +976,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config // Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper); Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
// Same partitionsSpec as what is set in the auto compaction config // Same partitionsSpec as what is set in the auto compaction config
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null)); PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));
// Create segments that were compacted (CompactionState != null) and have // Create segments that were compacted (CompactionState != null) and have
// rollup=false for interval 2017-10-01T00:00:00/2017-10-02T00:00:00, // rollup=false for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
@ -1004,7 +1004,7 @@ public class NewestSegmentFirstPolicyTest
); );
// Auto compaction config sets rollup=true // Auto compaction config sets rollup=true
final CompactionSegmentIterator iterator = policy.reset( final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(null, null, true))), ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(null, null, true))),
ImmutableMap.of(DATA_SOURCE, timeline), ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap() Collections.emptyMap()
@ -1036,7 +1036,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config // Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper); Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
// Same partitionsSpec as what is set in the auto compaction config // Same partitionsSpec as what is set in the auto compaction config
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null)); PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));
// Create segments that were compacted (CompactionState != null) and have // Create segments that were compacted (CompactionState != null) and have
// queryGranularity=DAY for interval 2017-10-01T00:00:00/2017-10-02T00:00:00, // queryGranularity=DAY for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
@ -1064,7 +1064,7 @@ public class NewestSegmentFirstPolicyTest
); );
// Auto compaction config sets queryGranularity=MINUTE // Auto compaction config sets queryGranularity=MINUTE
final CompactionSegmentIterator iterator = policy.reset( final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(null, Granularities.MINUTE, null))), ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(null, Granularities.MINUTE, null))),
ImmutableMap.of(DATA_SOURCE, timeline), ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap() Collections.emptyMap()
@ -1096,7 +1096,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config // Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper); Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
// Same partitionsSpec as what is set in the auto compaction config // Same partitionsSpec as what is set in the auto compaction config
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null)); PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));
// Create segments that were compacted (CompactionState != null) and have // Create segments that were compacted (CompactionState != null) and have
// Dimensions=["foo", "bar"] for interval 2017-10-01T00:00:00/2017-10-02T00:00:00, // Dimensions=["foo", "bar"] for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
@ -1131,7 +1131,7 @@ public class NewestSegmentFirstPolicyTest
); );
// Auto compaction config sets Dimensions=["foo"] // Auto compaction config sets Dimensions=["foo"]
CompactionSegmentIterator iterator = policy.reset( CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig( ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
130000, 130000,
new Period("P0D"), new Period("P0D"),
@ -1172,7 +1172,7 @@ public class NewestSegmentFirstPolicyTest
Assert.assertFalse(iterator.hasNext()); Assert.assertFalse(iterator.hasNext());
// Auto compaction config sets Dimensions=null // Auto compaction config sets Dimensions=null
iterator = policy.reset( iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig( ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
130000, 130000,
new Period("P0D"), new Period("P0D"),
@ -1195,7 +1195,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config // Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper); Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
// Same partitionsSpec as what is set in the auto compaction config // Same partitionsSpec as what is set in the auto compaction config
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null)); PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));
// Create segments that were compacted (CompactionState != null) and have // Create segments that were compacted (CompactionState != null) and have
// filter=SelectorDimFilter("dim1", "foo", null) for interval 2017-10-01T00:00:00/2017-10-02T00:00:00, // filter=SelectorDimFilter("dim1", "foo", null) for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
@ -1251,7 +1251,7 @@ public class NewestSegmentFirstPolicyTest
); );
// Auto compaction config sets filter=SelectorDimFilter("dim1", "bar", null) // Auto compaction config sets filter=SelectorDimFilter("dim1", "bar", null)
CompactionSegmentIterator iterator = policy.reset( CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig( ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
130000, 130000,
new Period("P0D"), new Period("P0D"),
@ -1292,7 +1292,7 @@ public class NewestSegmentFirstPolicyTest
Assert.assertFalse(iterator.hasNext()); Assert.assertFalse(iterator.hasNext());
// Auto compaction config sets filter=null // Auto compaction config sets filter=null
iterator = policy.reset( iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig( ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
130000, 130000,
new Period("P0D"), new Period("P0D"),
@ -1319,7 +1319,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config // Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper); Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
// Same partitionsSpec as what is set in the auto compaction config // Same partitionsSpec as what is set in the auto compaction config
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null)); PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));
// Create segments that were compacted (CompactionState != null) and have // Create segments that were compacted (CompactionState != null) and have
// metricsSpec={CountAggregatorFactory("cnt")} for interval 2017-10-01T00:00:00/2017-10-02T00:00:00, // metricsSpec={CountAggregatorFactory("cnt")} for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
@ -1375,7 +1375,7 @@ public class NewestSegmentFirstPolicyTest
); );
// Auto compaction config sets metricsSpec={CountAggregatorFactory("cnt"), LongSumAggregatorFactory("val", "val")} // Auto compaction config sets metricsSpec={CountAggregatorFactory("cnt"), LongSumAggregatorFactory("val", "val")}
CompactionSegmentIterator iterator = policy.reset( CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig( ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
130000, 130000,
new Period("P0D"), new Period("P0D"),
@ -1416,7 +1416,7 @@ public class NewestSegmentFirstPolicyTest
Assert.assertFalse(iterator.hasNext()); Assert.assertFalse(iterator.hasNext());
// Auto compaction config sets metricsSpec=null // Auto compaction config sets metricsSpec=null
iterator = policy.reset( iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig( ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
130000, 130000,
new Period("P0D"), new Period("P0D"),
@ -1440,7 +1440,7 @@ public class NewestSegmentFirstPolicyTest
new SegmentGenerateSpec(Intervals.of("2017-10-01T01:00:00/2017-10-01T02:00:00"), new Period("PT1H"), "1994-04-30T00:00:00.000Z", null) new SegmentGenerateSpec(Intervals.of("2017-10-01T01:00:00/2017-10-01T02:00:00"), new Period("PT1H"), "1994-04-30T00:00:00.000Z", null)
); );
final CompactionSegmentIterator iterator = policy.reset( final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null))), ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null))),
ImmutableMap.of(DATA_SOURCE, timeline), ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap() Collections.emptyMap()
@ -1468,7 +1468,7 @@ public class NewestSegmentFirstPolicyTest
// Different indexSpec as what is set in the auto compaction config // Different indexSpec as what is set in the auto compaction config
IndexSpec newIndexSpec = IndexSpec.builder().withBitmapSerdeFactory(new ConciseBitmapSerdeFactory()).build(); IndexSpec newIndexSpec = IndexSpec.builder().withBitmapSerdeFactory(new ConciseBitmapSerdeFactory()).build();
Map<String, Object> newIndexSpecMap = mapper.convertValue(newIndexSpec, new TypeReference<Map<String, Object>>() {}); Map<String, Object> newIndexSpecMap = mapper.convertValue(newIndexSpec, new TypeReference<Map<String, Object>>() {});
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null)); PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
final SegmentTimeline timeline = createTimeline( final SegmentTimeline timeline = createTimeline(
@ -1481,7 +1481,7 @@ public class NewestSegmentFirstPolicyTest
); );
// Duration of new segmentGranularity is the same as before (P1D) // Duration of new segmentGranularity is the same as before (P1D)
final CompactionSegmentIterator iterator = policy.reset( final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, ImmutableMap.of(DATA_SOURCE,
createCompactionConfig( createCompactionConfig(
130000, 130000,
@ -1517,7 +1517,7 @@ public class NewestSegmentFirstPolicyTest
public void testIteratorDoesNotReturnSegmentWithChangingAppendableIndexSpec() public void testIteratorDoesNotReturnSegmentWithChangingAppendableIndexSpec()
{ {
NullHandling.initializeForTests(); NullHandling.initializeForTests();
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null)); PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));
final SegmentTimeline timeline = createTimeline( final SegmentTimeline timeline = createTimeline(
new SegmentGenerateSpec( new SegmentGenerateSpec(
Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"), Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"),
@ -1534,7 +1534,7 @@ public class NewestSegmentFirstPolicyTest
) )
); );
CompactionSegmentIterator iterator = policy.reset( CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig( ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
130000, 130000,
new Period("P0D"), new Period("P0D"),
@ -1569,7 +1569,7 @@ public class NewestSegmentFirstPolicyTest
); );
Assert.assertFalse(iterator.hasNext()); Assert.assertFalse(iterator.hasNext());
iterator = policy.reset( iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig( ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
130000, 130000,
new Period("P0D"), new Period("P0D"),
@ -1608,7 +1608,7 @@ public class NewestSegmentFirstPolicyTest
@Test @Test
public void testSkipAllGranularityToDefault() public void testSkipAllGranularityToDefault()
{ {
CompactionSegmentIterator iterator = policy.reset( CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, ImmutableMap.of(DATA_SOURCE,
createCompactionConfig(10000, createCompactionConfig(10000,
new Period("P0D"), new Period("P0D"),
@ -1640,7 +1640,7 @@ public class NewestSegmentFirstPolicyTest
@Test @Test
public void testSkipFirstHalfEternityToDefault() public void testSkipFirstHalfEternityToDefault()
{ {
CompactionSegmentIterator iterator = policy.reset( CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, ImmutableMap.of(DATA_SOURCE,
createCompactionConfig(10000, createCompactionConfig(10000,
new Period("P0D"), new Period("P0D"),
@ -1672,7 +1672,7 @@ public class NewestSegmentFirstPolicyTest
@Test @Test
public void testSkipSecondHalfOfEternityToDefault() public void testSkipSecondHalfOfEternityToDefault()
{ {
CompactionSegmentIterator iterator = policy.reset( CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, ImmutableMap.of(DATA_SOURCE,
createCompactionConfig(10000, createCompactionConfig(10000,
new Period("P0D"), new Period("P0D"),
@ -1704,7 +1704,7 @@ public class NewestSegmentFirstPolicyTest
@Test @Test
public void testSkipAllToAllGranularity() public void testSkipAllToAllGranularity()
{ {
CompactionSegmentIterator iterator = policy.reset( CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, ImmutableMap.of(DATA_SOURCE,
createCompactionConfig(10000, createCompactionConfig(10000,
new Period("P0D"), new Period("P0D"),
@ -1736,7 +1736,7 @@ public class NewestSegmentFirstPolicyTest
@Test @Test
public void testSkipAllToFinerGranularity() public void testSkipAllToFinerGranularity()
{ {
CompactionSegmentIterator iterator = policy.reset( CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, ImmutableMap.of(DATA_SOURCE,
createCompactionConfig(10000, createCompactionConfig(10000,
new Period("P0D"), new Period("P0D"),
@ -1799,7 +1799,7 @@ public class NewestSegmentFirstPolicyTest
0, 0,
1); 1);
CompactionSegmentIterator iterator = policy.reset( CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, ImmutableMap.of(DATA_SOURCE,
createCompactionConfig(10000, createCompactionConfig(10000,
new Period("P0D"), new Period("P0D"),
@ -1850,7 +1850,7 @@ public class NewestSegmentFirstPolicyTest
TombstoneShardSpec.INSTANCE, TombstoneShardSpec.INSTANCE,
0, 0,
1); 1);
iterator = policy.reset( iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, ImmutableMap.of(DATA_SOURCE,
createCompactionConfig(10000, createCompactionConfig(10000,
new Period("P0D"), new Period("P0D"),