Allow CompactionSegmentIterator to have custom priority (#16737)

Changes:
- Break `NewestSegmentFirstIterator` into two parts
  - `DatasourceCompactibleSegmentIterator` - this contains all the code from `NewestSegmentFirstIterator`
  but now handles a single datasource and allows a priority to be specified
  - `PriorityBasedCompactionSegmentIterator` - contains separate iterator for each datasource and
  combines the results into a single queue to be used by a compaction search policy
- Update `NewestSegmentFirstPolicy` to use the above new classes
- Cleanup `CompactionStatistics` and `AutoCompactionSnapshot`
- Cleanup `CompactSegments`
- Remove unused methods from `Tasks`
- Remove unneeded `TasksTest`
- Move tests from `NewestSegmentFirstIteratorTest` to `CompactionStatusTest`
and `DatasourceCompactibleSegmentIteratorTest`
This commit is contained in:
Kashif Faraz 2024-07-16 07:24:49 -07:00 committed by GitHub
parent 6cf6838eb9
commit 01d67ae543
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 587 additions and 935 deletions

View File

@ -141,7 +141,7 @@ public class NewestSegmentFirstPolicyBenchmark
@Benchmark
public void measureNewestSegmentFirstPolicy(Blackhole blackhole)
{
final CompactionSegmentIterator iterator = policy.reset(compactionConfigs, dataSources, Collections.emptyMap());
final CompactionSegmentIterator iterator = policy.createIterator(compactionConfigs, dataSources, Collections.emptyMap());
for (int i = 0; i < numCompactionTaskSlots && iterator.hasNext(); i++) {
blackhole.consume(iterator.next());
}

View File

@ -21,16 +21,9 @@ package org.apache.druid.indexing.common.task;
import org.apache.curator.shaded.com.google.common.base.Verify;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.duty.CompactSegments;
import org.joda.time.Interval;
import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
public class Tasks
@ -63,44 +56,19 @@ public class Tasks
* Context flag denoting if maximum possible values should be used to estimate
* on-heap memory usage while indexing. Refer to OnHeapIncrementalIndex for
* more details.
*
* <p>
* The value of this flag is true by default which corresponds to the old method
* of estimation.
*/
public static final String USE_MAX_MEMORY_ESTIMATES = "useMaxMemoryEstimates";
/**
* This context is used in compaction. When it is set in the context, the segments created by the task
* will fill 'lastCompactionState' in its metadata. This will be used to track what segments are compacted or not.
* See {@link org.apache.druid.timeline.DataSegment} and {@link
* org.apache.druid.server.coordinator.compact.NewestSegmentFirstIterator} for more details.
* Context flag to denote if segments published to metadata by a task should
* have the {@code lastCompactionState} field set.
*/
public static final String STORE_COMPACTION_STATE_KEY = "storeCompactionState";
static {
Verify.verify(STORE_COMPACTION_STATE_KEY.equals(CompactSegments.STORE_COMPACTION_STATE_KEY));
}
public static SortedSet<Interval> computeCondensedIntervals(SortedSet<Interval> intervals)
{
final SortedSet<Interval> condensedIntervals = new TreeSet<>(Comparators.intervalsByStartThenEnd());
List<Interval> toBeAccumulated = new ArrayList<>();
for (Interval interval : intervals) {
if (toBeAccumulated.size() == 0) {
toBeAccumulated.add(interval);
} else {
if (toBeAccumulated.get(toBeAccumulated.size() - 1).abuts(interval)) {
toBeAccumulated.add(interval);
} else {
condensedIntervals.add(JodaUtils.umbrellaInterval(toBeAccumulated));
toBeAccumulated.clear();
toBeAccumulated.add(interval);
}
}
}
if (toBeAccumulated.size() > 0) {
condensedIntervals.add(JodaUtils.umbrellaInterval(toBeAccumulated));
}
return condensedIntervals;
}
}

View File

@ -1,94 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.Comparators;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.util.Iterator;
import java.util.SortedSet;
import java.util.TreeSet;
public class TasksTest
{
@Test
public void testComputeCondensedIntervals()
{
final SortedSet<Interval> inputIntervals = new TreeSet<>(Comparators.intervalsByStartThenEnd());
for (int m = 1; m < 13; m++) {
for (int d = 1; d < 10; d++) {
inputIntervals.add(getInterval(m, d, m, d + 1));
}
for (int d = 12; d < 20; d++) {
inputIntervals.add(getInterval(m, d, m, d + 1));
}
inputIntervals.add(getInterval(m, 22, m, 23));
for (int d = 25; d < 28; d++) {
inputIntervals.add(getInterval(m, d, m, d + 1));
}
if (m == 1 || m == 3 || m == 5 || m == 7 || m == 8 || m == 10) {
inputIntervals.add(getInterval(m, 31, m + 1, 1));
}
}
inputIntervals.add(Intervals.of("2017-12-31/2018-01-01"));
final SortedSet<Interval> condensedIntervals = Tasks.computeCondensedIntervals(inputIntervals);
final Iterator<Interval> condensedIntervalIterator = condensedIntervals.iterator();
Assert.assertTrue(condensedIntervalIterator.hasNext());
Interval condensedInterval = condensedIntervalIterator.next();
final SortedSet<Interval> checkedIntervals = new TreeSet<>(Comparators.intervalsByStartThenEnd());
for (Interval inputInterval : inputIntervals) {
if (!condensedInterval.contains(inputInterval)) {
if (condensedIntervalIterator.hasNext()) {
condensedInterval = condensedIntervalIterator.next();
Assert.assertTrue(condensedInterval.contains(inputInterval));
}
}
checkedIntervals.add(inputInterval);
}
Assert.assertFalse(condensedIntervalIterator.hasNext());
Assert.assertEquals(inputIntervals, checkedIntervals);
}
private static Interval getInterval(int startMonth, int startDay, int endMonth, int endDay)
{
return Intervals.of(
StringUtils.format(
"2017-%02d-%02d/2017-%02d-%02d",
startMonth,
startDay,
endMonth,
endDay
)
);
}
}

View File

@ -28,6 +28,7 @@ import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
import org.joda.time.Duration;
@ -79,6 +80,17 @@ public class ClientCompactionTaskQueryTuningConfig
@Nullable
private final AppendableIndexSpec appendableIndexSpec;
public static ClientCompactionTaskQueryTuningConfig from(
DataSourceCompactionConfig compactionConfig
)
{
if (compactionConfig == null) {
return from(null, null, null);
} else {
return from(compactionConfig.getTuningConfig(), compactionConfig.getMaxRowsPerSegment(), null);
}
}
public static ClientCompactionTaskQueryTuningConfig from(
@Nullable UserCompactionTaskQueryTuningConfig userCompactionTaskQueryTuningConfig,
@Nullable Integer maxRowsPerSegment,

View File

@ -22,6 +22,7 @@ package org.apache.druid.server.coordinator;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.server.coordinator.compact.CompactionStatistics;
import javax.validation.constraints.NotNull;
import java.util.Objects;
@ -193,15 +194,9 @@ public class AutoCompactionSnapshot
private final String dataSource;
private final AutoCompactionScheduleStatus scheduleStatus;
private long bytesAwaitingCompaction;
private long bytesCompacted;
private long bytesSkipped;
private long segmentCountAwaitingCompaction;
private long segmentCountCompacted;
private long segmentCountSkipped;
private long intervalCountAwaitingCompaction;
private long intervalCountCompacted;
private long intervalCountSkipped;
private final CompactionStatistics compactedStats = new CompactionStatistics();
private final CompactionStatistics skippedStats = new CompactionStatistics();
private final CompactionStatistics waitingStats = new CompactionStatistics();
private Builder(
@NotNull String dataSource,
@ -217,69 +212,21 @@ public class AutoCompactionSnapshot
this.dataSource = dataSource;
this.scheduleStatus = scheduleStatus;
this.bytesAwaitingCompaction = 0;
this.bytesCompacted = 0;
this.bytesSkipped = 0;
this.segmentCountAwaitingCompaction = 0;
this.segmentCountCompacted = 0;
this.segmentCountSkipped = 0;
this.intervalCountAwaitingCompaction = 0;
this.intervalCountCompacted = 0;
this.intervalCountSkipped = 0;
}
public Builder incrementBytesAwaitingCompaction(long incrementValue)
public void incrementWaitingStats(CompactionStatistics entry)
{
this.bytesAwaitingCompaction = this.bytesAwaitingCompaction + incrementValue;
return this;
waitingStats.increment(entry);
}
public Builder incrementBytesCompacted(long incrementValue)
public void incrementCompactedStats(CompactionStatistics entry)
{
this.bytesCompacted = this.bytesCompacted + incrementValue;
return this;
compactedStats.increment(entry);
}
public Builder incrementSegmentCountAwaitingCompaction(long incrementValue)
public void incrementSkippedStats(CompactionStatistics entry)
{
this.segmentCountAwaitingCompaction = this.segmentCountAwaitingCompaction + incrementValue;
return this;
}
public Builder incrementSegmentCountCompacted(long incrementValue)
{
this.segmentCountCompacted = this.segmentCountCompacted + incrementValue;
return this;
}
public Builder incrementIntervalCountAwaitingCompaction(long incrementValue)
{
this.intervalCountAwaitingCompaction = this.intervalCountAwaitingCompaction + incrementValue;
return this;
}
public Builder incrementIntervalCountCompacted(long incrementValue)
{
this.intervalCountCompacted = this.intervalCountCompacted + incrementValue;
return this;
}
public Builder incrementBytesSkipped(long incrementValue)
{
this.bytesSkipped = this.bytesSkipped + incrementValue;
return this;
}
public Builder incrementSegmentCountSkipped(long incrementValue)
{
this.segmentCountSkipped = this.segmentCountSkipped + incrementValue;
return this;
}
public Builder incrementIntervalCountSkipped(long incrementValue)
{
this.intervalCountSkipped = this.intervalCountSkipped + incrementValue;
return this;
skippedStats.increment(entry);
}
public AutoCompactionSnapshot build()
@ -287,15 +234,15 @@ public class AutoCompactionSnapshot
return new AutoCompactionSnapshot(
dataSource,
scheduleStatus,
bytesAwaitingCompaction,
bytesCompacted,
bytesSkipped,
segmentCountAwaitingCompaction,
segmentCountCompacted,
segmentCountSkipped,
intervalCountAwaitingCompaction,
intervalCountCompacted,
intervalCountSkipped
waitingStats.getTotalBytes(),
compactedStats.getTotalBytes(),
skippedStats.getTotalBytes(),
waitingStats.getNumSegments(),
compactedStats.getNumSegments(),
skippedStats.getNumSegments(),
waitingStats.getNumIntervals(),
compactedStats.getNumIntervals(),
skippedStats.getNumIntervals()
);
}
}

