mirror of https://github.com/apache/druid.git
Allow CompactionSegmentIterator to have custom priority (#16737)
Changes:
- Break `NewestSegmentFirstIterator` into two parts:
  - `DatasourceCompactibleSegmentIterator` - contains all the code from `NewestSegmentFirstIterator` but now handles a single datasource and allows a priority to be specified
  - `PriorityBasedCompactionSegmentIterator` - contains a separate iterator for each datasource and combines the results into a single queue to be used by a compaction search policy
- Update `NewestSegmentFirstPolicy` to use the above new classes
- Clean up `CompactionStatistics` and `AutoCompactionSnapshot`
- Clean up `CompactSegments`
- Remove unused methods from `Tasks`
- Remove unneeded `TasksTest`
- Move tests from `NewestSegmentFirstIteratorTest` to `CompactionStatusTest` and `DatasourceCompactibleSegmentIteratorTest`
parent 6cf6838eb9
commit 01d67ae543
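For context, a rough sketch of how the pieces introduced in this commit fit together (condensed from the classes added below, not the exact file contents; the comparator shown is the newest-first ordering that `NewestSegmentFirstPolicy` now passes in, and the surrounding snippet is illustrative only):

```java
// Each datasource gets its own DataSourceCompactibleSegmentIterator; the
// PriorityBasedCompactionSegmentIterator merges their candidates through a
// single PriorityQueue ordered by the supplied comparator.
Comparator<SegmentsToCompact> newestFirst =
    (o1, o2) -> Comparators.intervalsByStartThenEnd()
                           .compare(o2.getUmbrellaInterval(), o1.getUmbrellaInterval());

CompactionSegmentIterator iterator = new PriorityBasedCompactionSegmentIterator(
    compactionConfigs,   // Map<String, DataSourceCompactionConfig>
    dataSources,         // Map<String, SegmentTimeline>
    skipIntervals,       // Map<String, List<Interval>>
    newestFirst,
    objectMapper
);

while (iterator.hasNext()) {
  SegmentsToCompact candidates = iterator.next();
  // submit a compaction task for candidates.getSegments(), as CompactSegments does
}
```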
@@ -141,7 +141,7 @@ public class NewestSegmentFirstPolicyBenchmark
@Benchmark
public void measureNewestSegmentFirstPolicy(Blackhole blackhole)
{
final CompactionSegmentIterator iterator = policy.reset(compactionConfigs, dataSources, Collections.emptyMap());
final CompactionSegmentIterator iterator = policy.createIterator(compactionConfigs, dataSources, Collections.emptyMap());
for (int i = 0; i < numCompactionTaskSlots && iterator.hasNext(); i++) {
blackhole.consume(iterator.next());
}
@@ -21,16 +21,9 @@ package org.apache.druid.indexing.common.task;

import org.apache.curator.shaded.com.google.common.base.Verify;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.duty.CompactSegments;
import org.joda.time.Interval;

import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;

public class Tasks

@@ -63,44 +56,19 @@ public class Tasks
* Context flag denoting if maximum possible values should be used to estimate
* on-heap memory usage while indexing. Refer to OnHeapIncrementalIndex for
* more details.
*
* <p>
* The value of this flag is true by default which corresponds to the old method
* of estimation.
*/
public static final String USE_MAX_MEMORY_ESTIMATES = "useMaxMemoryEstimates";

/**
* This context is used in compaction. When it is set in the context, the segments created by the task
* will fill 'lastCompactionState' in its metadata. This will be used to track what segments are compacted or not.
* See {@link org.apache.druid.timeline.DataSegment} and {@link
* org.apache.druid.server.coordinator.compact.NewestSegmentFirstIterator} for more details.
* Context flag to denote if segments published to metadata by a task should
* have the {@code lastCompactionState} field set.
*/
public static final String STORE_COMPACTION_STATE_KEY = "storeCompactionState";

static {
Verify.verify(STORE_COMPACTION_STATE_KEY.equals(CompactSegments.STORE_COMPACTION_STATE_KEY));
}

public static SortedSet<Interval> computeCondensedIntervals(SortedSet<Interval> intervals)
{
final SortedSet<Interval> condensedIntervals = new TreeSet<>(Comparators.intervalsByStartThenEnd());
List<Interval> toBeAccumulated = new ArrayList<>();
for (Interval interval : intervals) {
if (toBeAccumulated.size() == 0) {
toBeAccumulated.add(interval);
} else {
if (toBeAccumulated.get(toBeAccumulated.size() - 1).abuts(interval)) {
toBeAccumulated.add(interval);
} else {
condensedIntervals.add(JodaUtils.umbrellaInterval(toBeAccumulated));
toBeAccumulated.clear();
toBeAccumulated.add(interval);
}
}
}
if (toBeAccumulated.size() > 0) {
condensedIntervals.add(JodaUtils.umbrellaInterval(toBeAccumulated));
}
return condensedIntervals;
}
}
@@ -1,94 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.common.task;

import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.Comparators;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;

import java.util.Iterator;
import java.util.SortedSet;
import java.util.TreeSet;

public class TasksTest
{

@Test
public void testComputeCondensedIntervals()
{
final SortedSet<Interval> inputIntervals = new TreeSet<>(Comparators.intervalsByStartThenEnd());
for (int m = 1; m < 13; m++) {
for (int d = 1; d < 10; d++) {
inputIntervals.add(getInterval(m, d, m, d + 1));
}

for (int d = 12; d < 20; d++) {
inputIntervals.add(getInterval(m, d, m, d + 1));
}

inputIntervals.add(getInterval(m, 22, m, 23));

for (int d = 25; d < 28; d++) {
inputIntervals.add(getInterval(m, d, m, d + 1));
}

if (m == 1 || m == 3 || m == 5 || m == 7 || m == 8 || m == 10) {
inputIntervals.add(getInterval(m, 31, m + 1, 1));
}
}

inputIntervals.add(Intervals.of("2017-12-31/2018-01-01"));

final SortedSet<Interval> condensedIntervals = Tasks.computeCondensedIntervals(inputIntervals);
final Iterator<Interval> condensedIntervalIterator = condensedIntervals.iterator();
Assert.assertTrue(condensedIntervalIterator.hasNext());

Interval condensedInterval = condensedIntervalIterator.next();
final SortedSet<Interval> checkedIntervals = new TreeSet<>(Comparators.intervalsByStartThenEnd());
for (Interval inputInterval : inputIntervals) {
if (!condensedInterval.contains(inputInterval)) {
if (condensedIntervalIterator.hasNext()) {
condensedInterval = condensedIntervalIterator.next();
Assert.assertTrue(condensedInterval.contains(inputInterval));
}
}
checkedIntervals.add(inputInterval);
}

Assert.assertFalse(condensedIntervalIterator.hasNext());
Assert.assertEquals(inputIntervals, checkedIntervals);
}

private static Interval getInterval(int startMonth, int startDay, int endMonth, int endDay)
{
return Intervals.of(
StringUtils.format(
"2017-%02d-%02d/2017-%02d-%02d",
startMonth,
startDay,
endMonth,
endDay
)
);
}
}
@@ -28,6 +28,7 @@ import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
import org.joda.time.Duration;

@@ -79,6 +80,17 @@ public class ClientCompactionTaskQueryTuningConfig
@Nullable
private final AppendableIndexSpec appendableIndexSpec;

public static ClientCompactionTaskQueryTuningConfig from(
DataSourceCompactionConfig compactionConfig
)
{
if (compactionConfig == null) {
return from(null, null, null);
} else {
return from(compactionConfig.getTuningConfig(), compactionConfig.getMaxRowsPerSegment(), null);
}
}

public static ClientCompactionTaskQueryTuningConfig from(
@Nullable UserCompactionTaskQueryTuningConfig userCompactionTaskQueryTuningConfig,
@Nullable Integer maxRowsPerSegment,
@@ -22,6 +22,7 @@ package org.apache.druid.server.coordinator;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.server.coordinator.compact.CompactionStatistics;

import javax.validation.constraints.NotNull;
import java.util.Objects;

@@ -193,15 +194,9 @@ public class AutoCompactionSnapshot
private final String dataSource;
private final AutoCompactionScheduleStatus scheduleStatus;

private long bytesAwaitingCompaction;
private long bytesCompacted;
private long bytesSkipped;
private long segmentCountAwaitingCompaction;
private long segmentCountCompacted;
private long segmentCountSkipped;
private long intervalCountAwaitingCompaction;
private long intervalCountCompacted;
private long intervalCountSkipped;
private final CompactionStatistics compactedStats = new CompactionStatistics();
private final CompactionStatistics skippedStats = new CompactionStatistics();
private final CompactionStatistics waitingStats = new CompactionStatistics();

private Builder(
@NotNull String dataSource,

@@ -217,69 +212,21 @@ public class AutoCompactionSnapshot

this.dataSource = dataSource;
this.scheduleStatus = scheduleStatus;
this.bytesAwaitingCompaction = 0;
this.bytesCompacted = 0;
this.bytesSkipped = 0;
this.segmentCountAwaitingCompaction = 0;
this.segmentCountCompacted = 0;
this.segmentCountSkipped = 0;
this.intervalCountAwaitingCompaction = 0;
this.intervalCountCompacted = 0;
this.intervalCountSkipped = 0;
}

public Builder incrementBytesAwaitingCompaction(long incrementValue)
public void incrementWaitingStats(CompactionStatistics entry)
{
this.bytesAwaitingCompaction = this.bytesAwaitingCompaction + incrementValue;
return this;
waitingStats.increment(entry);
}

public Builder incrementBytesCompacted(long incrementValue)
public void incrementCompactedStats(CompactionStatistics entry)
{
this.bytesCompacted = this.bytesCompacted + incrementValue;
return this;
compactedStats.increment(entry);
}

public Builder incrementSegmentCountAwaitingCompaction(long incrementValue)
public void incrementSkippedStats(CompactionStatistics entry)
{
this.segmentCountAwaitingCompaction = this.segmentCountAwaitingCompaction + incrementValue;
return this;
}

public Builder incrementSegmentCountCompacted(long incrementValue)
{
this.segmentCountCompacted = this.segmentCountCompacted + incrementValue;
return this;
}

public Builder incrementIntervalCountAwaitingCompaction(long incrementValue)
{
this.intervalCountAwaitingCompaction = this.intervalCountAwaitingCompaction + incrementValue;
return this;
}

public Builder incrementIntervalCountCompacted(long incrementValue)
{
this.intervalCountCompacted = this.intervalCountCompacted + incrementValue;
return this;
}

public Builder incrementBytesSkipped(long incrementValue)
{
this.bytesSkipped = this.bytesSkipped + incrementValue;
return this;
}

public Builder incrementSegmentCountSkipped(long incrementValue)
{
this.segmentCountSkipped = this.segmentCountSkipped + incrementValue;
return this;
}

public Builder incrementIntervalCountSkipped(long incrementValue)
{
this.intervalCountSkipped = this.intervalCountSkipped + incrementValue;
return this;
skippedStats.increment(entry);
}

public AutoCompactionSnapshot build()

@@ -287,15 +234,15 @@ public class AutoCompactionSnapshot
return new AutoCompactionSnapshot(
dataSource,
scheduleStatus,
bytesAwaitingCompaction,
bytesCompacted,
bytesSkipped,
segmentCountAwaitingCompaction,
segmentCountCompacted,
segmentCountSkipped,
intervalCountAwaitingCompaction,
intervalCountCompacted,
intervalCountSkipped
waitingStats.getTotalBytes(),
compactedStats.getTotalBytes(),
skippedStats.getTotalBytes(),
waitingStats.getNumSegments(),
compactedStats.getNumSegments(),
skippedStats.getNumSegments(),
waitingStats.getNumIntervals(),
compactedStats.getNumIntervals(),
skippedStats.getNumIntervals()
);
}
}
@@ -33,9 +33,9 @@ import java.util.Map;
public interface CompactionSegmentSearchPolicy
{
/**
* Reset the current states of this policy. This method should be called whenever iterating starts.
* Creates an iterator that returns compactible segments.
*/
CompactionSegmentIterator reset(
CompactionSegmentIterator createIterator(
Map<String, DataSourceCompactionConfig> compactionConfigs,
Map<String, SegmentTimeline> dataSources,
Map<String, List<Interval>> skipIntervals
@@ -28,9 +28,13 @@ public class CompactionStatistics
private long numSegments;
private long numIntervals;

public static CompactionStatistics create()
public static CompactionStatistics create(long bytes, long numSegments, long numIntervals)
{
return new CompactionStatistics();
final CompactionStatistics stats = new CompactionStatistics();
stats.totalBytes = bytes;
stats.numIntervals = numIntervals;
stats.numSegments = numSegments;
return stats;
}

public long getTotalBytes()

@@ -48,10 +52,10 @@ public class CompactionStatistics
return numIntervals;
}

public void addFrom(SegmentsToCompact segments)
public void increment(CompactionStatistics other)
{
totalBytes += segments.getTotalBytes();
numIntervals += segments.getNumIntervals();
numSegments += segments.size();
totalBytes += other.getTotalBytes();
numIntervals += other.getNumIntervals();
numSegments += other.getNumSegments();
}
}
@@ -167,12 +167,7 @@ public class CompactionStatus
this.objectMapper = objectMapper;
this.lastCompactionState = candidateSegments.getFirst().getLastCompactionState();
this.compactionConfig = compactionConfig;
this.tuningConfig = ClientCompactionTaskQueryTuningConfig.from(
compactionConfig.getTuningConfig(),
compactionConfig.getMaxRowsPerSegment(),
null
);

this.tuningConfig = ClientCompactionTaskQueryTuningConfig.from(compactionConfig);
this.configuredGranularitySpec = compactionConfig.getGranularitySpec();
if (lastCompactionState == null) {
this.existingGranularitySpec = null;
@@ -23,9 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.granularity.Granularity;

@@ -59,44 +57,46 @@ import java.util.Set;
import java.util.stream.Collectors;

/**
* This class iterates all segments of the dataSources configured for compaction from the newest to the oldest.
* Iterator over compactible segments of a datasource in order of specified priority.
*/
public class NewestSegmentFirstIterator implements CompactionSegmentIterator
public class DataSourceCompactibleSegmentIterator implements Iterator<SegmentsToCompact>
{
private static final Logger log = new Logger(NewestSegmentFirstIterator.class);
private static final Logger log = new Logger(DataSourceCompactibleSegmentIterator.class);

private final String dataSource;
private final ObjectMapper objectMapper;
private final Map<String, DataSourceCompactionConfig> compactionConfigs;
private final Map<String, CompactionStatistics> compactedSegmentStats = new HashMap<>();
private final Map<String, CompactionStatistics> skippedSegmentStats = new HashMap<>();

private final Map<String, CompactibleSegmentIterator> timelineIterators;
private final DataSourceCompactionConfig config;
private final CompactionStatistics compactedSegmentStats = new CompactionStatistics();
private final CompactionStatistics skippedSegmentStats = new CompactionStatistics();

// This is needed for datasource that has segmentGranularity configured
// If configured segmentGranularity in config is finer than current segmentGranularity, the same set of segments
// can belong to multiple intervals in the timeline. We keep track of the compacted intervals between each
// run of the compaction job and skip any interval that was already previously compacted.
private final Map<String, Set<Interval>> intervalCompactedForDatasource = new HashMap<>();
private final Set<Interval> compactedIntervals = new HashSet<>();

private final PriorityQueue<SegmentsToCompact> queue = new PriorityQueue<>(
(o1, o2) -> Comparators.intervalsByStartThenEnd().compare(o2.getUmbrellaInterval(), o1.getUmbrellaInterval())
);
private final PriorityQueue<SegmentsToCompact> queue;

NewestSegmentFirstIterator(
ObjectMapper objectMapper,
Map<String, DataSourceCompactionConfig> compactionConfigs,
Map<String, SegmentTimeline> dataSources,
Map<String, List<Interval>> skipIntervals
public DataSourceCompactibleSegmentIterator(
DataSourceCompactionConfig config,
SegmentTimeline timeline,
List<Interval> skipIntervals,
Comparator<SegmentsToCompact> segmentPriority,
ObjectMapper objectMapper
)
{
this.objectMapper = objectMapper;
this.compactionConfigs = compactionConfigs;
this.timelineIterators = Maps.newHashMapWithExpectedSize(dataSources.size());
this.config = config;
this.dataSource = config.getDataSource();
this.queue = new PriorityQueue<>(segmentPriority);
populateQueue(timeline, skipIntervals);
}

dataSources.forEach((dataSource, timeline) -> {
final DataSourceCompactionConfig config = compactionConfigs.get(dataSource);
private void populateQueue(SegmentTimeline timeline, List<Interval> skipIntervals)
{
if (timeline != null) {
Granularity configuredSegmentGranularity = null;
if (config != null && !timeline.isEmpty()) {
if (!timeline.isEmpty()) {
SegmentTimeline originalTimeline = null;
if (config.getGranularitySpec() != null && config.getGranularitySpec().getSegmentGranularity() != null) {
String temporaryVersion = DateTimes.nowUtc().toString();

@@ -154,33 +154,25 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
timeline,
config.getSkipOffsetFromLatest(),
configuredSegmentGranularity,
skipIntervals.get(dataSource)
skipIntervals
);
if (!searchIntervals.isEmpty()) {
timelineIterators.put(
dataSource,
findAndEnqueueSegmentsToCompact(
new CompactibleSegmentIterator(timeline, searchIntervals, originalTimeline)
);
} else {
log.warn("Skipping compaction for datasource[%s] as it has no compactible segments.", dataSource);
}
}
});

compactionConfigs.forEach((dataSourceName, config) -> {
if (config == null) {
throw new ISE("Unknown dataSource[%s]", dataSourceName);
}
updateQueue(dataSourceName, config);
});
}

@Override
public Map<String, CompactionStatistics> totalCompactedStatistics()
public CompactionStatistics totalCompactedStatistics()
{
return compactedSegmentStats;
}

@Override
public Map<String, CompactionStatistics> totalSkippedStatistics()
public CompactionStatistics totalSkippedStatistics()
{
return skippedSegmentStats;
}

@@ -206,25 +198,9 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
final List<DataSegment> resultSegments = entry.getSegments();
Preconditions.checkState(!resultSegments.isEmpty(), "Queue entry must not be empty");

final String dataSource = resultSegments.get(0).getDataSource();
updateQueue(dataSource, compactionConfigs.get(dataSource));

return entry;
}

/**
* Find the next segments to compact for the given dataSource and add them to the queue.
* {@link #timelineIterators} is updated according to the found segments. That is, the found segments are removed from
* the timeline of the given dataSource.
*/
private void updateQueue(String dataSourceName, DataSourceCompactionConfig config)
{
final SegmentsToCompact segmentsToCompact = findSegmentsToCompact(dataSourceName, config);
if (!segmentsToCompact.isEmpty()) {
queue.add(segmentsToCompact);
}
}

/**
* Iterates compactible segments in a {@link SegmentTimeline}.
*/

@@ -315,27 +291,12 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
}

/**
* Finds segments to compact together for the given datasource.
*
* @return An empty {@link SegmentsToCompact} if there are no eligible candidates.
* Finds segments to compact together for the given datasource and adds them to
* the priority queue.
*/
private SegmentsToCompact findSegmentsToCompact(
final String dataSourceName,
final DataSourceCompactionConfig config
)
private void findAndEnqueueSegmentsToCompact(CompactibleSegmentIterator compactibleSegmentIterator)
{
final CompactibleSegmentIterator compactibleSegmentIterator
= timelineIterators.get(dataSourceName);
if (compactibleSegmentIterator == null) {
log.warn(
"Skipping compaction for datasource[%s] as there is no compactible segment in its timeline.",
dataSourceName
);
return SegmentsToCompact.empty();
}

final long inputSegmentSize = config.getInputSegmentSizeBytes();

while (compactibleSegmentIterator.hasNext()) {
List<DataSegment> segments = compactibleSegmentIterator.next();

@@ -352,47 +313,33 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
if (!compactionStatus.isComplete()) {
log.debug(
"Datasource[%s], interval[%s] has [%d] segments that need to be compacted because [%s].",
dataSourceName, interval, candidates.size(), compactionStatus.getReasonToCompact()
dataSource, interval, candidates.size(), compactionStatus.getReasonToCompact()
);
}

if (compactionStatus.isComplete()) {
addSegmentStatsTo(compactedSegmentStats, dataSourceName, candidates);
compactedSegmentStats.increment(candidates.getStats());
} else if (candidates.getTotalBytes() > inputSegmentSize) {
addSegmentStatsTo(skippedSegmentStats, dataSourceName, candidates);
skippedSegmentStats.increment(candidates.getStats());
log.warn(
"Skipping compaction for datasource[%s], interval[%s] as total segment size[%d]"
+ " is larger than allowed inputSegmentSize[%d].",
dataSourceName, interval, candidates.getTotalBytes(), inputSegmentSize
dataSource, interval, candidates.getTotalBytes(), inputSegmentSize
);
} else if (config.getGranularitySpec() != null
&& config.getGranularitySpec().getSegmentGranularity() != null) {
Set<Interval> compactedIntervals = intervalCompactedForDatasource
.computeIfAbsent(dataSourceName, k -> new HashSet<>());

if (compactedIntervals.contains(interval)) {
// Skip these candidate segments as we have already compacted this interval
} else {
compactedIntervals.add(interval);
return candidates;
queue.add(candidates);
}
} else {
return candidates;
queue.add(candidates);
}
}

log.debug("No more segments to compact for datasource[%s].", dataSourceName);
return SegmentsToCompact.empty();
}

private void addSegmentStatsTo(
Map<String, CompactionStatistics> statisticsMap,
String dataSourceName,
SegmentsToCompact segments
)
{
statisticsMap.computeIfAbsent(dataSourceName, v -> CompactionStatistics.create())
.addFrom(segments);
log.debug("No more segments to compact for datasource[%s].", dataSource);
}

/**

@@ -428,7 +375,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
final List<DataSegment> segments = new ArrayList<>(
timeline.findNonOvershadowedObjectsInInterval(skipInterval, Partitions.ONLY_COMPLETE)
);
addSegmentStatsTo(skippedSegmentStats, dataSourceName, SegmentsToCompact.from(segments));
skippedSegmentStats.increment(SegmentsToCompact.from(segments).getStats());
}

final Interval totalInterval = new Interval(first.getInterval().getStart(), last.getInterval().getEnd());
@@ -21,6 +21,7 @@ package org.apache.druid.server.coordinator.compact;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.timeline.SegmentTimeline;
import org.joda.time.Interval;

@@ -29,7 +30,7 @@ import java.util.List;
import java.util.Map;

/**
* This policy searches segments for compaction from the newest one to oldest one.
* This policy searches segments for compaction from newest to oldest.
*/
public class NewestSegmentFirstPolicy implements CompactionSegmentSearchPolicy
{

@@ -42,12 +43,20 @@ public class NewestSegmentFirstPolicy implements CompactionSegmentSearchPolicy
}

@Override
public CompactionSegmentIterator reset(
public CompactionSegmentIterator createIterator(
Map<String, DataSourceCompactionConfig> compactionConfigs,
Map<String, SegmentTimeline> dataSources,
Map<String, List<Interval>> skipIntervals
)
{
return new NewestSegmentFirstIterator(objectMapper, compactionConfigs, dataSources, skipIntervals);
return new PriorityBasedCompactionSegmentIterator(
compactionConfigs,
dataSources,
skipIntervals,
(o1, o2) -> Comparators.intervalsByStartThenEnd()
.compare(o2.getUmbrellaInterval(), o1.getUmbrellaInterval()),
objectMapper
);
}

}
@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.server.coordinator.compact;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.Interval;

import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;

/**
* Implementation of {@link CompactionSegmentIterator} that returns segments in
* order of their priority.
*/
public class PriorityBasedCompactionSegmentIterator implements CompactionSegmentIterator
{
private static final Logger log = new Logger(PriorityBasedCompactionSegmentIterator.class);

private final PriorityQueue<SegmentsToCompact> queue;
private final Map<String, DataSourceCompactibleSegmentIterator> datasourceIterators;

public PriorityBasedCompactionSegmentIterator(
Map<String, DataSourceCompactionConfig> compactionConfigs,
Map<String, SegmentTimeline> datasourceToTimeline,
Map<String, List<Interval>> skipIntervals,
Comparator<SegmentsToCompact> segmentPriority,
ObjectMapper objectMapper
)
{
this.queue = new PriorityQueue<>(segmentPriority);
this.datasourceIterators = Maps.newHashMapWithExpectedSize(datasourceToTimeline.size());
compactionConfigs.forEach((datasource, config) -> {
if (config == null) {
throw DruidException.defensive("Invalid null compaction config for dataSource[%s].", datasource);
}
final SegmentTimeline timeline = datasourceToTimeline.get(datasource);
if (timeline == null) {
log.warn("Skipping compaction for datasource[%s] as it has no timeline.", datasource);
return;
}

datasourceIterators.put(
datasource,
new DataSourceCompactibleSegmentIterator(
compactionConfigs.get(datasource),
timeline,
skipIntervals.getOrDefault(datasource, Collections.emptyList()),
segmentPriority,
objectMapper
)
);
addNextItemForDatasourceToQueue(datasource);
});
}

@Override
public Map<String, CompactionStatistics> totalCompactedStatistics()
{
return CollectionUtils.mapValues(
datasourceIterators,
DataSourceCompactibleSegmentIterator::totalCompactedStatistics
);
}

@Override
public Map<String, CompactionStatistics> totalSkippedStatistics()
{
return CollectionUtils.mapValues(
datasourceIterators,
DataSourceCompactibleSegmentIterator::totalSkippedStatistics
);
}

@Override
public boolean hasNext()
{
return !queue.isEmpty();
}

@Override
public SegmentsToCompact next()
{
if (!hasNext()) {
throw new NoSuchElementException();
}

final SegmentsToCompact entry = queue.poll();
if (entry == null) {
throw new NoSuchElementException();
}
Preconditions.checkState(!entry.isEmpty(), "Queue entry must not be empty");

addNextItemForDatasourceToQueue(entry.getFirst().getDataSource());
return entry;
}

private void addNextItemForDatasourceToQueue(String dataSourceName)
{
final DataSourceCompactibleSegmentIterator iterator = datasourceIterators.get(dataSourceName);
if (iterator.hasNext()) {
final SegmentsToCompact segmentsToCompact = iterator.next();
if (!segmentsToCompact.isEmpty()) {
queue.add(segmentsToCompact);
}
}
}
}
@@ -107,9 +107,9 @@ public class SegmentsToCompact
return umbrellaInterval;
}

public long getNumIntervals()
public CompactionStatistics getStats()
{
return numIntervals;
return CompactionStatistics.create(totalBytes, size(), numIntervals);
}

@Override
@@ -54,7 +54,6 @@ import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.compact.CompactionSegmentIterator;
import org.apache.druid.server.coordinator.compact.CompactionSegmentSearchPolicy;
import org.apache.druid.server.coordinator.compact.CompactionStatistics;
import org.apache.druid.server.coordinator.compact.SegmentsToCompact;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Dimension;

@@ -87,6 +86,7 @@ public class CompactSegments implements CoordinatorCustomDuty

private static final Logger LOG = new Logger(CompactSegments.class);

private static final String TASK_ID_PREFIX = "coordinator-issued";
private static final Predicate<TaskStatusPlus> IS_COMPACTION_TASK =
status -> null != status && COMPACTION_TASK_TYPE.equals(status.getType());

@@ -196,7 +196,7 @@ public class CompactSegments implements CoordinatorCustomDuty
// Get iterator over segments to compact and submit compaction tasks
Map<String, SegmentTimeline> dataSources = params.getUsedSegmentsTimelinesPerDataSource();
final CompactionSegmentIterator iterator =
policy.reset(compactionConfigs, dataSources, intervalsToSkipCompaction);
policy.createIterator(compactionConfigs, dataSources, intervalsToSkipCompaction);

final int compactionTaskCapacity = getCompactionTaskCapacity(dynamicConfig);
final int availableCompactionTaskSlots

@@ -215,7 +215,7 @@ public class CompactSegments implements CoordinatorCustomDuty
stats.add(Stats.Compaction.MAX_SLOTS, compactionTaskCapacity);
stats.add(Stats.Compaction.AVAILABLE_SLOTS, availableCompactionTaskSlots);
stats.add(Stats.Compaction.SUBMITTED_TASKS, numSubmittedCompactionTasks);
addCompactionSnapshotStats(currentRunAutoCompactionSnapshotBuilders, iterator, stats);
updateCompactionSnapshotStats(currentRunAutoCompactionSnapshotBuilders, iterator, stats);

return params;
}

@@ -392,28 +392,19 @@ public class CompactSegments implements CoordinatorCustomDuty

while (iterator.hasNext() && totalTaskSlotsAssigned < numAvailableCompactionTaskSlots) {
final SegmentsToCompact entry = iterator.next();
final List<DataSegment> segmentsToCompact = entry.getSegments();
if (segmentsToCompact.isEmpty()) {
if (entry.isEmpty()) {
throw new ISE("segmentsToCompact is empty?");
}

final String dataSourceName = segmentsToCompact.get(0).getDataSource();
final String dataSourceName = entry.getFirst().getDataSource();

// As these segments will be compacted, we will aggregate the statistic to the Compacted statistics
AutoCompactionSnapshot.Builder snapshotBuilder = currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
dataSourceName,
AutoCompactionSnapshot::builder
);
snapshotBuilder
.incrementBytesCompacted(
segmentsToCompact.stream().mapToLong(DataSegment::getSize).sum()
)
.incrementIntervalCountCompacted(
segmentsToCompact.stream().map(DataSegment::getInterval).distinct().count()
)
.incrementSegmentCountCompacted(segmentsToCompact.size());
currentRunAutoCompactionSnapshotBuilders
.computeIfAbsent(dataSourceName, AutoCompactionSnapshot::builder)
.incrementCompactedStats(entry.getStats());

final DataSourceCompactionConfig config = compactionConfigs.get(dataSourceName);
final List<DataSegment> segmentsToCompact = entry.getSegments();

// Create granularitySpec to send to compaction task
ClientCompactionTaskGranularitySpec granularitySpec;

@@ -514,7 +505,6 @@ public class CompactSegments implements CoordinatorCustomDuty
}

final String taskId = compactSegments(
"coordinator-issued",
segmentsToCompact,
config.getTaskPriority(),
ClientCompactionTaskQueryTuningConfig.from(

@@ -536,7 +526,6 @@ public class CompactSegments implements CoordinatorCustomDuty
taskId, segmentsToCompact.size(), dataSourceName, entry.getUmbrellaInterval()
);
LOG.debugSegments(segmentsToCompact, "Compacting segments");
// Count the compaction task itself + its sub tasks
numSubmittedTasks++;
totalTaskSlotsAssigned += slotsRequiredForCurrentTask;
}

@@ -554,7 +543,7 @@ public class CompactSegments implements CoordinatorCustomDuty
return newContext;
}

private void addCompactionSnapshotStats(
private void updateCompactionSnapshotStats(
Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders,
CompactionSegmentIterator iterator,
CoordinatorRunStats stats

@@ -563,77 +552,45 @@ public class CompactSegments implements CoordinatorCustomDuty
// Mark all the segments remaining in the iterator as "awaiting compaction"
while (iterator.hasNext()) {
final SegmentsToCompact entry = iterator.next();
final List<DataSegment> segmentsToCompact = entry.getSegments();
if (!segmentsToCompact.isEmpty()) {
final String dataSourceName = segmentsToCompact.get(0).getDataSource();
AutoCompactionSnapshot.Builder snapshotBuilder = currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
dataSourceName,
AutoCompactionSnapshot::builder
);
snapshotBuilder
.incrementBytesAwaitingCompaction(
segmentsToCompact.stream().mapToLong(DataSegment::getSize).sum()
)
.incrementIntervalCountAwaitingCompaction(
segmentsToCompact.stream().map(DataSegment::getInterval).distinct().count()
)
.incrementSegmentCountAwaitingCompaction(segmentsToCompact.size());
if (!entry.isEmpty()) {
final String dataSourceName = entry.getFirst().getDataSource();
currentRunAutoCompactionSnapshotBuilders
.computeIfAbsent(dataSourceName, AutoCompactionSnapshot::builder)
.incrementWaitingStats(entry.getStats());
}
}

// Statistics of all segments considered compacted after this run
Map<String, CompactionStatistics> allCompactedStatistics = iterator.totalCompactedStatistics();
for (Map.Entry<String, CompactionStatistics> compactionStatisticsEntry : allCompactedStatistics.entrySet()) {
final String dataSource = compactionStatisticsEntry.getKey();
final CompactionStatistics dataSourceCompactedStatistics = compactionStatisticsEntry.getValue();
AutoCompactionSnapshot.Builder builder = currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
dataSource,
AutoCompactionSnapshot::builder
);
builder.incrementBytesCompacted(dataSourceCompactedStatistics.getTotalBytes());
builder.incrementSegmentCountCompacted(dataSourceCompactedStatistics.getNumSegments());
builder.incrementIntervalCountCompacted(dataSourceCompactedStatistics.getNumIntervals());
}
iterator.totalCompactedStatistics().forEach((dataSource, compactedStats) -> {
currentRunAutoCompactionSnapshotBuilders
.computeIfAbsent(dataSource, AutoCompactionSnapshot::builder)
.incrementCompactedStats(compactedStats);
});

// Statistics of all segments considered skipped after this run
Map<String, CompactionStatistics> allSkippedStatistics = iterator.totalSkippedStatistics();
for (Map.Entry<String, CompactionStatistics> compactionStatisticsEntry : allSkippedStatistics.entrySet()) {
final String dataSource = compactionStatisticsEntry.getKey();
final CompactionStatistics dataSourceSkippedStatistics = compactionStatisticsEntry.getValue();
AutoCompactionSnapshot.Builder builder = currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
dataSource,
AutoCompactionSnapshot::builder
);
builder.incrementBytesSkipped(dataSourceSkippedStatistics.getTotalBytes())
.incrementSegmentCountSkipped(dataSourceSkippedStatistics.getNumSegments())
.incrementIntervalCountSkipped(dataSourceSkippedStatistics.getNumIntervals());
}
iterator.totalSkippedStatistics().forEach((dataSource, dataSourceSkippedStatistics) -> {
currentRunAutoCompactionSnapshotBuilders
.computeIfAbsent(dataSource, AutoCompactionSnapshot::builder)
.incrementSkippedStats(dataSourceSkippedStatistics);
});

final Map<String, AutoCompactionSnapshot> currentAutoCompactionSnapshotPerDataSource = new HashMap<>();
for (Map.Entry<String, AutoCompactionSnapshot.Builder> autoCompactionSnapshotBuilderEntry
: currentRunAutoCompactionSnapshotBuilders.entrySet()) {
final String dataSource = autoCompactionSnapshotBuilderEntry.getKey();
final AutoCompactionSnapshot.Builder builder = autoCompactionSnapshotBuilderEntry.getValue();

// Build the complete snapshot for the datasource
AutoCompactionSnapshot autoCompactionSnapshot = builder.build();
currentRunAutoCompactionSnapshotBuilders.forEach((dataSource, builder) -> {
final AutoCompactionSnapshot autoCompactionSnapshot = builder.build();
currentAutoCompactionSnapshotPerDataSource.put(dataSource, autoCompactionSnapshot);

// Use the complete snapshot to emit metrics
addStatsForDatasource(dataSource, autoCompactionSnapshot, stats);
}
collectSnapshotStats(autoCompactionSnapshot, stats);
});

// Atomic update of autoCompactionSnapshotPerDataSource with the latest from this coordinator run
autoCompactionSnapshotPerDataSource.set(currentAutoCompactionSnapshotPerDataSource);
}

private void addStatsForDatasource(
String dataSource,
private void collectSnapshotStats(
AutoCompactionSnapshot autoCompactionSnapshot,
CoordinatorRunStats stats
)
{
final RowKey rowKey = RowKey.of(Dimension.DATASOURCE, dataSource);
final RowKey rowKey = RowKey.of(Dimension.DATASOURCE, autoCompactionSnapshot.getDataSource());

stats.add(Stats.Compaction.PENDING_BYTES, rowKey, autoCompactionSnapshot.getBytesAwaitingCompaction());
stats.add(Stats.Compaction.PENDING_SEGMENTS, rowKey, autoCompactionSnapshot.getSegmentCountAwaitingCompaction());

@@ -668,7 +625,6 @@ public class CompactSegments implements CoordinatorCustomDuty
}

private String compactSegments(
String idPrefix,
List<DataSegment> segments,
int compactionTaskPriority,
@Nullable ClientCompactionTaskQueryTuningConfig tuningConfig,

@@ -692,7 +648,7 @@ public class CompactSegments implements CoordinatorCustomDuty
context = context == null ? new HashMap<>() : context;
context.put("priority", compactionTaskPriority);

final String taskId = IdUtils.newTaskId(idPrefix, ClientCompactionTaskQuery.TYPE, dataSource, null);
final String taskId = IdUtils.newTaskId(TASK_ID_PREFIX, ClientCompactionTaskQuery.TYPE, dataSource, null);
final Granularity segmentGranularity = granularitySpec == null ? null : granularitySpec.getSegmentGranularity();
final ClientTaskQuery taskPayload = new ClientCompactionTaskQuery(
taskId,
@@ -19,6 +19,7 @@

package org.apache.druid.server.coordinator;

import org.apache.druid.server.coordinator.compact.CompactionStatistics;
import org.junit.Assert;
import org.junit.Test;

@@ -30,17 +31,11 @@ public class AutoCompactionSnapshotTest
final String expectedDataSource = "data";
final AutoCompactionSnapshot.Builder builder = AutoCompactionSnapshot.builder(expectedDataSource);

// Increment every stats twice
// Increment every stat twice
for (int i = 0; i < 2; i++) {
builder.incrementIntervalCountSkipped(13)
.incrementBytesSkipped(13)
.incrementSegmentCountSkipped(13)
.incrementIntervalCountCompacted(13)
.incrementBytesCompacted(13)
.incrementSegmentCountCompacted(13)
.incrementIntervalCountAwaitingCompaction(13)
.incrementBytesAwaitingCompaction(13)
.incrementSegmentCountAwaitingCompaction(13);
builder.incrementSkippedStats(CompactionStatistics.create(13, 13, 13));
builder.incrementWaitingStats(CompactionStatistics.create(13, 13, 13));
builder.incrementCompactedStats(CompactionStatistics.create(13, 13, 13));
}

final AutoCompactionSnapshot actual = builder.build();
@@ -0,0 +1,178 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.server.coordinator.compact;

import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
import org.junit.Assert;
import org.junit.Test;

import java.util.Collections;

public class CompactionStatusTest
{
private static final String DS_WIKI = "wiki";

@Test
public void testFindPartitionsSpecWhenGivenIsNull()
{
final ClientCompactionTaskQueryTuningConfig tuningConfig
= ClientCompactionTaskQueryTuningConfig.from(null);
Assert.assertEquals(
new DynamicPartitionsSpec(null, Long.MAX_VALUE),
CompactionStatus.findPartitionsSpecFromConfig(tuningConfig)
);
}

@Test
public void testFindPartitionsSpecWhenGivenIsDynamicWithNullMaxTotalRows()
{
final PartitionsSpec partitionsSpec = new DynamicPartitionsSpec(null, null);
final ClientCompactionTaskQueryTuningConfig tuningConfig
= ClientCompactionTaskQueryTuningConfig.from(createCompactionConfig(partitionsSpec));
Assert.assertEquals(
new DynamicPartitionsSpec(null, Long.MAX_VALUE),
CompactionStatus.findPartitionsSpecFromConfig(tuningConfig)
);
}

@Test
public void testFindPartitionsSpecWhenGivenIsDynamicWithMaxTotalRows()
{
final PartitionsSpec partitionsSpec = new DynamicPartitionsSpec(null, 1000L);
final ClientCompactionTaskQueryTuningConfig tuningConfig
= ClientCompactionTaskQueryTuningConfig.from(createCompactionConfig(partitionsSpec));
Assert.assertEquals(
partitionsSpec,
CompactionStatus.findPartitionsSpecFromConfig(tuningConfig)
);
}

@Test
public void testFindPartitionsSpecWhenGivenIsDynamicWithMaxRowsPerSegment()
{
final PartitionsSpec partitionsSpec = new DynamicPartitionsSpec(100, 1000L);
final ClientCompactionTaskQueryTuningConfig tuningConfig
= ClientCompactionTaskQueryTuningConfig.from(createCompactionConfig(partitionsSpec));
Assert.assertEquals(
partitionsSpec,
CompactionStatus.findPartitionsSpecFromConfig(tuningConfig)
);
}

@Test
public void testFindPartitionsSpecFromConfigWithDeprecatedMaxRowsPerSegmentAndMaxTotalRowsReturnGivenValues()
{
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
"datasource",
null,
null,
100,
null,
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
1000L,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
),
null,
null,
null,
null,
null,
null,
null
);
Assert.assertEquals(
new DynamicPartitionsSpec(100, 1000L),
CompactionStatus.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config)
)
);
}

@Test
public void testFindPartitionsSpecWhenGivenIsHashed()
{
final PartitionsSpec partitionsSpec =
new HashedPartitionsSpec(null, 100, Collections.singletonList("dim"));
final ClientCompactionTaskQueryTuningConfig tuningConfig
= ClientCompactionTaskQueryTuningConfig.from(createCompactionConfig(partitionsSpec));
Assert.assertEquals(
partitionsSpec,
CompactionStatus.findPartitionsSpecFromConfig(tuningConfig)
);
}

@Test
public void testFindPartitionsSpecWhenGivenIsRange()
{
final PartitionsSpec partitionsSpec =
new DimensionRangePartitionsSpec(null, 10000, Collections.singletonList("dim"), false);
final ClientCompactionTaskQueryTuningConfig tuningConfig
= ClientCompactionTaskQueryTuningConfig.from(createCompactionConfig(partitionsSpec));
Assert.assertEquals(
partitionsSpec,
CompactionStatus.findPartitionsSpecFromConfig(tuningConfig)
);
}

private static DataSourceCompactionConfig createCompactionConfig(
PartitionsSpec partitionsSpec
)
{
return new DataSourceCompactionConfig(
DS_WIKI,
null, null, null, null, createTuningConfig(partitionsSpec),
null, null, null, null, null, null, null
);
}

private static UserCompactionTaskQueryTuningConfig createTuningConfig(
PartitionsSpec partitionsSpec
)
{
return new UserCompactionTaskQueryTuningConfig(
null,
null, null, null, null, partitionsSpec, null, null, null,
null, null, null, null, null, null, null, null, null, null
);
}
}
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.server.coordinator.compact;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Test;

import java.util.List;

public class DataSourceCompactibleSegmentIteratorTest
{
@Test
public void testFilterSkipIntervals()
{
final Interval totalInterval = Intervals.of("2018-01-01/2019-01-01");
final List<Interval> expectedSkipIntervals = ImmutableList.of(
Intervals.of("2018-01-15/2018-03-02"),
Intervals.of("2018-07-23/2018-10-01"),
Intervals.of("2018-10-02/2018-12-25"),
Intervals.of("2018-12-31/2019-01-01")
);
final List<Interval> skipIntervals = DataSourceCompactibleSegmentIterator.filterSkipIntervals(
totalInterval,
Lists.newArrayList(
Intervals.of("2017-12-01/2018-01-15"),
Intervals.of("2018-03-02/2018-07-23"),
Intervals.of("2018-10-01/2018-10-02"),
Intervals.of("2018-12-25/2018-12-31")
)
);

Assert.assertEquals(expectedSkipIntervals, skipIntervals);
}

@Test
public void testAddSkipIntervalFromLatestAndSort()
{
final List<Interval> expectedIntervals = ImmutableList.of(
Intervals.of("2018-12-24/2018-12-25"),
Intervals.of("2018-12-29/2019-01-01")
);
final List<Interval> fullSkipIntervals = DataSourceCompactibleSegmentIterator.sortAndAddSkipIntervalFromLatest(
DateTimes.of("2019-01-01"),
new Period(72, 0, 0, 0),
null,
ImmutableList.of(
Intervals.of("2018-12-30/2018-12-31"),
Intervals.of("2018-12-24/2018-12-25")
)
);

Assert.assertEquals(expectedIntervals, fullSkipIntervals);
}
}
@@ -1,477 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.server.coordinator.compact;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Test;

import java.util.List;

public class NewestSegmentFirstIteratorTest
{
  @Test
  public void testFilterSkipIntervals()
  {
    final Interval totalInterval = Intervals.of("2018-01-01/2019-01-01");
    final List<Interval> expectedSkipIntervals = ImmutableList.of(
        Intervals.of("2018-01-15/2018-03-02"),
        Intervals.of("2018-07-23/2018-10-01"),
        Intervals.of("2018-10-02/2018-12-25"),
        Intervals.of("2018-12-31/2019-01-01")
    );
    final List<Interval> skipIntervals = NewestSegmentFirstIterator.filterSkipIntervals(
        totalInterval,
        Lists.newArrayList(
            Intervals.of("2017-12-01/2018-01-15"),
            Intervals.of("2018-03-02/2018-07-23"),
            Intervals.of("2018-10-01/2018-10-02"),
            Intervals.of("2018-12-25/2018-12-31")
        )
    );

    Assert.assertEquals(expectedSkipIntervals, skipIntervals);
  }

  @Test
  public void testAddSkipIntervalFromLatestAndSort()
  {
    final List<Interval> expectedIntervals = ImmutableList.of(
        Intervals.of("2018-12-24/2018-12-25"),
        Intervals.of("2018-12-29/2019-01-01")
    );
    final List<Interval> fullSkipIntervals = NewestSegmentFirstIterator.sortAndAddSkipIntervalFromLatest(
        DateTimes.of("2019-01-01"),
        new Period(72, 0, 0, 0),
        null,
        ImmutableList.of(
            Intervals.of("2018-12-30/2018-12-31"),
            Intervals.of("2018-12-24/2018-12-25")
        )
    );

    Assert.assertEquals(expectedIntervals, fullSkipIntervals);
  }

  @Test
  public void testFindPartitionsSpecFromConfigWithNullTuningConfigReturnDynamicPartitinosSpecWithMaxTotalRowsOfLongMax()
  {
    final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
        "datasource",
        null, null, null, null, null, null, null, null, null, null, null, null
    );
    Assert.assertEquals(
        new DynamicPartitionsSpec(null, Long.MAX_VALUE),
        CompactionStatus.findPartitionsSpecFromConfig(
            ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
        )
    );
  }

  @Test
  public void testFindPartitionsSpecFromConfigWithNullMaxTotalRowsReturnLongMaxValue()
  {
    final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
        "datasource",
        null, null, null, null,
        new UserCompactionTaskQueryTuningConfig(
            null, null, null, null, null,
            new DynamicPartitionsSpec(null, null),
            null, null, null, null, null, null, null, null, null, null, null, null, null
        ),
        null, null, null, null, null, null, null
    );
    Assert.assertEquals(
        new DynamicPartitionsSpec(null, Long.MAX_VALUE),
        CompactionStatus.findPartitionsSpecFromConfig(
            ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
        )
    );
  }

  @Test
  public void testFindPartitionsSpecFromConfigWithNonNullMaxTotalRowsReturnGivenValue()
  {
    final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
        "datasource",
        null, null, null, null,
        new UserCompactionTaskQueryTuningConfig(
            null, null, null, null, null,
            new DynamicPartitionsSpec(null, 1000L),
            null, null, null, null, null, null, null, null, null, null, null, null, null
        ),
        null, null, null, null, null, null, null
    );
    Assert.assertEquals(
        new DynamicPartitionsSpec(null, 1000L),
        CompactionStatus.findPartitionsSpecFromConfig(
            ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
        )
    );
  }

  @Test
  public void testFindPartitionsSpecFromConfigWithNonNullMaxRowsPerSegmentReturnGivenValue()
  {
    final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
        "datasource",
        null, null, null, null,
        new UserCompactionTaskQueryTuningConfig(
            null, null, null, null, null,
            new DynamicPartitionsSpec(100, 1000L),
            null, null, null, null, null, null, null, null, null, null, null, null, null
        ),
        null, null, null, null, null, null, null
    );
    Assert.assertEquals(
        new DynamicPartitionsSpec(100, 1000L),
        CompactionStatus.findPartitionsSpecFromConfig(
            ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
        )
    );
  }

  @Test
  public void testFindPartitionsSpecFromConfigWithDeprecatedMaxRowsPerSegmentAndMaxTotalRowsReturnGivenValues()
  {
    final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
        "datasource",
        null, null, 100, null,
        new UserCompactionTaskQueryTuningConfig(
            null, null, null, 1000L, null, null,
            null, null, null, null, null, null, null, null, null, null, null, null, null
        ),
        null, null, null, null, null, null, null
    );
    Assert.assertEquals(
        new DynamicPartitionsSpec(100, 1000L),
        CompactionStatus.findPartitionsSpecFromConfig(
            ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
        )
    );
  }

  @Test
  public void testFindPartitionsSpecFromConfigWithDeprecatedMaxRowsPerSegmentAndPartitionsSpecIgnoreDeprecatedOne()
  {
    final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
        "datasource",
        null, null, 100, null,
        new UserCompactionTaskQueryTuningConfig(
            null, null, null, null, null,
            new DynamicPartitionsSpec(null, null),
            null, null, null, null, null, null, null, null, null, null, null, null, null
        ),
        null, null, null, null, null, null, null
    );
    Assert.assertEquals(
        new DynamicPartitionsSpec(null, Long.MAX_VALUE),
        CompactionStatus.findPartitionsSpecFromConfig(
            ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
        )
    );
  }

  @Test
  public void testFindPartitionsSpecFromConfigWithDeprecatedMaxTotalRowsAndPartitionsSpecIgnoreDeprecatedOne()
  {
    final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
        "datasource",
        null, null, null, null,
        new UserCompactionTaskQueryTuningConfig(
            null, null, null, 1000L, null,
            new DynamicPartitionsSpec(null, null),
            null, null, null, null, null, null, null, null, null, null, null, null, null
        ),
        null, null, null, null, null, null, null
    );
    Assert.assertEquals(
        new DynamicPartitionsSpec(null, Long.MAX_VALUE),
        CompactionStatus.findPartitionsSpecFromConfig(
            ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
        )
    );
  }

  @Test
  public void testFindPartitionsSpecFromConfigWithHashPartitionsSpec()
  {
    final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
        "datasource",
        null, null, null, null,
        new UserCompactionTaskQueryTuningConfig(
            null, null, null, null, null,
            new HashedPartitionsSpec(null, 10, ImmutableList.of("dim")),
            null, null, null, null, null, null, null, null, null, null, null, null, null
        ),
        null, null, null, null, null, null, null
    );
    Assert.assertEquals(
        new HashedPartitionsSpec(null, 10, ImmutableList.of("dim")),
        CompactionStatus.findPartitionsSpecFromConfig(
            ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
        )
    );
  }

  @Test
  public void testFindPartitionsSpecFromConfigWithRangePartitionsSpec()
  {
    final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
        "datasource",
        null, null, null, null,
        new UserCompactionTaskQueryTuningConfig(
            null, null, null, null, null,
            new SingleDimensionPartitionsSpec(10000, null, "dim", false),
            null, null, null, null, null, null, null, null, null, null, null, null, null
        ),
        null, null, null, null, null, null, null
    );
    Assert.assertEquals(
        new SingleDimensionPartitionsSpec(10000, null, "dim", false),
        CompactionStatus.findPartitionsSpecFromConfig(
            ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
        )
    );
  }
}
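The deleted testFindPartitionsSpecFromConfig* tests above also document how a dynamic partitions spec is derived from an auto-compaction config: an explicit partitionsSpec in the tuning config wins over the deprecated maxRowsPerSegment and maxTotalRows fields, the deprecated fields are used only when no spec is given, and a missing maxTotalRows is treated as Long.MAX_VALUE. The sketch below restates that fallback order with a plain record as a stand-in; the names DynamicSpecResolutionSketch, DynamicSpec and resolve are hypothetical, and this is not the CompactionStatus implementation.

// Hypothetical sketch (not Druid code): the fallback order asserted by the deleted
// tests above, using a plain record instead of the real DynamicPartitionsSpec and
// tuning config classes.
public final class DynamicSpecResolutionSketch
{
  // Stand-in for DynamicPartitionsSpec(maxRowsPerSegment, maxTotalRows).
  record DynamicSpec(Integer maxRowsPerSegment, Long maxTotalRows) { }

  static DynamicSpec resolve(DynamicSpec explicitSpec, Integer deprecatedMaxRows, Long deprecatedMaxTotal)
  {
    // An explicit spec wins; otherwise fall back to the deprecated fields.
    Integer rows = explicitSpec != null ? explicitSpec.maxRowsPerSegment() : deprecatedMaxRows;
    Long total = explicitSpec != null ? explicitSpec.maxTotalRows() : deprecatedMaxTotal;
    // A missing maxTotalRows is interpreted as "no limit", i.e. Long.MAX_VALUE.
    return new DynamicSpec(rows, total == null ? Long.MAX_VALUE : total);
  }

  public static void main(String[] args)
  {
    // No tuning config at all -> (null, Long.MAX_VALUE)
    System.out.println(resolve(null, null, null));
    // An explicit (empty) spec is present -> the deprecated maxRowsPerSegment=100 is ignored
    System.out.println(resolve(new DynamicSpec(null, null), 100, null));
    // Only the deprecated fields are set -> they are used as given
    System.out.println(resolve(null, 100, 1000L));
  }
}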
@@ -88,7 +88,7 @@ public class NewestSegmentFirstPolicyTest
  public void testLargeOffsetAndSmallSegmentInterval()
  {
    final Period segmentPeriod = new Period("PT1H");
    final CompactionSegmentIterator iterator = policy.reset(
    final CompactionSegmentIterator iterator = policy.createIterator(
        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P2D"), null)),
        ImmutableMap.of(
            DATA_SOURCE,
@@ -113,7 +113,7 @@ public class NewestSegmentFirstPolicyTest
  public void testSmallOffsetAndLargeSegmentInterval()
  {
    final Period segmentPeriod = new Period("PT1H");
    final CompactionSegmentIterator iterator = policy.reset(
    final CompactionSegmentIterator iterator = policy.createIterator(
        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1M"), null)),
        ImmutableMap.of(
            DATA_SOURCE,
@@ -146,7 +146,7 @@ public class NewestSegmentFirstPolicyTest
  public void testLargeGapInData()
  {
    final Period segmentPeriod = new Period("PT1H");
    final CompactionSegmentIterator iterator = policy.reset(
    final CompactionSegmentIterator iterator = policy.createIterator(
        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1H1M"), null)),
        ImmutableMap.of(
            DATA_SOURCE,
@@ -179,7 +179,7 @@ public class NewestSegmentFirstPolicyTest
  @Test
  public void testHugeShard()
  {
    final CompactionSegmentIterator iterator = policy.reset(
    final CompactionSegmentIterator iterator = policy.createIterator(
        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P1D"), null)),
        ImmutableMap.of(
            DATA_SOURCE,
@@ -229,7 +229,7 @@ public class NewestSegmentFirstPolicyTest
  @Test
  public void testManySegmentsPerShard()
  {
    final CompactionSegmentIterator iterator = policy.reset(
    final CompactionSegmentIterator iterator = policy.createIterator(
        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(800000, new Period("P1D"), null)),
        ImmutableMap.of(
            DATA_SOURCE,
@@ -287,7 +287,7 @@ public class NewestSegmentFirstPolicyTest
  {
    final String unknownDataSource = "unknown";
    final Period segmentPeriod = new Period("PT1H");
    final CompactionSegmentIterator iterator = policy.reset(
    final CompactionSegmentIterator iterator = policy.createIterator(
        ImmutableMap.of(
            unknownDataSource,
            createCompactionConfig(10000, new Period("P2D"), null),
@@ -337,7 +337,7 @@ public class NewestSegmentFirstPolicyTest
            2
        )
    );
    final CompactionSegmentIterator iterator = policy.reset(
    final CompactionSegmentIterator iterator = policy.createIterator(
        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(inputSegmentSizeBytes, new Period("P0D"), null)),
        ImmutableMap.of(DATA_SOURCE, timeline),
        Collections.emptyMap()
@@ -374,7 +374,7 @@ public class NewestSegmentFirstPolicyTest
        )
    );

    final CompactionSegmentIterator iterator = policy.reset(
    final CompactionSegmentIterator iterator = policy.createIterator(
        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), null)),
        ImmutableMap.of(DATA_SOURCE, timeline),
        Collections.emptyMap()
@@ -395,7 +395,7 @@ public class NewestSegmentFirstPolicyTest
        )
    );

    final CompactionSegmentIterator iterator = policy.reset(
    final CompactionSegmentIterator iterator = policy.createIterator(
        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), null)),
        ImmutableMap.of(DATA_SOURCE, timeline),
        Collections.emptyMap()
@@ -412,7 +412,7 @@ public class NewestSegmentFirstPolicyTest
        new SegmentGenerateSpec(Intervals.of("2017-10-14T00:00:00/2017-10-15T00:00:00"), new Period("P1D"))
    );

    final CompactionSegmentIterator iterator = policy.reset(
    final CompactionSegmentIterator iterator = policy.createIterator(
        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.DAY, null, null))),
        ImmutableMap.of(DATA_SOURCE, timeline),
        Collections.emptyMap()
@@ -445,7 +445,7 @@ public class NewestSegmentFirstPolicyTest
        new SegmentGenerateSpec(Intervals.of("2017-10-14T00:00:00/2017-10-15T00:00:00"), new Period("PT5H"))
    );

    final CompactionSegmentIterator iterator = policy.reset(
    final CompactionSegmentIterator iterator = policy.createIterator(
        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null))),
        ImmutableMap.of(DATA_SOURCE, timeline),
        Collections.emptyMap()
@@ -471,7 +471,7 @@ public class NewestSegmentFirstPolicyTest
        new SegmentGenerateSpec(Intervals.of("2017-10-14T00:00:00/2017-10-15T00:00:00"), new Period("PT5H"))
    );

    final CompactionSegmentIterator iterator = policy.reset(
    final CompactionSegmentIterator iterator = policy.createIterator(
        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.MINUTE, null, null))),
        ImmutableMap.of(DATA_SOURCE, timeline),
        Collections.emptyMap()
@@ -496,7 +496,7 @@ public class NewestSegmentFirstPolicyTest
  public void testWithSkipIntervals()
  {
    final Period segmentPeriod = new Period("PT1H");
    final CompactionSegmentIterator iterator = policy.reset(
    final CompactionSegmentIterator iterator = policy.createIterator(
        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P1D"), null)),
        ImmutableMap.of(
            DATA_SOURCE,
@@ -536,7 +536,7 @@ public class NewestSegmentFirstPolicyTest
  public void testHoleInSearchInterval()
  {
    final Period segmentPeriod = new Period("PT1H");
    final CompactionSegmentIterator iterator = policy.reset(
    final CompactionSegmentIterator iterator = policy.createIterator(
        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1H"), null)),
        ImmutableMap.of(
            DATA_SOURCE,
@@ -586,7 +586,7 @@ public class NewestSegmentFirstPolicyTest
        new SegmentGenerateSpec(Intervals.of("2017-10-01T00:00:00/2017-12-31T00:00:00"), new Period("P1D"))
    );

    final CompactionSegmentIterator iterator = policy.reset(
    final CompactionSegmentIterator iterator = policy.createIterator(
        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null))),
        ImmutableMap.of(DATA_SOURCE, timeline),
        Collections.emptyMap()
@@ -635,7 +635,7 @@ public class NewestSegmentFirstPolicyTest
        new SegmentGenerateSpec(Intervals.of("2020-02-08/2020-02-15"), new Period("P7D"))
    );

    final CompactionSegmentIterator iterator = policy.reset(
    final CompactionSegmentIterator iterator = policy.createIterator(
        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null))),
        ImmutableMap.of(DATA_SOURCE, timeline),
        Collections.emptyMap()
@@ -670,7 +670,7 @@ public class NewestSegmentFirstPolicyTest
        new SegmentGenerateSpec(Intervals.of("2017-12-01T00:00:00/2017-12-02T00:00:00"), new Period("P1D"))
    );

    final CompactionSegmentIterator iterator = policy.reset(
    final CompactionSegmentIterator iterator = policy.createIterator(
        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MINUTE, null, null))),
        ImmutableMap.of(DATA_SOURCE, timeline),
        Collections.emptyMap()
@@ -696,7 +696,7 @@ public class NewestSegmentFirstPolicyTest
        new SegmentGenerateSpec(Intervals.of("2017-10-01T01:00:00/2017-10-01T02:00:00"), new Period("PT1H"), "1994-04-30T00:00:00.000Z", null)
    );

    final CompactionSegmentIterator iterator = policy.reset(
    final CompactionSegmentIterator iterator = policy.createIterator(
        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null))),
        ImmutableMap.of(DATA_SOURCE, timeline),
        Collections.emptyMap()
@@ -721,7 +721,7 @@ public class NewestSegmentFirstPolicyTest
    // Same indexSpec as what is set in the auto compaction config
    Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
    // Same partitionsSpec as what is set in the auto compaction config
    PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
    PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));

    // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
    final SegmentTimeline timeline = createTimeline(
@@ -740,7 +740,7 @@ public class NewestSegmentFirstPolicyTest
    );

    // Auto compaction config sets segmentGranularity=DAY
    final CompactionSegmentIterator iterator = policy.reset(
    final CompactionSegmentIterator iterator = policy.createIterator(
        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.DAY, null, null))),
        ImmutableMap.of(DATA_SOURCE, timeline),
        Collections.emptyMap()
@@ -754,7 +754,7 @@ public class NewestSegmentFirstPolicyTest
    // Same indexSpec as what is set in the auto compaction config
    Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
    // Same partitionsSpec as what is set in the auto compaction config
    PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
    PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));

    // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
    final SegmentTimeline timeline = createTimeline(
@@ -773,7 +773,7 @@ public class NewestSegmentFirstPolicyTest
    );

    // Auto compaction config sets segmentGranularity=DAY
    final CompactionSegmentIterator iterator = policy.reset(
    final CompactionSegmentIterator iterator = policy.createIterator(
        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.DAY, null, null))),
        ImmutableMap.of(DATA_SOURCE, timeline),
        Collections.emptyMap()
@@ -787,7 +787,7 @@ public class NewestSegmentFirstPolicyTest
    // Same indexSpec as what is set in the auto compaction config
    Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
    // Same partitionsSpec as what is set in the auto compaction config
    PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
    PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));

    // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
    final SegmentTimeline timeline = createTimeline(
@@ -806,7 +806,7 @@ public class NewestSegmentFirstPolicyTest
    );

    // Auto compaction config sets segmentGranularity=YEAR
    final CompactionSegmentIterator iterator = policy.reset(
    final CompactionSegmentIterator iterator = policy.createIterator(
        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null))),
        ImmutableMap.of(DATA_SOURCE, timeline),
        Collections.emptyMap()
@@ -830,7 +830,7 @@ public class NewestSegmentFirstPolicyTest
    // Same indexSpec as what is set in the auto compaction config
    Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
    // Same partitionsSpec as what is set in the auto compaction config
    PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
    PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));

    // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
    final SegmentTimeline timeline = createTimeline(
@@ -849,7 +849,7 @@ public class NewestSegmentFirstPolicyTest
    );

    // Auto compaction config sets segmentGranularity=YEAR
    final CompactionSegmentIterator iterator = policy.reset(
    final CompactionSegmentIterator iterator = policy.createIterator(
        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null))),
        ImmutableMap.of(DATA_SOURCE, timeline),
        Collections.emptyMap()
@@ -873,7 +873,7 @@ public class NewestSegmentFirstPolicyTest
    // Same indexSpec as what is set in the auto compaction config
    Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
    // Same partitionsSpec as what is set in the auto compaction config
    PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
    PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));

    // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
    final SegmentTimeline timeline = createTimeline(
@@ -887,7 +887,7 @@ public class NewestSegmentFirstPolicyTest

    // Duration of new segmentGranularity is the same as before (P1D),
    // but we changed the timezone from UTC to Bangkok in the auto compaction spec
    final CompactionSegmentIterator iterator = policy.reset(
    final CompactionSegmentIterator iterator = policy.createIterator(
        ImmutableMap.of(DATA_SOURCE,
            createCompactionConfig(
                130000,
@@ -925,7 +925,7 @@ public class NewestSegmentFirstPolicyTest
    // Same indexSpec as what is set in the auto compaction config
    Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
    // Same partitionsSpec as what is set in the auto compaction config
    PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
    PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));

    // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
    final SegmentTimeline timeline = createTimeline(
@@ -938,7 +938,7 @@ public class NewestSegmentFirstPolicyTest
    );

    // Duration of new segmentGranularity is the same as before (P1D), but we changed the origin in the autocompaction spec
    final CompactionSegmentIterator iterator = policy.reset(
    final CompactionSegmentIterator iterator = policy.createIterator(
        ImmutableMap.of(DATA_SOURCE,
            createCompactionConfig(
                130000,
@@ -976,7 +976,7 @@ public class NewestSegmentFirstPolicyTest
    // Same indexSpec as what is set in the auto compaction config
    Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
    // Same partitionsSpec as what is set in the auto compaction config
    PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
    PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));

    // Create segments that were compacted (CompactionState != null) and have
    // rollup=false for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
@@ -1004,7 +1004,7 @@ public class NewestSegmentFirstPolicyTest
    );

    // Auto compaction config sets rollup=true
    final CompactionSegmentIterator iterator = policy.reset(
    final CompactionSegmentIterator iterator = policy.createIterator(
        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(null, null, true))),
        ImmutableMap.of(DATA_SOURCE, timeline),
        Collections.emptyMap()
@@ -1036,7 +1036,7 @@ public class NewestSegmentFirstPolicyTest
    // Same indexSpec as what is set in the auto compaction config
    Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
    // Same partitionsSpec as what is set in the auto compaction config
    PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
    PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));

    // Create segments that were compacted (CompactionState != null) and have
    // queryGranularity=DAY for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
@@ -1064,7 +1064,7 @@ public class NewestSegmentFirstPolicyTest
    );

    // Auto compaction config sets queryGranularity=MINUTE
    final CompactionSegmentIterator iterator = policy.reset(
    final CompactionSegmentIterator iterator = policy.createIterator(
        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(null, Granularities.MINUTE, null))),
        ImmutableMap.of(DATA_SOURCE, timeline),
        Collections.emptyMap()
@@ -1096,7 +1096,7 @@ public class NewestSegmentFirstPolicyTest
    // Same indexSpec as what is set in the auto compaction config
    Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
    // Same partitionsSpec as what is set in the auto compaction config
    PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
    PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));

    // Create segments that were compacted (CompactionState != null) and have
    // Dimensions=["foo", "bar"] for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
@@ -1131,7 +1131,7 @@ public class NewestSegmentFirstPolicyTest
    );

    // Auto compaction config sets Dimensions=["foo"]
    CompactionSegmentIterator iterator = policy.reset(
    CompactionSegmentIterator iterator = policy.createIterator(
        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
            130000,
            new Period("P0D"),
@@ -1172,7 +1172,7 @@ public class NewestSegmentFirstPolicyTest
    Assert.assertFalse(iterator.hasNext());

    // Auto compaction config sets Dimensions=null
    iterator = policy.reset(
    iterator = policy.createIterator(
        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
            130000,
            new Period("P0D"),
@@ -1195,7 +1195,7 @@ public class NewestSegmentFirstPolicyTest
    // Same indexSpec as what is set in the auto compaction config
    Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
    // Same partitionsSpec as what is set in the auto compaction config
    PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
    PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));

    // Create segments that were compacted (CompactionState != null) and have
    // filter=SelectorDimFilter("dim1", "foo", null) for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
@@ -1251,7 +1251,7 @@ public class NewestSegmentFirstPolicyTest
    );

    // Auto compaction config sets filter=SelectorDimFilter("dim1", "bar", null)
    CompactionSegmentIterator iterator = policy.reset(
    CompactionSegmentIterator iterator = policy.createIterator(
        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
            130000,
            new Period("P0D"),
@@ -1292,7 +1292,7 @@ public class NewestSegmentFirstPolicyTest
    Assert.assertFalse(iterator.hasNext());

    // Auto compaction config sets filter=null
    iterator = policy.reset(
    iterator = policy.createIterator(
        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
            130000,
            new Period("P0D"),
@@ -1319,7 +1319,7 @@ public class NewestSegmentFirstPolicyTest
    // Same indexSpec as what is set in the auto compaction config
    Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
    // Same partitionsSpec as what is set in the auto compaction config
    PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
    PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));

    // Create segments that were compacted (CompactionState != null) and have
    // metricsSpec={CountAggregatorFactory("cnt")} for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
@@ -1375,7 +1375,7 @@ public class NewestSegmentFirstPolicyTest
    );

    // Auto compaction config sets metricsSpec={CountAggregatorFactory("cnt"), LongSumAggregatorFactory("val", "val")}
    CompactionSegmentIterator iterator = policy.reset(
    CompactionSegmentIterator iterator = policy.createIterator(
        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
            130000,
            new Period("P0D"),
@@ -1416,7 +1416,7 @@ public class NewestSegmentFirstPolicyTest
    Assert.assertFalse(iterator.hasNext());

    // Auto compaction config sets metricsSpec=null
    iterator = policy.reset(
    iterator = policy.createIterator(
        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
            130000,
            new Period("P0D"),
@@ -1440,7 +1440,7 @@ public class NewestSegmentFirstPolicyTest
        new SegmentGenerateSpec(Intervals.of("2017-10-01T01:00:00/2017-10-01T02:00:00"), new Period("PT1H"), "1994-04-30T00:00:00.000Z", null)
    );

    final CompactionSegmentIterator iterator = policy.reset(
    final CompactionSegmentIterator iterator = policy.createIterator(
        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null))),
        ImmutableMap.of(DATA_SOURCE, timeline),
        Collections.emptyMap()
@@ -1468,7 +1468,7 @@ public class NewestSegmentFirstPolicyTest
    // Different indexSpec as what is set in the auto compaction config
    IndexSpec newIndexSpec = IndexSpec.builder().withBitmapSerdeFactory(new ConciseBitmapSerdeFactory()).build();
    Map<String, Object> newIndexSpecMap = mapper.convertValue(newIndexSpec, new TypeReference<Map<String, Object>>() {});
    PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
    PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));

    // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
    final SegmentTimeline timeline = createTimeline(
@@ -1481,7 +1481,7 @@ public class NewestSegmentFirstPolicyTest
    );

    // Duration of new segmentGranularity is the same as before (P1D)
    final CompactionSegmentIterator iterator = policy.reset(
    final CompactionSegmentIterator iterator = policy.createIterator(
        ImmutableMap.of(DATA_SOURCE,
            createCompactionConfig(
                130000,
@@ -1517,7 +1517,7 @@ public class NewestSegmentFirstPolicyTest
  public void testIteratorDoesNotReturnSegmentWithChangingAppendableIndexSpec()
  {
    NullHandling.initializeForTests();
    PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
    PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));
    final SegmentTimeline timeline = createTimeline(
        new SegmentGenerateSpec(
            Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"),
@@ -1534,7 +1534,7 @@ public class NewestSegmentFirstPolicyTest
        )
    );

    CompactionSegmentIterator iterator = policy.reset(
    CompactionSegmentIterator iterator = policy.createIterator(
        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
            130000,
            new Period("P0D"),
@@ -1569,7 +1569,7 @@ public class NewestSegmentFirstPolicyTest
    );
    Assert.assertFalse(iterator.hasNext());

    iterator = policy.reset(
    iterator = policy.createIterator(
        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
            130000,
            new Period("P0D"),
@@ -1608,7 +1608,7 @@ public class NewestSegmentFirstPolicyTest
  @Test
  public void testSkipAllGranularityToDefault()
  {
    CompactionSegmentIterator iterator = policy.reset(
    CompactionSegmentIterator iterator = policy.createIterator(
        ImmutableMap.of(DATA_SOURCE,
            createCompactionConfig(10000,
                new Period("P0D"),
@@ -1640,7 +1640,7 @@ public class NewestSegmentFirstPolicyTest
  @Test
  public void testSkipFirstHalfEternityToDefault()
  {
    CompactionSegmentIterator iterator = policy.reset(
    CompactionSegmentIterator iterator = policy.createIterator(
        ImmutableMap.of(DATA_SOURCE,
            createCompactionConfig(10000,
                new Period("P0D"),
@@ -1672,7 +1672,7 @@ public class NewestSegmentFirstPolicyTest
  @Test
  public void testSkipSecondHalfOfEternityToDefault()
  {
    CompactionSegmentIterator iterator = policy.reset(
    CompactionSegmentIterator iterator = policy.createIterator(
        ImmutableMap.of(DATA_SOURCE,
            createCompactionConfig(10000,
                new Period("P0D"),
@@ -1704,7 +1704,7 @@ public class NewestSegmentFirstPolicyTest
  @Test
  public void testSkipAllToAllGranularity()
  {
    CompactionSegmentIterator iterator = policy.reset(
    CompactionSegmentIterator iterator = policy.createIterator(
        ImmutableMap.of(DATA_SOURCE,
            createCompactionConfig(10000,
                new Period("P0D"),
@@ -1736,7 +1736,7 @@ public class NewestSegmentFirstPolicyTest
  @Test
  public void testSkipAllToFinerGranularity()
  {
    CompactionSegmentIterator iterator = policy.reset(
    CompactionSegmentIterator iterator = policy.createIterator(
        ImmutableMap.of(DATA_SOURCE,
            createCompactionConfig(10000,
                new Period("P0D"),
@@ -1799,7 +1799,7 @@ public class NewestSegmentFirstPolicyTest
        0,
        1);

    CompactionSegmentIterator iterator = policy.reset(
    CompactionSegmentIterator iterator = policy.createIterator(
        ImmutableMap.of(DATA_SOURCE,
            createCompactionConfig(10000,
                new Period("P0D"),
@@ -1850,7 +1850,7 @@ public class NewestSegmentFirstPolicyTest
        TombstoneShardSpec.INSTANCE,
        0,
        1);
    iterator = policy.reset(
    iterator = policy.createIterator(
        ImmutableMap.of(DATA_SOURCE,
            createCompactionConfig(10000,
                new Period("P0D"),