View File

@ -33,9 +33,9 @@ import java.util.Map;
public interface CompactionSegmentSearchPolicy
{
/**
* Reset the current states of this policy. This method should be called whenever iterating starts.
* Creates an iterator that returns compactible segments.
*/
CompactionSegmentIterator reset(
CompactionSegmentIterator createIterator(
Map<String, DataSourceCompactionConfig> compactionConfigs,
Map<String, SegmentTimeline> dataSources,
Map<String, List<Interval>> skipIntervals

View File

@ -28,9 +28,13 @@ public class CompactionStatistics
private long numSegments;
private long numIntervals;
public static CompactionStatistics create()
public static CompactionStatistics create(long bytes, long numSegments, long numIntervals)
{
return new CompactionStatistics();
final CompactionStatistics stats = new CompactionStatistics();
stats.totalBytes = bytes;
stats.numIntervals = numIntervals;
stats.numSegments = numSegments;
return stats;
}
public long getTotalBytes()
@ -48,10 +52,10 @@ public class CompactionStatistics
return numIntervals;
}
public void addFrom(SegmentsToCompact segments)
public void increment(CompactionStatistics other)
{
totalBytes += segments.getTotalBytes();
numIntervals += segments.getNumIntervals();
numSegments += segments.size();
totalBytes += other.getTotalBytes();
numIntervals += other.getNumIntervals();
numSegments += other.getNumSegments();
}
}

View File

@ -167,12 +167,7 @@ public class CompactionStatus
this.objectMapper = objectMapper;
this.lastCompactionState = candidateSegments.getFirst().getLastCompactionState();
this.compactionConfig = compactionConfig;
this.tuningConfig = ClientCompactionTaskQueryTuningConfig.from(
compactionConfig.getTuningConfig(),
compactionConfig.getMaxRowsPerSegment(),
null
);
this.tuningConfig = ClientCompactionTaskQueryTuningConfig.from(compactionConfig);
this.configuredGranularitySpec = compactionConfig.getGranularitySpec();
if (lastCompactionState == null) {
this.existingGranularitySpec = null;

View File

@ -23,9 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
@ -59,44 +57,46 @@ import java.util.Set;
import java.util.stream.Collectors;
/**
* This class iterates all segments of the dataSources configured for compaction from the newest to the oldest.
* Iterator over compactible segments of a datasource in order of specified priority.
*/
public class NewestSegmentFirstIterator implements CompactionSegmentIterator
public class DataSourceCompactibleSegmentIterator implements Iterator<SegmentsToCompact>
{
private static final Logger log = new Logger(NewestSegmentFirstIterator.class);
private static final Logger log = new Logger(DataSourceCompactibleSegmentIterator.class);
private final String dataSource;
private final ObjectMapper objectMapper;
private final Map<String, DataSourceCompactionConfig> compactionConfigs;
private final Map<String, CompactionStatistics> compactedSegmentStats = new HashMap<>();
private final Map<String, CompactionStatistics> skippedSegmentStats = new HashMap<>();
private final Map<String, CompactibleSegmentIterator> timelineIterators;
private final DataSourceCompactionConfig config;
private final CompactionStatistics compactedSegmentStats = new CompactionStatistics();
private final CompactionStatistics skippedSegmentStats = new CompactionStatistics();
// This is needed for datasource that has segmentGranularity configured
// If configured segmentGranularity in config is finer than current segmentGranularity, the same set of segments
// can belong to multiple intervals in the timeline. We keep track of the compacted intervals between each
// run of the compaction job and skip any interval that was already previously compacted.
private final Map<String, Set<Interval>> intervalCompactedForDatasource = new HashMap<>();
private final Set<Interval> compactedIntervals = new HashSet<>();
private final PriorityQueue<SegmentsToCompact> queue = new PriorityQueue<>(
(o1, o2) -> Comparators.intervalsByStartThenEnd().compare(o2.getUmbrellaInterval(), o1.getUmbrellaInterval())
);
private final PriorityQueue<SegmentsToCompact> queue;
NewestSegmentFirstIterator(
ObjectMapper objectMapper,
Map<String, DataSourceCompactionConfig> compactionConfigs,
Map<String, SegmentTimeline> dataSources,
Map<String, List<Interval>> skipIntervals
public DataSourceCompactibleSegmentIterator(
DataSourceCompactionConfig config,
SegmentTimeline timeline,
List<Interval> skipIntervals,
Comparator<SegmentsToCompact> segmentPriority,
ObjectMapper objectMapper
)
{
this.objectMapper = objectMapper;
this.compactionConfigs = compactionConfigs;
this.timelineIterators = Maps.newHashMapWithExpectedSize(dataSources.size());
this.config = config;
this.dataSource = config.getDataSource();
this.queue = new PriorityQueue<>(segmentPriority);
populateQueue(timeline, skipIntervals);
}
dataSources.forEach((dataSource, timeline) -> {
final DataSourceCompactionConfig config = compactionConfigs.get(dataSource);
private void populateQueue(SegmentTimeline timeline, List<Interval> skipIntervals)
{
if (timeline != null) {
Granularity configuredSegmentGranularity = null;
if (config != null && !timeline.isEmpty()) {
if (!timeline.isEmpty()) {
SegmentTimeline originalTimeline = null;
if (config.getGranularitySpec() != null && config.getGranularitySpec().getSegmentGranularity() != null) {
String temporaryVersion = DateTimes.nowUtc().toString();
@ -154,33 +154,25 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
timeline,
config.getSkipOffsetFromLatest(),
configuredSegmentGranularity,
skipIntervals.get(dataSource)
skipIntervals
);
if (!searchIntervals.isEmpty()) {
timelineIterators.put(
dataSource,
findAndEnqueueSegmentsToCompact(
new CompactibleSegmentIterator(timeline, searchIntervals, originalTimeline)
);
} else {
log.warn("Skipping compaction for datasource[%s] as it has no compactible segments.", dataSource);
}
}
});
compactionConfigs.forEach((dataSourceName, config) -> {
if (config == null) {
throw new ISE("Unknown dataSource[%s]", dataSourceName);
}
updateQueue(dataSourceName, config);
});
}
}
@Override
public Map<String, CompactionStatistics> totalCompactedStatistics()
public CompactionStatistics totalCompactedStatistics()
{
return compactedSegmentStats;
}
@Override
public Map<String, CompactionStatistics> totalSkippedStatistics()
public CompactionStatistics totalSkippedStatistics()
{
return skippedSegmentStats;
}
@ -206,25 +198,9 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
final List<DataSegment> resultSegments = entry.getSegments();
Preconditions.checkState(!resultSegments.isEmpty(), "Queue entry must not be empty");
final String dataSource = resultSegments.get(0).getDataSource();
updateQueue(dataSource, compactionConfigs.get(dataSource));
return entry;
}
/**
* Find the next segments to compact for the given dataSource and add them to the queue.
* {@link #timelineIterators} is updated according to the found segments. That is, the found segments are removed from
* the timeline of the given dataSource.
*/
private void updateQueue(String dataSourceName, DataSourceCompactionConfig config)
{
final SegmentsToCompact segmentsToCompact = findSegmentsToCompact(dataSourceName, config);
if (!segmentsToCompact.isEmpty()) {
queue.add(segmentsToCompact);
}
}
/**
* Iterates compactible segments in a {@link SegmentTimeline}.
*/
@ -315,27 +291,12 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
}
/**
* Finds segments to compact together for the given datasource.
*
* @return An empty {@link SegmentsToCompact} if there are no eligible candidates.
* Finds segments to compact together for the given datasource and adds them to
* the priority queue.
*/
private SegmentsToCompact findSegmentsToCompact(
final String dataSourceName,
final DataSourceCompactionConfig config
)
private void findAndEnqueueSegmentsToCompact(CompactibleSegmentIterator compactibleSegmentIterator)
{
final CompactibleSegmentIterator compactibleSegmentIterator
= timelineIterators.get(dataSourceName);
if (compactibleSegmentIterator == null) {
log.warn(
"Skipping compaction for datasource[%s] as there is no compactible segment in its timeline.",
dataSourceName
);
return SegmentsToCompact.empty();
}
final long inputSegmentSize = config.getInputSegmentSizeBytes();
while (compactibleSegmentIterator.hasNext()) {
List<DataSegment> segments = compactibleSegmentIterator.next();
@ -352,47 +313,33 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
if (!compactionStatus.isComplete()) {
log.debug(
"Datasource[%s], interval[%s] has [%d] segments that need to be compacted because [%s].",
dataSourceName, interval, candidates.size(), compactionStatus.getReasonToCompact()
dataSource, interval, candidates.size(), compactionStatus.getReasonToCompact()
);
}
if (compactionStatus.isComplete()) {
addSegmentStatsTo(compactedSegmentStats, dataSourceName, candidates);
compactedSegmentStats.increment(candidates.getStats());
} else if (candidates.getTotalBytes() > inputSegmentSize) {
addSegmentStatsTo(skippedSegmentStats, dataSourceName, candidates);
skippedSegmentStats.increment(candidates.getStats());
log.warn(
"Skipping compaction for datasource[%s], interval[%s] as total segment size[%d]"
+ " is larger than allowed inputSegmentSize[%d].",
dataSourceName, interval, candidates.getTotalBytes(), inputSegmentSize
dataSource, interval, candidates.getTotalBytes(), inputSegmentSize
);
} else if (config.getGranularitySpec() != null
&& config.getGranularitySpec().getSegmentGranularity() != null) {
Set<Interval> compactedIntervals = intervalCompactedForDatasource
.computeIfAbsent(dataSourceName, k -> new HashSet<>());
if (compactedIntervals.contains(interval)) {
// Skip these candidate segments as we have already compacted this interval
} else {
compactedIntervals.add(interval);
return candidates;
queue.add(candidates);
}
} else {
return candidates;
queue.add(candidates);
}
}
log.debug("No more segments to compact for datasource[%s].", dataSourceName);
return SegmentsToCompact.empty();
}
private void addSegmentStatsTo(
Map<String, CompactionStatistics> statisticsMap,
String dataSourceName,
SegmentsToCompact segments
)
{
statisticsMap.computeIfAbsent(dataSourceName, v -> CompactionStatistics.create())
.addFrom(segments);
log.debug("No more segments to compact for datasource[%s].", dataSource);
}
/**
@ -428,7 +375,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
final List<DataSegment> segments = new ArrayList<>(
timeline.findNonOvershadowedObjectsInInterval(skipInterval, Partitions.ONLY_COMPLETE)
);
addSegmentStatsTo(skippedSegmentStats, dataSourceName, SegmentsToCompact.from(segments));
skippedSegmentStats.increment(SegmentsToCompact.from(segments).getStats());
}
final Interval totalInterval = new Interval(first.getInterval().getStart(), last.getInterval().getEnd());

View File

@ -21,6 +21,7 @@ package org.apache.druid.server.coordinator.compact;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.timeline.SegmentTimeline;
import org.joda.time.Interval;
@ -29,7 +30,7 @@ import java.util.List;
import java.util.Map;
/**
* This policy searches segments for compaction from the newest one to oldest one.
* This policy searches segments for compaction from newest to oldest.
*/
public class NewestSegmentFirstPolicy implements CompactionSegmentSearchPolicy
{
@ -42,12 +43,20 @@ public class NewestSegmentFirstPolicy implements CompactionSegmentSearchPolicy
}
@Override
public CompactionSegmentIterator reset(
public CompactionSegmentIterator createIterator(
Map<String, DataSourceCompactionConfig> compactionConfigs,
Map<String, SegmentTimeline> dataSources,
Map<String, List<Interval>> skipIntervals
)
{
return new NewestSegmentFirstIterator(objectMapper, compactionConfigs, dataSources, skipIntervals);
return new PriorityBasedCompactionSegmentIterator(
compactionConfigs,
dataSources,
skipIntervals,
(o1, o2) -> Comparators.intervalsByStartThenEnd()
.compare(o2.getUmbrellaInterval(), o1.getUmbrellaInterval()),
objectMapper
);
}
}

View File

@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator.compact;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.Interval;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;
/**
* Implementation of {@link CompactionSegmentIterator} that returns segments in
* order of their priority.
*/
public class PriorityBasedCompactionSegmentIterator implements CompactionSegmentIterator
{
private static final Logger log = new Logger(PriorityBasedCompactionSegmentIterator.class);
private final PriorityQueue<SegmentsToCompact> queue;
private final Map<String, DataSourceCompactibleSegmentIterator> datasourceIterators;
public PriorityBasedCompactionSegmentIterator(
Map<String, DataSourceCompactionConfig> compactionConfigs,
Map<String, SegmentTimeline> datasourceToTimeline,
Map<String, List<Interval>> skipIntervals,
Comparator<SegmentsToCompact> segmentPriority,
ObjectMapper objectMapper
)
{
this.queue = new PriorityQueue<>(segmentPriority);
this.datasourceIterators = Maps.newHashMapWithExpectedSize(datasourceToTimeline.size());
compactionConfigs.forEach((datasource, config) -> {
if (config == null) {
throw DruidException.defensive("Invalid null compaction config for dataSource[%s].", datasource);
}
final SegmentTimeline timeline = datasourceToTimeline.get(datasource);
if (timeline == null) {
log.warn("Skipping compaction for datasource[%s] as it has no timeline.", datasource);
return;
}
datasourceIterators.put(
datasource,
new DataSourceCompactibleSegmentIterator(
compactionConfigs.get(datasource),
timeline,
skipIntervals.getOrDefault(datasource, Collections.emptyList()),
segmentPriority,
objectMapper
)
);
addNextItemForDatasourceToQueue(datasource);
});
}
@Override
public Map<String, CompactionStatistics> totalCompactedStatistics()
{
return CollectionUtils.mapValues(
datasourceIterators,
DataSourceCompactibleSegmentIterator::totalCompactedStatistics
);
}
@Override
public Map<String, CompactionStatistics> totalSkippedStatistics()
{
return CollectionUtils.mapValues(
datasourceIterators,
DataSourceCompactibleSegmentIterator::totalSkippedStatistics
);
}
@Override
public boolean hasNext()
{
return !queue.isEmpty();
}
@Override
public SegmentsToCompact next()
{
if (!hasNext()) {
throw new NoSuchElementException();
}
final SegmentsToCompact entry = queue.poll();
if (entry == null) {
throw new NoSuchElementException();
}
Preconditions.checkState(!entry.isEmpty(), "Queue entry must not be empty");
addNextItemForDatasourceToQueue(entry.getFirst().getDataSource());
return entry;
}
private void addNextItemForDatasourceToQueue(String dataSourceName)
{
final DataSourceCompactibleSegmentIterator iterator = datasourceIterators.get(dataSourceName);
if (iterator.hasNext()) {
final SegmentsToCompact segmentsToCompact = iterator.next();
if (!segmentsToCompact.isEmpty()) {
queue.add(segmentsToCompact);
}
}
}
}

View File

@ -107,9 +107,9 @@ public class SegmentsToCompact
return umbrellaInterval;
}
public long getNumIntervals()
public CompactionStatistics getStats()
{
return numIntervals;
return CompactionStatistics.create(totalBytes, size(), numIntervals);
}
@Override

View File

@ -54,7 +54,6 @@ import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.compact.CompactionSegmentIterator;
import org.apache.druid.server.coordinator.compact.CompactionSegmentSearchPolicy;
import org.apache.druid.server.coordinator.compact.CompactionStatistics;
import org.apache.druid.server.coordinator.compact.SegmentsToCompact;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Dimension;
@ -87,6 +86,7 @@ public class CompactSegments implements CoordinatorCustomDuty
private static final Logger LOG = new Logger(CompactSegments.class);
private static final String TASK_ID_PREFIX = "coordinator-issued";
private static final Predicate<TaskStatusPlus> IS_COMPACTION_TASK =
status -> null != status && COMPACTION_TASK_TYPE.equals(status.getType());
@ -196,7 +196,7 @@ public class CompactSegments implements CoordinatorCustomDuty
// Get iterator over segments to compact and submit compaction tasks
Map<String, SegmentTimeline> dataSources = params.getUsedSegmentsTimelinesPerDataSource();
final CompactionSegmentIterator iterator =
policy.reset(compactionConfigs, dataSources, intervalsToSkipCompaction);
policy.createIterator(compactionConfigs, dataSources, intervalsToSkipCompaction);
final int compactionTaskCapacity = getCompactionTaskCapacity(dynamicConfig);
final int availableCompactionTaskSlots
@ -215,7 +215,7 @@ public class CompactSegments implements CoordinatorCustomDuty
stats.add(Stats.Compaction.MAX_SLOTS, compactionTaskCapacity);
stats.add(Stats.Compaction.AVAILABLE_SLOTS, availableCompactionTaskSlots);
stats.add(Stats.Compaction.SUBMITTED_TASKS, numSubmittedCompactionTasks);
addCompactionSnapshotStats(currentRunAutoCompactionSnapshotBuilders, iterator, stats);
updateCompactionSnapshotStats(currentRunAutoCompactionSnapshotBuilders, iterator, stats);
return params;
}
@ -392,28 +392,19 @@ public class CompactSegments implements CoordinatorCustomDuty
while (iterator.hasNext() && totalTaskSlotsAssigned < numAvailableCompactionTaskSlots) {
final SegmentsToCompact entry = iterator.next();
final List<DataSegment> segmentsToCompact = entry.getSegments();
if (segmentsToCompact.isEmpty()) {
if (entry.isEmpty()) {
throw new ISE("segmentsToCompact is empty?");
}
final String dataSourceName = segmentsToCompact.get(0).getDataSource();
final String dataSourceName = entry.getFirst().getDataSource();
// As these segments will be compacted, we will aggregate the statistic to the Compacted statistics
AutoCompactionSnapshot.Builder snapshotBuilder = currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
dataSourceName,
AutoCompactionSnapshot::builder
);
snapshotBuilder
.incrementBytesCompacted(
segmentsToCompact.stream().mapToLong(DataSegment::getSize).sum()
)
.incrementIntervalCountCompacted(
segmentsToCompact.stream().map(DataSegment::getInterval).distinct().count()
)
.incrementSegmentCountCompacted(segmentsToCompact.size());
currentRunAutoCompactionSnapshotBuilders
.computeIfAbsent(dataSourceName, AutoCompactionSnapshot::builder)
.incrementCompactedStats(entry.getStats());
final DataSourceCompactionConfig config = compactionConfigs.get(dataSourceName);
final List<DataSegment> segmentsToCompact = entry.getSegments();
// Create granularitySpec to send to compaction task
ClientCompactionTaskGranularitySpec granularitySpec;
@ -514,7 +505,6 @@ public class CompactSegments implements CoordinatorCustomDuty
}
final String taskId = compactSegments(
"coordinator-issued",
segmentsToCompact,
config.getTaskPriority(),
ClientCompactionTaskQueryTuningConfig.from(
@ -536,7 +526,6 @@ public class CompactSegments implements CoordinatorCustomDuty
taskId, segmentsToCompact.size(), dataSourceName, entry.getUmbrellaInterval()
);
LOG.debugSegments(segmentsToCompact, "Compacting segments");
// Count the compaction task itself + its sub tasks
numSubmittedTasks++;
totalTaskSlotsAssigned += slotsRequiredForCurrentTask;
}
@ -554,7 +543,7 @@ public class CompactSegments implements CoordinatorCustomDuty
return newContext;
}
private void addCompactionSnapshotStats(
private void updateCompactionSnapshotStats(
Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders,
CompactionSegmentIterator iterator,
CoordinatorRunStats stats
@ -563,77 +552,45 @@ public class CompactSegments implements CoordinatorCustomDuty
// Mark all the segments remaining in the iterator as "awaiting compaction"
while (iterator.hasNext()) {
final SegmentsToCompact entry = iterator.next();
final List<DataSegment> segmentsToCompact = entry.getSegments();
if (!segmentsToCompact.isEmpty()) {
final String dataSourceName = segmentsToCompact.get(0).getDataSource();
AutoCompactionSnapshot.Builder snapshotBuilder = currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
dataSourceName,
AutoCompactionSnapshot::builder
);
snapshotBuilder
.incrementBytesAwaitingCompaction(
segmentsToCompact.stream().mapToLong(DataSegment::getSize).sum()
)
.incrementIntervalCountAwaitingCompaction(
segmentsToCompact.stream().map(DataSegment::getInterval).distinct().count()
)
.incrementSegmentCountAwaitingCompaction(segmentsToCompact.size());
if (!entry.isEmpty()) {
final String dataSourceName = entry.getFirst().getDataSource();
currentRunAutoCompactionSnapshotBuilders
.computeIfAbsent(dataSourceName, AutoCompactionSnapshot::builder)
.incrementWaitingStats(entry.getStats());
}
}
// Statistics of all segments considered compacted after this run
Map<String, CompactionStatistics> allCompactedStatistics = iterator.totalCompactedStatistics();
for (Map.Entry<String, CompactionStatistics> compactionStatisticsEntry : allCompactedStatistics.entrySet()) {
final String dataSource = compactionStatisticsEntry.getKey();
final CompactionStatistics dataSourceCompactedStatistics = compactionStatisticsEntry.getValue();
AutoCompactionSnapshot.Builder builder = currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
dataSource,
AutoCompactionSnapshot::builder
);
builder.incrementBytesCompacted(dataSourceCompactedStatistics.getTotalBytes());
builder.incrementSegmentCountCompacted(dataSourceCompactedStatistics.getNumSegments());
builder.incrementIntervalCountCompacted(dataSourceCompactedStatistics.getNumIntervals());
}
iterator.totalCompactedStatistics().forEach((dataSource, compactedStats) -> {
currentRunAutoCompactionSnapshotBuilders
.computeIfAbsent(dataSource, AutoCompactionSnapshot::builder)
.incrementCompactedStats(compactedStats);
});
// Statistics of all segments considered skipped after this run
Map<String, CompactionStatistics> allSkippedStatistics = iterator.totalSkippedStatistics();
for (Map.Entry<String, CompactionStatistics> compactionStatisticsEntry : allSkippedStatistics.entrySet()) {
final String dataSource = compactionStatisticsEntry.getKey();
final CompactionStatistics dataSourceSkippedStatistics = compactionStatisticsEntry.getValue();
AutoCompactionSnapshot.Builder builder = currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
dataSource,
AutoCompactionSnapshot::builder
);
builder.incrementBytesSkipped(dataSourceSkippedStatistics.getTotalBytes())
.incrementSegmentCountSkipped(dataSourceSkippedStatistics.getNumSegments())
.incrementIntervalCountSkipped(dataSourceSkippedStatistics.getNumIntervals());
}
iterator.totalSkippedStatistics().forEach((dataSource, dataSourceSkippedStatistics) -> {
currentRunAutoCompactionSnapshotBuilders
.computeIfAbsent(dataSource, AutoCompactionSnapshot::builder)
.incrementSkippedStats(dataSourceSkippedStatistics);
});
final Map<String, AutoCompactionSnapshot> currentAutoCompactionSnapshotPerDataSource = new HashMap<>();
for (Map.Entry<String, AutoCompactionSnapshot.Builder> autoCompactionSnapshotBuilderEntry
: currentRunAutoCompactionSnapshotBuilders.entrySet()) {
final String dataSource = autoCompactionSnapshotBuilderEntry.getKey();
final AutoCompactionSnapshot.Builder builder = autoCompactionSnapshotBuilderEntry.getValue();
// Build the complete snapshot for the datasource
AutoCompactionSnapshot autoCompactionSnapshot = builder.build();
currentRunAutoCompactionSnapshotBuilders.forEach((dataSource, builder) -> {
final AutoCompactionSnapshot autoCompactionSnapshot = builder.build();
currentAutoCompactionSnapshotPerDataSource.put(dataSource, autoCompactionSnapshot);
// Use the complete snapshot to emit metrics
addStatsForDatasource(dataSource, autoCompactionSnapshot, stats);
}
collectSnapshotStats(autoCompactionSnapshot, stats);
});
// Atomic update of autoCompactionSnapshotPerDataSource with the latest from this coordinator run
autoCompactionSnapshotPerDataSource.set(currentAutoCompactionSnapshotPerDataSource);
}
private void addStatsForDatasource(
String dataSource,
private void collectSnapshotStats(
AutoCompactionSnapshot autoCompactionSnapshot,
CoordinatorRunStats stats
)
{
final RowKey rowKey = RowKey.of(Dimension.DATASOURCE, dataSource);
final RowKey rowKey = RowKey.of(Dimension.DATASOURCE, autoCompactionSnapshot.getDataSource());
stats.add(Stats.Compaction.PENDING_BYTES, rowKey, autoCompactionSnapshot.getBytesAwaitingCompaction());
stats.add(Stats.Compaction.PENDING_SEGMENTS, rowKey, autoCompactionSnapshot.getSegmentCountAwaitingCompaction());
@ -668,7 +625,6 @@ public class CompactSegments implements CoordinatorCustomDuty
}
private String compactSegments(
String idPrefix,
List<DataSegment> segments,
int compactionTaskPriority,
@Nullable ClientCompactionTaskQueryTuningConfig tuningConfig,
@ -692,7 +648,7 @@ public class CompactSegments implements CoordinatorCustomDuty
context = context == null ? new HashMap<>() : context;
context.put("priority", compactionTaskPriority);
final String taskId = IdUtils.newTaskId(idPrefix, ClientCompactionTaskQuery.TYPE, dataSource, null);
final String taskId = IdUtils.newTaskId(TASK_ID_PREFIX, ClientCompactionTaskQuery.TYPE, dataSource, null);
final Granularity segmentGranularity = granularitySpec == null ? null : granularitySpec.getSegmentGranularity();
final ClientTaskQuery taskPayload = new ClientCompactionTaskQuery(
taskId,

View File

@ -19,6 +19,7 @@
package org.apache.druid.server.coordinator;
import org.apache.druid.server.coordinator.compact.CompactionStatistics;
import org.junit.Assert;
import org.junit.Test;
@ -30,17 +31,11 @@ public class AutoCompactionSnapshotTest
final String expectedDataSource = "data";
final AutoCompactionSnapshot.Builder builder = AutoCompactionSnapshot.builder(expectedDataSource);
// Increment every stats twice
// Increment every stat twice
for (int i = 0; i < 2; i++) {
builder.incrementIntervalCountSkipped(13)
.incrementBytesSkipped(13)
.incrementSegmentCountSkipped(13)
.incrementIntervalCountCompacted(13)
.incrementBytesCompacted(13)
.incrementSegmentCountCompacted(13)
.incrementIntervalCountAwaitingCompaction(13)
.incrementBytesAwaitingCompaction(13)
.incrementSegmentCountAwaitingCompaction(13);
builder.incrementSkippedStats(CompactionStatistics.create(13, 13, 13));
builder.incrementWaitingStats(CompactionStatistics.create(13, 13, 13));
builder.incrementCompactedStats(CompactionStatistics.create(13, 13, 13));
}
final AutoCompactionSnapshot actual = builder.build();

View File

@ -0,0 +1,178 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator.compact;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
public class CompactionStatusTest
{
private static final String DS_WIKI = "wiki";
@Test
public void testFindPartitionsSpecWhenGivenIsNull()
{
final ClientCompactionTaskQueryTuningConfig tuningConfig
= ClientCompactionTaskQueryTuningConfig.from(null);
Assert.assertEquals(
new DynamicPartitionsSpec(null, Long.MAX_VALUE),
CompactionStatus.findPartitionsSpecFromConfig(tuningConfig)
);
}
@Test
public void testFindPartitionsSpecWhenGivenIsDynamicWithNullMaxTotalRows()
{
final PartitionsSpec partitionsSpec = new DynamicPartitionsSpec(null, null);
final ClientCompactionTaskQueryTuningConfig tuningConfig
= ClientCompactionTaskQueryTuningConfig.from(createCompactionConfig(partitionsSpec));
Assert.assertEquals(
new DynamicPartitionsSpec(null, Long.MAX_VALUE),
CompactionStatus.findPartitionsSpecFromConfig(tuningConfig)
);
}
@Test
public void testFindPartitionsSpecWhenGivenIsDynamicWithMaxTotalRows()
{
final PartitionsSpec partitionsSpec = new DynamicPartitionsSpec(null, 1000L);
final ClientCompactionTaskQueryTuningConfig tuningConfig
= ClientCompactionTaskQueryTuningConfig.from(createCompactionConfig(partitionsSpec));
Assert.assertEquals(
partitionsSpec,
CompactionStatus.findPartitionsSpecFromConfig(tuningConfig)
);
}
@Test
public void testFindPartitionsSpecWhenGivenIsDynamicWithMaxRowsPerSegment()
{
final PartitionsSpec partitionsSpec = new DynamicPartitionsSpec(100, 1000L);
final ClientCompactionTaskQueryTuningConfig tuningConfig
= ClientCompactionTaskQueryTuningConfig.from(createCompactionConfig(partitionsSpec));
Assert.assertEquals(
partitionsSpec,
CompactionStatus.findPartitionsSpecFromConfig(tuningConfig)
);
}
@Test
public void testFindPartitionsSpecFromConfigWithDeprecatedMaxRowsPerSegmentAndMaxTotalRowsReturnGivenValues()
{
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
"datasource",
null,
null,
100,
null,
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
1000L,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
),
null,
null,
null,
null,
null,
null,
null
);
Assert.assertEquals(
new DynamicPartitionsSpec(100, 1000L),
CompactionStatus.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config)
)
);
}
@Test
public void testFindPartitionsSpecWhenGivenIsHashed()
{
final PartitionsSpec partitionsSpec =
new HashedPartitionsSpec(null, 100, Collections.singletonList("dim"));
final ClientCompactionTaskQueryTuningConfig tuningConfig
= ClientCompactionTaskQueryTuningConfig.from(createCompactionConfig(partitionsSpec));
Assert.assertEquals(
partitionsSpec,
CompactionStatus.findPartitionsSpecFromConfig(tuningConfig)
);
}
@Test
public void testFindPartitionsSpecWhenGivenIsRange()
{
final PartitionsSpec partitionsSpec =
new DimensionRangePartitionsSpec(null, 10000, Collections.singletonList("dim"), false);
final ClientCompactionTaskQueryTuningConfig tuningConfig
= ClientCompactionTaskQueryTuningConfig.from(createCompactionConfig(partitionsSpec));
Assert.assertEquals(
partitionsSpec,
CompactionStatus.findPartitionsSpecFromConfig(tuningConfig)
);
}
private static DataSourceCompactionConfig createCompactionConfig(
PartitionsSpec partitionsSpec
)
{
return new DataSourceCompactionConfig(
DS_WIKI,
null, null, null, null, createTuningConfig(partitionsSpec),
null, null, null, null, null, null, null
);
}
private static UserCompactionTaskQueryTuningConfig createTuningConfig(
PartitionsSpec partitionsSpec
)
{
return new UserCompactionTaskQueryTuningConfig(
null,
null, null, null, null, partitionsSpec, null, null, null,
null, null, null, null, null, null, null, null, null, null
);
}
}

View File

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator.compact;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
public class DataSourceCompactibleSegmentIteratorTest
{
@Test
public void testFilterSkipIntervals()
{
final Interval totalInterval = Intervals.of("2018-01-01/2019-01-01");
final List<Interval> expectedSkipIntervals = ImmutableList.of(
Intervals.of("2018-01-15/2018-03-02"),
Intervals.of("2018-07-23/2018-10-01"),
Intervals.of("2018-10-02/2018-12-25"),
Intervals.of("2018-12-31/2019-01-01")
);
final List<Interval> skipIntervals = DataSourceCompactibleSegmentIterator.filterSkipIntervals(
totalInterval,
Lists.newArrayList(
Intervals.of("2017-12-01/2018-01-15"),
Intervals.of("2018-03-02/2018-07-23"),
Intervals.of("2018-10-01/2018-10-02"),
Intervals.of("2018-12-25/2018-12-31")
)
);
Assert.assertEquals(expectedSkipIntervals, skipIntervals);
}
@Test
public void testAddSkipIntervalFromLatestAndSort()
{
final List<Interval> expectedIntervals = ImmutableList.of(
Intervals.of("2018-12-24/2018-12-25"),
Intervals.of("2018-12-29/2019-01-01")
);
final List<Interval> fullSkipIntervals = DataSourceCompactibleSegmentIterator.sortAndAddSkipIntervalFromLatest(
DateTimes.of("2019-01-01"),
new Period(72, 0, 0, 0),
null,
ImmutableList.of(
Intervals.of("2018-12-30/2018-12-31"),
Intervals.of("2018-12-24/2018-12-25")
)
);
Assert.assertEquals(expectedIntervals, fullSkipIntervals);
}
}

View File

@ -1,477 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator.compact;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
public class NewestSegmentFirstIteratorTest
{
@Test
public void testFilterSkipIntervals()
{
final Interval totalInterval = Intervals.of("2018-01-01/2019-01-01");
final List<Interval> expectedSkipIntervals = ImmutableList.of(
Intervals.of("2018-01-15/2018-03-02"),
Intervals.of("2018-07-23/2018-10-01"),
Intervals.of("2018-10-02/2018-12-25"),
Intervals.of("2018-12-31/2019-01-01")
);
final List<Interval> skipIntervals = NewestSegmentFirstIterator.filterSkipIntervals(
totalInterval,
Lists.newArrayList(
Intervals.of("2017-12-01/2018-01-15"),
Intervals.of("2018-03-02/2018-07-23"),
Intervals.of("2018-10-01/2018-10-02"),
Intervals.of("2018-12-25/2018-12-31")
)
);
Assert.assertEquals(expectedSkipIntervals, skipIntervals);
}
@Test
public void testAddSkipIntervalFromLatestAndSort()
{
final List<Interval> expectedIntervals = ImmutableList.of(
Intervals.of("2018-12-24/2018-12-25"),
Intervals.of("2018-12-29/2019-01-01")
);
final List<Interval> fullSkipIntervals = NewestSegmentFirstIterator.sortAndAddSkipIntervalFromLatest(
DateTimes.of("2019-01-01"),
new Period(72, 0, 0, 0),
null,
ImmutableList.of(
Intervals.of("2018-12-30/2018-12-31"),
Intervals.of("2018-12-24/2018-12-25")
)
);
Assert.assertEquals(expectedIntervals, fullSkipIntervals);
}
@Test
public void testFindPartitionsSpecFromConfigWithNullTuningConfigReturnDynamicPartitinosSpecWithMaxTotalRowsOfLongMax()
{
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
"datasource",
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
);
Assert.assertEquals(
new DynamicPartitionsSpec(null, Long.MAX_VALUE),
CompactionStatus.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
)
);
}
@Test
public void testFindPartitionsSpecFromConfigWithNullMaxTotalRowsReturnLongMaxValue()
{
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
"datasource",
null,
null,
null,
null,
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
null,
null,
new DynamicPartitionsSpec(null, null),
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
),
null,
null,
null,
null,
null,
null,
null
);
Assert.assertEquals(
new DynamicPartitionsSpec(null, Long.MAX_VALUE),
CompactionStatus.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
)
);
}
@Test
public void testFindPartitionsSpecFromConfigWithNonNullMaxTotalRowsReturnGivenValue()
{
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
"datasource",
null,
null,
null,
null,
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
null,
null,
new DynamicPartitionsSpec(null, 1000L),
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
),
null,
null,
null,
null,
null,
null,
null
);
Assert.assertEquals(
new DynamicPartitionsSpec(null, 1000L),
CompactionStatus.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
)
);
}
@Test
public void testFindPartitionsSpecFromConfigWithNonNullMaxRowsPerSegmentReturnGivenValue()
{
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
"datasource",
null,
null,
null,
null,
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
null,
null,
new DynamicPartitionsSpec(100, 1000L),
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
),
null,
null,
null,
null,
null,
null,
null
);
Assert.assertEquals(
new DynamicPartitionsSpec(100, 1000L),
CompactionStatus.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
)
);
}
@Test
public void testFindPartitionsSpecFromConfigWithDeprecatedMaxRowsPerSegmentAndMaxTotalRowsReturnGivenValues()
{
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
"datasource",
null,
null,
100,
null,
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
1000L,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
),
null,
null,
null,
null,
null,
null,
null
);
Assert.assertEquals(
new DynamicPartitionsSpec(100, 1000L),
CompactionStatus.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
)
);
}
@Test
public void testFindPartitionsSpecFromConfigWithDeprecatedMaxRowsPerSegmentAndPartitionsSpecIgnoreDeprecatedOne()
{
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
"datasource",
null,
null,
100,
null,
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
null,
null,
new DynamicPartitionsSpec(null, null),
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
),
null,
null,
null,
null,
null,
null,
null
);
Assert.assertEquals(
new DynamicPartitionsSpec(null, Long.MAX_VALUE),
CompactionStatus.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
)
);
}
@Test
public void testFindPartitionsSpecFromConfigWithDeprecatedMaxTotalRowsAndPartitionsSpecIgnoreDeprecatedOne()
{
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
"datasource",
null,
null,
null,
null,
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
1000L,
null,
new DynamicPartitionsSpec(null, null),
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
),
null,
null,
null,
null,
null,
null,
null
);
Assert.assertEquals(
new DynamicPartitionsSpec(null, Long.MAX_VALUE),
CompactionStatus.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
)
);
}
@Test
public void testFindPartitionsSpecFromConfigWithHashPartitionsSpec()
{
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
"datasource",
null,
null,
null,
null,
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
null,
null,
new HashedPartitionsSpec(null, 10, ImmutableList.of("dim")),
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
),
null,
null,
null,
null,
null,
null,
null
);
Assert.assertEquals(
new HashedPartitionsSpec(null, 10, ImmutableList.of("dim")),
CompactionStatus.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
)
);
}
@Test
public void testFindPartitionsSpecFromConfigWithRangePartitionsSpec()
{
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
"datasource",
null,
null,
null,
null,
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
null,
null,
new SingleDimensionPartitionsSpec(10000, null, "dim", false),
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
),
null,
null,
null,
null,
null,
null,
null
);
Assert.assertEquals(
new SingleDimensionPartitionsSpec(10000, null, "dim", false),
CompactionStatus.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
)
);
}
}

View File

@ -88,7 +88,7 @@ public class NewestSegmentFirstPolicyTest
public void testLargeOffsetAndSmallSegmentInterval()
{
final Period segmentPeriod = new Period("PT1H");
final CompactionSegmentIterator iterator = policy.reset(
final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P2D"), null)),
ImmutableMap.of(
DATA_SOURCE,
@ -113,7 +113,7 @@ public class NewestSegmentFirstPolicyTest
public void testSmallOffsetAndLargeSegmentInterval()
{
final Period segmentPeriod = new Period("PT1H");
final CompactionSegmentIterator iterator = policy.reset(
final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1M"), null)),
ImmutableMap.of(
DATA_SOURCE,
@ -146,7 +146,7 @@ public class NewestSegmentFirstPolicyTest
public void testLargeGapInData()
{
final Period segmentPeriod = new Period("PT1H");
final CompactionSegmentIterator iterator = policy.reset(
final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1H1M"), null)),
ImmutableMap.of(
DATA_SOURCE,
@ -179,7 +179,7 @@ public class NewestSegmentFirstPolicyTest
@Test
public void testHugeShard()
{
final CompactionSegmentIterator iterator = policy.reset(
final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P1D"), null)),
ImmutableMap.of(
DATA_SOURCE,
@ -229,7 +229,7 @@ public class NewestSegmentFirstPolicyTest
@Test
public void testManySegmentsPerShard()
{
final CompactionSegmentIterator iterator = policy.reset(
final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(800000, new Period("P1D"), null)),
ImmutableMap.of(
DATA_SOURCE,
@ -287,7 +287,7 @@ public class NewestSegmentFirstPolicyTest
{
final String unknownDataSource = "unknown";
final Period segmentPeriod = new Period("PT1H");
final CompactionSegmentIterator iterator = policy.reset(
final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(
unknownDataSource,
createCompactionConfig(10000, new Period("P2D"), null),
@ -337,7 +337,7 @@ public class NewestSegmentFirstPolicyTest
2
)
);
final CompactionSegmentIterator iterator = policy.reset(
final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(inputSegmentSizeBytes, new Period("P0D"), null)),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
@ -374,7 +374,7 @@ public class NewestSegmentFirstPolicyTest
)
);
final CompactionSegmentIterator iterator = policy.reset(
final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), null)),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
@ -395,7 +395,7 @@ public class NewestSegmentFirstPolicyTest
)
);
final CompactionSegmentIterator iterator = policy.reset(
final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), null)),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
@ -412,7 +412,7 @@ public class NewestSegmentFirstPolicyTest
new SegmentGenerateSpec(Intervals.of("2017-10-14T00:00:00/2017-10-15T00:00:00"), new Period("P1D"))
);
final CompactionSegmentIterator iterator = policy.reset(
final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.DAY, null, null))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
@ -445,7 +445,7 @@ public class NewestSegmentFirstPolicyTest
new SegmentGenerateSpec(Intervals.of("2017-10-14T00:00:00/2017-10-15T00:00:00"), new Period("PT5H"))
);
final CompactionSegmentIterator iterator = policy.reset(
final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
@ -471,7 +471,7 @@ public class NewestSegmentFirstPolicyTest
new SegmentGenerateSpec(Intervals.of("2017-10-14T00:00:00/2017-10-15T00:00:00"), new Period("PT5H"))
);
final CompactionSegmentIterator iterator = policy.reset(
final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.MINUTE, null, null))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
@ -496,7 +496,7 @@ public class NewestSegmentFirstPolicyTest
public void testWithSkipIntervals()
{
final Period segmentPeriod = new Period("PT1H");
final CompactionSegmentIterator iterator = policy.reset(
final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P1D"), null)),
ImmutableMap.of(
DATA_SOURCE,
@ -536,7 +536,7 @@ public class NewestSegmentFirstPolicyTest
public void testHoleInSearchInterval()
{
final Period segmentPeriod = new Period("PT1H");
final CompactionSegmentIterator iterator = policy.reset(
final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1H"), null)),
ImmutableMap.of(
DATA_SOURCE,
@ -586,7 +586,7 @@ public class NewestSegmentFirstPolicyTest
new SegmentGenerateSpec(Intervals.of("2017-10-01T00:00:00/2017-12-31T00:00:00"), new Period("P1D"))
);
final CompactionSegmentIterator iterator = policy.reset(
final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
@ -635,7 +635,7 @@ public class NewestSegmentFirstPolicyTest
new SegmentGenerateSpec(Intervals.of("2020-02-08/2020-02-15"), new Period("P7D"))
);
final CompactionSegmentIterator iterator = policy.reset(
final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
@ -670,7 +670,7 @@ public class NewestSegmentFirstPolicyTest
new SegmentGenerateSpec(Intervals.of("2017-12-01T00:00:00/2017-12-02T00:00:00"), new Period("P1D"))
);
final CompactionSegmentIterator iterator = policy.reset(
final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MINUTE, null, null))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
@ -696,7 +696,7 @@ public class NewestSegmentFirstPolicyTest
new SegmentGenerateSpec(Intervals.of("2017-10-01T01:00:00/2017-10-01T02:00:00"), new Period("PT1H"), "1994-04-30T00:00:00.000Z", null)
);
final CompactionSegmentIterator iterator = policy.reset(
final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
@ -721,7 +721,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
// Same partitionsSpec as what is set in the auto compaction config
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
final SegmentTimeline timeline = createTimeline(
@ -740,7 +740,7 @@ public class NewestSegmentFirstPolicyTest
);
// Auto compaction config sets segmentGranularity=DAY
final CompactionSegmentIterator iterator = policy.reset(
final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.DAY, null, null))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
@ -754,7 +754,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
// Same partitionsSpec as what is set in the auto compaction config
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
final SegmentTimeline timeline = createTimeline(
@ -773,7 +773,7 @@ public class NewestSegmentFirstPolicyTest
);
// Auto compaction config sets segmentGranularity=DAY
final CompactionSegmentIterator iterator = policy.reset(
final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.DAY, null, null))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
@ -787,7 +787,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
// Same partitionsSpec as what is set in the auto compaction config
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
final SegmentTimeline timeline = createTimeline(
@ -806,7 +806,7 @@ public class NewestSegmentFirstPolicyTest
);
// Auto compaction config sets segmentGranularity=YEAR
final CompactionSegmentIterator iterator = policy.reset(
final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
@ -830,7 +830,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
// Same partitionsSpec as what is set in the auto compaction config
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
final SegmentTimeline timeline = createTimeline(
@ -849,7 +849,7 @@ public class NewestSegmentFirstPolicyTest
);
// Auto compaction config sets segmentGranularity=YEAR
final CompactionSegmentIterator iterator = policy.reset(
final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
@ -873,7 +873,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
// Same partitionsSpec as what is set in the auto compaction config
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
final SegmentTimeline timeline = createTimeline(
@ -887,7 +887,7 @@ public class NewestSegmentFirstPolicyTest
// Duration of new segmentGranularity is the same as before (P1D),
// but we changed the timezone from UTC to Bangkok in the auto compaction spec
final CompactionSegmentIterator iterator = policy.reset(
final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE,
createCompactionConfig(
130000,
@ -925,7 +925,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
// Same partitionsSpec as what is set in the auto compaction config
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
final SegmentTimeline timeline = createTimeline(
@ -938,7 +938,7 @@ public class NewestSegmentFirstPolicyTest
);
// Duration of new segmentGranularity is the same as before (P1D), but we changed the origin in the autocompaction spec
final CompactionSegmentIterator iterator = policy.reset(
final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE,
createCompactionConfig(
130000,
@ -976,7 +976,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
// Same partitionsSpec as what is set in the auto compaction config
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));
// Create segments that were compacted (CompactionState != null) and have
// rollup=false for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
@ -1004,7 +1004,7 @@ public class NewestSegmentFirstPolicyTest
);
// Auto compaction config sets rollup=true
final CompactionSegmentIterator iterator = policy.reset(
final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(null, null, true))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
@ -1036,7 +1036,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
// Same partitionsSpec as what is set in the auto compaction config
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));
// Create segments that were compacted (CompactionState != null) and have
// queryGranularity=DAY for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
@ -1064,7 +1064,7 @@ public class NewestSegmentFirstPolicyTest
);
// Auto compaction config sets queryGranularity=MINUTE
final CompactionSegmentIterator iterator = policy.reset(
final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(null, Granularities.MINUTE, null))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
@ -1096,7 +1096,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
// Same partitionsSpec as what is set in the auto compaction config
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));
// Create segments that were compacted (CompactionState != null) and have
// Dimensions=["foo", "bar"] for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
@ -1131,7 +1131,7 @@ public class NewestSegmentFirstPolicyTest
);
// Auto compaction config sets Dimensions=["foo"]
CompactionSegmentIterator iterator = policy.reset(
CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
130000,
new Period("P0D"),
@ -1172,7 +1172,7 @@ public class NewestSegmentFirstPolicyTest
Assert.assertFalse(iterator.hasNext());
// Auto compaction config sets Dimensions=null
iterator = policy.reset(
iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
130000,
new Period("P0D"),
@ -1195,7 +1195,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
// Same partitionsSpec as what is set in the auto compaction config
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));
// Create segments that were compacted (CompactionState != null) and have
// filter=SelectorDimFilter("dim1", "foo", null) for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
@ -1251,7 +1251,7 @@ public class NewestSegmentFirstPolicyTest
);
// Auto compaction config sets filter=SelectorDimFilter("dim1", "bar", null)
CompactionSegmentIterator iterator = policy.reset(
CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
130000,
new Period("P0D"),
@ -1292,7 +1292,7 @@ public class NewestSegmentFirstPolicyTest
Assert.assertFalse(iterator.hasNext());
// Auto compaction config sets filter=null
iterator = policy.reset(
iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
130000,
new Period("P0D"),
@ -1319,7 +1319,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
// Same partitionsSpec as what is set in the auto compaction config
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));
// Create segments that were compacted (CompactionState != null) and have
// metricsSpec={CountAggregatorFactory("cnt")} for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
@ -1375,7 +1375,7 @@ public class NewestSegmentFirstPolicyTest
);
// Auto compaction config sets metricsSpec={CountAggregatorFactory("cnt"), LongSumAggregatorFactory("val", "val")}
CompactionSegmentIterator iterator = policy.reset(
CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
130000,
new Period("P0D"),
@ -1416,7 +1416,7 @@ public class NewestSegmentFirstPolicyTest
Assert.assertFalse(iterator.hasNext());
// Auto compaction config sets metricsSpec=null
iterator = policy.reset(
iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
130000,
new Period("P0D"),
@ -1440,7 +1440,7 @@ public class NewestSegmentFirstPolicyTest
new SegmentGenerateSpec(Intervals.of("2017-10-01T01:00:00/2017-10-01T02:00:00"), new Period("PT1H"), "1994-04-30T00:00:00.000Z", null)
);
final CompactionSegmentIterator iterator = policy.reset(
final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
@ -1468,7 +1468,7 @@ public class NewestSegmentFirstPolicyTest
// Different indexSpec as what is set in the auto compaction config
IndexSpec newIndexSpec = IndexSpec.builder().withBitmapSerdeFactory(new ConciseBitmapSerdeFactory()).build();
Map<String, Object> newIndexSpecMap = mapper.convertValue(newIndexSpec, new TypeReference<Map<String, Object>>() {});
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
final SegmentTimeline timeline = createTimeline(
@ -1481,7 +1481,7 @@ public class NewestSegmentFirstPolicyTest
);
// Duration of new segmentGranularity is the same as before (P1D)
final CompactionSegmentIterator iterator = policy.reset(
final CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE,
createCompactionConfig(
130000,
@ -1517,7 +1517,7 @@ public class NewestSegmentFirstPolicyTest
public void testIteratorDoesNotReturnSegmentWithChangingAppendableIndexSpec()
{
NullHandling.initializeForTests();
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));
final SegmentTimeline timeline = createTimeline(
new SegmentGenerateSpec(
Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"),
@ -1534,7 +1534,7 @@ public class NewestSegmentFirstPolicyTest
)
);
CompactionSegmentIterator iterator = policy.reset(
CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
130000,
new Period("P0D"),
@ -1569,7 +1569,7 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertFalse(iterator.hasNext());
iterator = policy.reset(
iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
130000,
new Period("P0D"),
@ -1608,7 +1608,7 @@ public class NewestSegmentFirstPolicyTest
@Test
public void testSkipAllGranularityToDefault()
{
CompactionSegmentIterator iterator = policy.reset(
CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE,
createCompactionConfig(10000,
new Period("P0D"),
@ -1640,7 +1640,7 @@ public class NewestSegmentFirstPolicyTest
@Test
public void testSkipFirstHalfEternityToDefault()
{
CompactionSegmentIterator iterator = policy.reset(
CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE,
createCompactionConfig(10000,
new Period("P0D"),
@ -1672,7 +1672,7 @@ public class NewestSegmentFirstPolicyTest
@Test
public void testSkipSecondHalfOfEternityToDefault()
{
CompactionSegmentIterator iterator = policy.reset(
CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE,
createCompactionConfig(10000,
new Period("P0D"),
@ -1704,7 +1704,7 @@ public class NewestSegmentFirstPolicyTest
@Test
public void testSkipAllToAllGranularity()
{
CompactionSegmentIterator iterator = policy.reset(
CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE,
createCompactionConfig(10000,
new Period("P0D"),
@ -1736,7 +1736,7 @@ public class NewestSegmentFirstPolicyTest
@Test
public void testSkipAllToFinerGranularity()
{
CompactionSegmentIterator iterator = policy.reset(
CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE,
createCompactionConfig(10000,
new Period("P0D"),
@ -1799,7 +1799,7 @@ public class NewestSegmentFirstPolicyTest
0,
1);
CompactionSegmentIterator iterator = policy.reset(
CompactionSegmentIterator iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE,
createCompactionConfig(10000,
new Period("P0D"),
@ -1850,7 +1850,7 @@ public class NewestSegmentFirstPolicyTest
TombstoneShardSpec.INSTANCE,
0,
1);
iterator = policy.reset(
iterator = policy.createIterator(
ImmutableMap.of(DATA_SOURCE,
createCompactionConfig(10000,
new Period("P0D"),