mirror of https://github.com/apache/druid.git
Allow CompactionSegmentIterator to have custom priority (#16737)
Changes: - Break `NewestSegmentFirstIterator` into two parts - `DatasourceCompactibleSegmentIterator` - this contains all the code from `NewestSegmentFirstIterator` but now handles a single datasource and allows a priority to be specified - `PriorityBasedCompactionSegmentIterator` - contains separate iterator for each datasource and combines the results into a single queue to be used by a compaction search policy - Update `NewestSegmentFirstPolicy` to use the above new classes - Cleanup `CompactionStatistics` and `AutoCompactionSnapshot` - Cleanup `CompactSegments` - Remove unused methods from `Tasks` - Remove unneeded `TasksTest` - Move tests from `NewestSegmentFirstIteratorTest` to `CompactionStatusTest` and `DatasourceCompactibleSegmentIteratorTest`
This commit is contained in:
parent
6cf6838eb9
commit
01d67ae543
|
@ -141,7 +141,7 @@ public class NewestSegmentFirstPolicyBenchmark
|
||||||
@Benchmark
|
@Benchmark
|
||||||
public void measureNewestSegmentFirstPolicy(Blackhole blackhole)
|
public void measureNewestSegmentFirstPolicy(Blackhole blackhole)
|
||||||
{
|
{
|
||||||
final CompactionSegmentIterator iterator = policy.reset(compactionConfigs, dataSources, Collections.emptyMap());
|
final CompactionSegmentIterator iterator = policy.createIterator(compactionConfigs, dataSources, Collections.emptyMap());
|
||||||
for (int i = 0; i < numCompactionTaskSlots && iterator.hasNext(); i++) {
|
for (int i = 0; i < numCompactionTaskSlots && iterator.hasNext(); i++) {
|
||||||
blackhole.consume(iterator.next());
|
blackhole.consume(iterator.next());
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,16 +21,9 @@ package org.apache.druid.indexing.common.task;
|
||||||
|
|
||||||
import org.apache.curator.shaded.com.google.common.base.Verify;
|
import org.apache.curator.shaded.com.google.common.base.Verify;
|
||||||
import org.apache.druid.indexing.common.TaskLockType;
|
import org.apache.druid.indexing.common.TaskLockType;
|
||||||
import org.apache.druid.java.util.common.JodaUtils;
|
|
||||||
import org.apache.druid.java.util.common.guava.Comparators;
|
|
||||||
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
|
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
|
||||||
import org.apache.druid.server.coordinator.duty.CompactSegments;
|
import org.apache.druid.server.coordinator.duty.CompactSegments;
|
||||||
import org.joda.time.Interval;
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.SortedSet;
|
|
||||||
import java.util.TreeSet;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
public class Tasks
|
public class Tasks
|
||||||
|
@ -63,44 +56,19 @@ public class Tasks
|
||||||
* Context flag denoting if maximum possible values should be used to estimate
|
* Context flag denoting if maximum possible values should be used to estimate
|
||||||
* on-heap memory usage while indexing. Refer to OnHeapIncrementalIndex for
|
* on-heap memory usage while indexing. Refer to OnHeapIncrementalIndex for
|
||||||
* more details.
|
* more details.
|
||||||
*
|
* <p>
|
||||||
* The value of this flag is true by default which corresponds to the old method
|
* The value of this flag is true by default which corresponds to the old method
|
||||||
* of estimation.
|
* of estimation.
|
||||||
*/
|
*/
|
||||||
public static final String USE_MAX_MEMORY_ESTIMATES = "useMaxMemoryEstimates";
|
public static final String USE_MAX_MEMORY_ESTIMATES = "useMaxMemoryEstimates";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This context is used in compaction. When it is set in the context, the segments created by the task
|
* Context flag to denote if segments published to metadata by a task should
|
||||||
* will fill 'lastCompactionState' in its metadata. This will be used to track what segments are compacted or not.
|
* have the {@code lastCompactionState} field set.
|
||||||
* See {@link org.apache.druid.timeline.DataSegment} and {@link
|
|
||||||
* org.apache.druid.server.coordinator.compact.NewestSegmentFirstIterator} for more details.
|
|
||||||
*/
|
*/
|
||||||
public static final String STORE_COMPACTION_STATE_KEY = "storeCompactionState";
|
public static final String STORE_COMPACTION_STATE_KEY = "storeCompactionState";
|
||||||
|
|
||||||
static {
|
static {
|
||||||
Verify.verify(STORE_COMPACTION_STATE_KEY.equals(CompactSegments.STORE_COMPACTION_STATE_KEY));
|
Verify.verify(STORE_COMPACTION_STATE_KEY.equals(CompactSegments.STORE_COMPACTION_STATE_KEY));
|
||||||
}
|
}
|
||||||
|
|
||||||
public static SortedSet<Interval> computeCondensedIntervals(SortedSet<Interval> intervals)
|
|
||||||
{
|
|
||||||
final SortedSet<Interval> condensedIntervals = new TreeSet<>(Comparators.intervalsByStartThenEnd());
|
|
||||||
List<Interval> toBeAccumulated = new ArrayList<>();
|
|
||||||
for (Interval interval : intervals) {
|
|
||||||
if (toBeAccumulated.size() == 0) {
|
|
||||||
toBeAccumulated.add(interval);
|
|
||||||
} else {
|
|
||||||
if (toBeAccumulated.get(toBeAccumulated.size() - 1).abuts(interval)) {
|
|
||||||
toBeAccumulated.add(interval);
|
|
||||||
} else {
|
|
||||||
condensedIntervals.add(JodaUtils.umbrellaInterval(toBeAccumulated));
|
|
||||||
toBeAccumulated.clear();
|
|
||||||
toBeAccumulated.add(interval);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (toBeAccumulated.size() > 0) {
|
|
||||||
condensedIntervals.add(JodaUtils.umbrellaInterval(toBeAccumulated));
|
|
||||||
}
|
|
||||||
return condensedIntervals;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,94 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing,
|
|
||||||
* software distributed under the License is distributed on an
|
|
||||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
||||||
* KIND, either express or implied. See the License for the
|
|
||||||
* specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.druid.indexing.common.task;
|
|
||||||
|
|
||||||
import org.apache.druid.java.util.common.Intervals;
|
|
||||||
import org.apache.druid.java.util.common.StringUtils;
|
|
||||||
import org.apache.druid.java.util.common.guava.Comparators;
|
|
||||||
import org.joda.time.Interval;
|
|
||||||
import org.junit.Assert;
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.SortedSet;
|
|
||||||
import java.util.TreeSet;
|
|
||||||
|
|
||||||
public class TasksTest
|
|
||||||
{
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testComputeCondensedIntervals()
|
|
||||||
{
|
|
||||||
final SortedSet<Interval> inputIntervals = new TreeSet<>(Comparators.intervalsByStartThenEnd());
|
|
||||||
for (int m = 1; m < 13; m++) {
|
|
||||||
for (int d = 1; d < 10; d++) {
|
|
||||||
inputIntervals.add(getInterval(m, d, m, d + 1));
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int d = 12; d < 20; d++) {
|
|
||||||
inputIntervals.add(getInterval(m, d, m, d + 1));
|
|
||||||
}
|
|
||||||
|
|
||||||
inputIntervals.add(getInterval(m, 22, m, 23));
|
|
||||||
|
|
||||||
for (int d = 25; d < 28; d++) {
|
|
||||||
inputIntervals.add(getInterval(m, d, m, d + 1));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (m == 1 || m == 3 || m == 5 || m == 7 || m == 8 || m == 10) {
|
|
||||||
inputIntervals.add(getInterval(m, 31, m + 1, 1));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
inputIntervals.add(Intervals.of("2017-12-31/2018-01-01"));
|
|
||||||
|
|
||||||
final SortedSet<Interval> condensedIntervals = Tasks.computeCondensedIntervals(inputIntervals);
|
|
||||||
final Iterator<Interval> condensedIntervalIterator = condensedIntervals.iterator();
|
|
||||||
Assert.assertTrue(condensedIntervalIterator.hasNext());
|
|
||||||
|
|
||||||
Interval condensedInterval = condensedIntervalIterator.next();
|
|
||||||
final SortedSet<Interval> checkedIntervals = new TreeSet<>(Comparators.intervalsByStartThenEnd());
|
|
||||||
for (Interval inputInterval : inputIntervals) {
|
|
||||||
if (!condensedInterval.contains(inputInterval)) {
|
|
||||||
if (condensedIntervalIterator.hasNext()) {
|
|
||||||
condensedInterval = condensedIntervalIterator.next();
|
|
||||||
Assert.assertTrue(condensedInterval.contains(inputInterval));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
checkedIntervals.add(inputInterval);
|
|
||||||
}
|
|
||||||
|
|
||||||
Assert.assertFalse(condensedIntervalIterator.hasNext());
|
|
||||||
Assert.assertEquals(inputIntervals, checkedIntervals);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static Interval getInterval(int startMonth, int startDay, int endMonth, int endDay)
|
|
||||||
{
|
|
||||||
return Intervals.of(
|
|
||||||
StringUtils.format(
|
|
||||||
"2017-%02d-%02d/2017-%02d-%02d",
|
|
||||||
startMonth,
|
|
||||||
startDay,
|
|
||||||
endMonth,
|
|
||||||
endDay
|
|
||||||
)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -28,6 +28,7 @@ import org.apache.druid.segment.IndexSpec;
|
||||||
import org.apache.druid.segment.incremental.AppendableIndexSpec;
|
import org.apache.druid.segment.incremental.AppendableIndexSpec;
|
||||||
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
|
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
|
||||||
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
|
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
|
||||||
|
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
|
||||||
import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
|
import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
|
||||||
import org.joda.time.Duration;
|
import org.joda.time.Duration;
|
||||||
|
|
||||||
|
@ -79,6 +80,17 @@ public class ClientCompactionTaskQueryTuningConfig
|
||||||
@Nullable
|
@Nullable
|
||||||
private final AppendableIndexSpec appendableIndexSpec;
|
private final AppendableIndexSpec appendableIndexSpec;
|
||||||
|
|
||||||
|
public static ClientCompactionTaskQueryTuningConfig from(
|
||||||
|
DataSourceCompactionConfig compactionConfig
|
||||||
|
)
|
||||||
|
{
|
||||||
|
if (compactionConfig == null) {
|
||||||
|
return from(null, null, null);
|
||||||
|
} else {
|
||||||
|
return from(compactionConfig.getTuningConfig(), compactionConfig.getMaxRowsPerSegment(), null);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public static ClientCompactionTaskQueryTuningConfig from(
|
public static ClientCompactionTaskQueryTuningConfig from(
|
||||||
@Nullable UserCompactionTaskQueryTuningConfig userCompactionTaskQueryTuningConfig,
|
@Nullable UserCompactionTaskQueryTuningConfig userCompactionTaskQueryTuningConfig,
|
||||||
@Nullable Integer maxRowsPerSegment,
|
@Nullable Integer maxRowsPerSegment,
|
||||||
|
|
|
@ -22,6 +22,7 @@ package org.apache.druid.server.coordinator;
|
||||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
import org.apache.druid.java.util.common.ISE;
|
import org.apache.druid.java.util.common.ISE;
|
||||||
|
import org.apache.druid.server.coordinator.compact.CompactionStatistics;
|
||||||
|
|
||||||
import javax.validation.constraints.NotNull;
|
import javax.validation.constraints.NotNull;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
@ -193,15 +194,9 @@ public class AutoCompactionSnapshot
|
||||||
private final String dataSource;
|
private final String dataSource;
|
||||||
private final AutoCompactionScheduleStatus scheduleStatus;
|
private final AutoCompactionScheduleStatus scheduleStatus;
|
||||||
|
|
||||||
private long bytesAwaitingCompaction;
|
private final CompactionStatistics compactedStats = new CompactionStatistics();
|
||||||
private long bytesCompacted;
|
private final CompactionStatistics skippedStats = new CompactionStatistics();
|
||||||
private long bytesSkipped;
|
private final CompactionStatistics waitingStats = new CompactionStatistics();
|
||||||
private long segmentCountAwaitingCompaction;
|
|
||||||
private long segmentCountCompacted;
|
|
||||||
private long segmentCountSkipped;
|
|
||||||
private long intervalCountAwaitingCompaction;
|
|
||||||
private long intervalCountCompacted;
|
|
||||||
private long intervalCountSkipped;
|
|
||||||
|
|
||||||
private Builder(
|
private Builder(
|
||||||
@NotNull String dataSource,
|
@NotNull String dataSource,
|
||||||
|
@ -217,69 +212,21 @@ public class AutoCompactionSnapshot
|
||||||
|
|
||||||
this.dataSource = dataSource;
|
this.dataSource = dataSource;
|
||||||
this.scheduleStatus = scheduleStatus;
|
this.scheduleStatus = scheduleStatus;
|
||||||
this.bytesAwaitingCompaction = 0;
|
|
||||||
this.bytesCompacted = 0;
|
|
||||||
this.bytesSkipped = 0;
|
|
||||||
this.segmentCountAwaitingCompaction = 0;
|
|
||||||
this.segmentCountCompacted = 0;
|
|
||||||
this.segmentCountSkipped = 0;
|
|
||||||
this.intervalCountAwaitingCompaction = 0;
|
|
||||||
this.intervalCountCompacted = 0;
|
|
||||||
this.intervalCountSkipped = 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public Builder incrementBytesAwaitingCompaction(long incrementValue)
|
public void incrementWaitingStats(CompactionStatistics entry)
|
||||||
{
|
{
|
||||||
this.bytesAwaitingCompaction = this.bytesAwaitingCompaction + incrementValue;
|
waitingStats.increment(entry);
|
||||||
return this;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public Builder incrementBytesCompacted(long incrementValue)
|
public void incrementCompactedStats(CompactionStatistics entry)
|
||||||
{
|
{
|
||||||
this.bytesCompacted = this.bytesCompacted + incrementValue;
|
compactedStats.increment(entry);
|
||||||
return this;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public Builder incrementSegmentCountAwaitingCompaction(long incrementValue)
|
public void incrementSkippedStats(CompactionStatistics entry)
|
||||||
{
|
{
|
||||||
this.segmentCountAwaitingCompaction = this.segmentCountAwaitingCompaction + incrementValue;
|
skippedStats.increment(entry);
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Builder incrementSegmentCountCompacted(long incrementValue)
|
|
||||||
{
|
|
||||||
this.segmentCountCompacted = this.segmentCountCompacted + incrementValue;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Builder incrementIntervalCountAwaitingCompaction(long incrementValue)
|
|
||||||
{
|
|
||||||
this.intervalCountAwaitingCompaction = this.intervalCountAwaitingCompaction + incrementValue;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Builder incrementIntervalCountCompacted(long incrementValue)
|
|
||||||
{
|
|
||||||
this.intervalCountCompacted = this.intervalCountCompacted + incrementValue;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Builder incrementBytesSkipped(long incrementValue)
|
|
||||||
{
|
|
||||||
this.bytesSkipped = this.bytesSkipped + incrementValue;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Builder incrementSegmentCountSkipped(long incrementValue)
|
|
||||||
{
|
|
||||||
this.segmentCountSkipped = this.segmentCountSkipped + incrementValue;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Builder incrementIntervalCountSkipped(long incrementValue)
|
|
||||||
{
|
|
||||||
this.intervalCountSkipped = this.intervalCountSkipped + incrementValue;
|
|
||||||
return this;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public AutoCompactionSnapshot build()
|
public AutoCompactionSnapshot build()
|
||||||
|
@ -287,15 +234,15 @@ public class AutoCompactionSnapshot
|
||||||
return new AutoCompactionSnapshot(
|
return new AutoCompactionSnapshot(
|
||||||
dataSource,
|
dataSource,
|
||||||
scheduleStatus,
|
scheduleStatus,
|
||||||
bytesAwaitingCompaction,
|
waitingStats.getTotalBytes(),
|
||||||
bytesCompacted,
|
compactedStats.getTotalBytes(),
|
||||||
bytesSkipped,
|
skippedStats.getTotalBytes(),
|
||||||
segmentCountAwaitingCompaction,
|
waitingStats.getNumSegments(),
|
||||||
segmentCountCompacted,
|
compactedStats.getNumSegments(),
|
||||||
segmentCountSkipped,
|
skippedStats.getNumSegments(),
|
||||||
intervalCountAwaitingCompaction,
|
waitingStats.getNumIntervals(),
|
||||||
intervalCountCompacted,
|
compactedStats.getNumIntervals(),
|
||||||
intervalCountSkipped
|
skippedStats.getNumIntervals()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,9 +33,9 @@ import java.util.Map;
|
||||||
public interface CompactionSegmentSearchPolicy
|
public interface CompactionSegmentSearchPolicy
|
||||||
{
|
{
|
||||||
/**
|
/**
|
||||||
* Reset the current states of this policy. This method should be called whenever iterating starts.
|
* Creates an iterator that returns compactible segments.
|
||||||
*/
|
*/
|
||||||
CompactionSegmentIterator reset(
|
CompactionSegmentIterator createIterator(
|
||||||
Map<String, DataSourceCompactionConfig> compactionConfigs,
|
Map<String, DataSourceCompactionConfig> compactionConfigs,
|
||||||
Map<String, SegmentTimeline> dataSources,
|
Map<String, SegmentTimeline> dataSources,
|
||||||
Map<String, List<Interval>> skipIntervals
|
Map<String, List<Interval>> skipIntervals
|
||||||
|
|
|
@ -28,9 +28,13 @@ public class CompactionStatistics
|
||||||
private long numSegments;
|
private long numSegments;
|
||||||
private long numIntervals;
|
private long numIntervals;
|
||||||
|
|
||||||
public static CompactionStatistics create()
|
public static CompactionStatistics create(long bytes, long numSegments, long numIntervals)
|
||||||
{
|
{
|
||||||
return new CompactionStatistics();
|
final CompactionStatistics stats = new CompactionStatistics();
|
||||||
|
stats.totalBytes = bytes;
|
||||||
|
stats.numIntervals = numIntervals;
|
||||||
|
stats.numSegments = numSegments;
|
||||||
|
return stats;
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getTotalBytes()
|
public long getTotalBytes()
|
||||||
|
@ -48,10 +52,10 @@ public class CompactionStatistics
|
||||||
return numIntervals;
|
return numIntervals;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addFrom(SegmentsToCompact segments)
|
public void increment(CompactionStatistics other)
|
||||||
{
|
{
|
||||||
totalBytes += segments.getTotalBytes();
|
totalBytes += other.getTotalBytes();
|
||||||
numIntervals += segments.getNumIntervals();
|
numIntervals += other.getNumIntervals();
|
||||||
numSegments += segments.size();
|
numSegments += other.getNumSegments();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -167,12 +167,7 @@ public class CompactionStatus
|
||||||
this.objectMapper = objectMapper;
|
this.objectMapper = objectMapper;
|
||||||
this.lastCompactionState = candidateSegments.getFirst().getLastCompactionState();
|
this.lastCompactionState = candidateSegments.getFirst().getLastCompactionState();
|
||||||
this.compactionConfig = compactionConfig;
|
this.compactionConfig = compactionConfig;
|
||||||
this.tuningConfig = ClientCompactionTaskQueryTuningConfig.from(
|
this.tuningConfig = ClientCompactionTaskQueryTuningConfig.from(compactionConfig);
|
||||||
compactionConfig.getTuningConfig(),
|
|
||||||
compactionConfig.getMaxRowsPerSegment(),
|
|
||||||
null
|
|
||||||
);
|
|
||||||
|
|
||||||
this.configuredGranularitySpec = compactionConfig.getGranularitySpec();
|
this.configuredGranularitySpec = compactionConfig.getGranularitySpec();
|
||||||
if (lastCompactionState == null) {
|
if (lastCompactionState == null) {
|
||||||
this.existingGranularitySpec = null;
|
this.existingGranularitySpec = null;
|
||||||
|
|
|
@ -23,9 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.collect.Maps;
|
|
||||||
import org.apache.druid.java.util.common.DateTimes;
|
import org.apache.druid.java.util.common.DateTimes;
|
||||||
import org.apache.druid.java.util.common.ISE;
|
|
||||||
import org.apache.druid.java.util.common.Intervals;
|
import org.apache.druid.java.util.common.Intervals;
|
||||||
import org.apache.druid.java.util.common.JodaUtils;
|
import org.apache.druid.java.util.common.JodaUtils;
|
||||||
import org.apache.druid.java.util.common.granularity.Granularity;
|
import org.apache.druid.java.util.common.granularity.Granularity;
|
||||||
|
@ -59,44 +57,46 @@ import java.util.Set;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class iterates all segments of the dataSources configured for compaction from the newest to the oldest.
|
* Iterator over compactible segments of a datasource in order of specified priority.
|
||||||
*/
|
*/
|
||||||
public class NewestSegmentFirstIterator implements CompactionSegmentIterator
|
public class DataSourceCompactibleSegmentIterator implements Iterator<SegmentsToCompact>
|
||||||
{
|
{
|
||||||
private static final Logger log = new Logger(NewestSegmentFirstIterator.class);
|
private static final Logger log = new Logger(DataSourceCompactibleSegmentIterator.class);
|
||||||
|
|
||||||
|
private final String dataSource;
|
||||||
private final ObjectMapper objectMapper;
|
private final ObjectMapper objectMapper;
|
||||||
private final Map<String, DataSourceCompactionConfig> compactionConfigs;
|
private final DataSourceCompactionConfig config;
|
||||||
private final Map<String, CompactionStatistics> compactedSegmentStats = new HashMap<>();
|
private final CompactionStatistics compactedSegmentStats = new CompactionStatistics();
|
||||||
private final Map<String, CompactionStatistics> skippedSegmentStats = new HashMap<>();
|
private final CompactionStatistics skippedSegmentStats = new CompactionStatistics();
|
||||||
|
|
||||||
private final Map<String, CompactibleSegmentIterator> timelineIterators;
|
|
||||||
|
|
||||||
// This is needed for datasource that has segmentGranularity configured
|
// This is needed for datasource that has segmentGranularity configured
|
||||||
// If configured segmentGranularity in config is finer than current segmentGranularity, the same set of segments
|
// If configured segmentGranularity in config is finer than current segmentGranularity, the same set of segments
|
||||||
// can belong to multiple intervals in the timeline. We keep track of the compacted intervals between each
|
// can belong to multiple intervals in the timeline. We keep track of the compacted intervals between each
|
||||||
// run of the compaction job and skip any interval that was already previously compacted.
|
// run of the compaction job and skip any interval that was already previously compacted.
|
||||||
private final Map<String, Set<Interval>> intervalCompactedForDatasource = new HashMap<>();
|
private final Set<Interval> compactedIntervals = new HashSet<>();
|
||||||
|
|
||||||
private final PriorityQueue<SegmentsToCompact> queue = new PriorityQueue<>(
|
private final PriorityQueue<SegmentsToCompact> queue;
|
||||||
(o1, o2) -> Comparators.intervalsByStartThenEnd().compare(o2.getUmbrellaInterval(), o1.getUmbrellaInterval())
|
|
||||||
);
|
|
||||||
|
|
||||||
NewestSegmentFirstIterator(
|
public DataSourceCompactibleSegmentIterator(
|
||||||
ObjectMapper objectMapper,
|
DataSourceCompactionConfig config,
|
||||||
Map<String, DataSourceCompactionConfig> compactionConfigs,
|
SegmentTimeline timeline,
|
||||||
Map<String, SegmentTimeline> dataSources,
|
List<Interval> skipIntervals,
|
||||||
Map<String, List<Interval>> skipIntervals
|
Comparator<SegmentsToCompact> segmentPriority,
|
||||||
|
ObjectMapper objectMapper
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
this.objectMapper = objectMapper;
|
this.objectMapper = objectMapper;
|
||||||
this.compactionConfigs = compactionConfigs;
|
this.config = config;
|
||||||
this.timelineIterators = Maps.newHashMapWithExpectedSize(dataSources.size());
|
this.dataSource = config.getDataSource();
|
||||||
|
this.queue = new PriorityQueue<>(segmentPriority);
|
||||||
|
populateQueue(timeline, skipIntervals);
|
||||||
|
}
|
||||||
|
|
||||||
dataSources.forEach((dataSource, timeline) -> {
|
private void populateQueue(SegmentTimeline timeline, List<Interval> skipIntervals)
|
||||||
final DataSourceCompactionConfig config = compactionConfigs.get(dataSource);
|
{
|
||||||
|
if (timeline != null) {
|
||||||
Granularity configuredSegmentGranularity = null;
|
Granularity configuredSegmentGranularity = null;
|
||||||
if (config != null && !timeline.isEmpty()) {
|
if (!timeline.isEmpty()) {
|
||||||
SegmentTimeline originalTimeline = null;
|
SegmentTimeline originalTimeline = null;
|
||||||
if (config.getGranularitySpec() != null && config.getGranularitySpec().getSegmentGranularity() != null) {
|
if (config.getGranularitySpec() != null && config.getGranularitySpec().getSegmentGranularity() != null) {
|
||||||
String temporaryVersion = DateTimes.nowUtc().toString();
|
String temporaryVersion = DateTimes.nowUtc().toString();
|
||||||
|
@ -154,33 +154,25 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
|
||||||
timeline,
|
timeline,
|
||||||
config.getSkipOffsetFromLatest(),
|
config.getSkipOffsetFromLatest(),
|
||||||
configuredSegmentGranularity,
|
configuredSegmentGranularity,
|
||||||
skipIntervals.get(dataSource)
|
skipIntervals
|
||||||
);
|
);
|
||||||
if (!searchIntervals.isEmpty()) {
|
if (!searchIntervals.isEmpty()) {
|
||||||
timelineIterators.put(
|
findAndEnqueueSegmentsToCompact(
|
||||||
dataSource,
|
|
||||||
new CompactibleSegmentIterator(timeline, searchIntervals, originalTimeline)
|
new CompactibleSegmentIterator(timeline, searchIntervals, originalTimeline)
|
||||||
);
|
);
|
||||||
|
} else {
|
||||||
|
log.warn("Skipping compaction for datasource[%s] as it has no compactible segments.", dataSource);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
|
||||||
|
|
||||||
compactionConfigs.forEach((dataSourceName, config) -> {
|
|
||||||
if (config == null) {
|
|
||||||
throw new ISE("Unknown dataSource[%s]", dataSourceName);
|
|
||||||
}
|
}
|
||||||
updateQueue(dataSourceName, config);
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
public CompactionStatistics totalCompactedStatistics()
|
||||||
public Map<String, CompactionStatistics> totalCompactedStatistics()
|
|
||||||
{
|
{
|
||||||
return compactedSegmentStats;
|
return compactedSegmentStats;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
public CompactionStatistics totalSkippedStatistics()
|
||||||
public Map<String, CompactionStatistics> totalSkippedStatistics()
|
|
||||||
{
|
{
|
||||||
return skippedSegmentStats;
|
return skippedSegmentStats;
|
||||||
}
|
}
|
||||||
|
@ -206,25 +198,9 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
|
||||||
final List<DataSegment> resultSegments = entry.getSegments();
|
final List<DataSegment> resultSegments = entry.getSegments();
|
||||||
Preconditions.checkState(!resultSegments.isEmpty(), "Queue entry must not be empty");
|
Preconditions.checkState(!resultSegments.isEmpty(), "Queue entry must not be empty");
|
||||||
|
|
||||||
final String dataSource = resultSegments.get(0).getDataSource();
|
|
||||||
updateQueue(dataSource, compactionConfigs.get(dataSource));
|
|
||||||
|
|
||||||
return entry;
|
return entry;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Find the next segments to compact for the given dataSource and add them to the queue.
|
|
||||||
* {@link #timelineIterators} is updated according to the found segments. That is, the found segments are removed from
|
|
||||||
* the timeline of the given dataSource.
|
|
||||||
*/
|
|
||||||
private void updateQueue(String dataSourceName, DataSourceCompactionConfig config)
|
|
||||||
{
|
|
||||||
final SegmentsToCompact segmentsToCompact = findSegmentsToCompact(dataSourceName, config);
|
|
||||||
if (!segmentsToCompact.isEmpty()) {
|
|
||||||
queue.add(segmentsToCompact);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Iterates compactible segments in a {@link SegmentTimeline}.
|
* Iterates compactible segments in a {@link SegmentTimeline}.
|
||||||
*/
|
*/
|
||||||
|
@ -315,27 +291,12 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Finds segments to compact together for the given datasource.
|
* Finds segments to compact together for the given datasource and adds them to
|
||||||
*
|
* the priority queue.
|
||||||
* @return An empty {@link SegmentsToCompact} if there are no eligible candidates.
|
|
||||||
*/
|
*/
|
||||||
private SegmentsToCompact findSegmentsToCompact(
|
private void findAndEnqueueSegmentsToCompact(CompactibleSegmentIterator compactibleSegmentIterator)
|
||||||
final String dataSourceName,
|
|
||||||
final DataSourceCompactionConfig config
|
|
||||||
)
|
|
||||||
{
|
{
|
||||||
final CompactibleSegmentIterator compactibleSegmentIterator
|
|
||||||
= timelineIterators.get(dataSourceName);
|
|
||||||
if (compactibleSegmentIterator == null) {
|
|
||||||
log.warn(
|
|
||||||
"Skipping compaction for datasource[%s] as there is no compactible segment in its timeline.",
|
|
||||||
dataSourceName
|
|
||||||
);
|
|
||||||
return SegmentsToCompact.empty();
|
|
||||||
}
|
|
||||||
|
|
||||||
final long inputSegmentSize = config.getInputSegmentSizeBytes();
|
final long inputSegmentSize = config.getInputSegmentSizeBytes();
|
||||||
|
|
||||||
while (compactibleSegmentIterator.hasNext()) {
|
while (compactibleSegmentIterator.hasNext()) {
|
||||||
List<DataSegment> segments = compactibleSegmentIterator.next();
|
List<DataSegment> segments = compactibleSegmentIterator.next();
|
||||||
|
|
||||||
|
@ -352,47 +313,33 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
|
||||||
if (!compactionStatus.isComplete()) {
|
if (!compactionStatus.isComplete()) {
|
||||||
log.debug(
|
log.debug(
|
||||||
"Datasource[%s], interval[%s] has [%d] segments that need to be compacted because [%s].",
|
"Datasource[%s], interval[%s] has [%d] segments that need to be compacted because [%s].",
|
||||||
dataSourceName, interval, candidates.size(), compactionStatus.getReasonToCompact()
|
dataSource, interval, candidates.size(), compactionStatus.getReasonToCompact()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (compactionStatus.isComplete()) {
|
if (compactionStatus.isComplete()) {
|
||||||
addSegmentStatsTo(compactedSegmentStats, dataSourceName, candidates);
|
compactedSegmentStats.increment(candidates.getStats());
|
||||||
} else if (candidates.getTotalBytes() > inputSegmentSize) {
|
} else if (candidates.getTotalBytes() > inputSegmentSize) {
|
||||||
addSegmentStatsTo(skippedSegmentStats, dataSourceName, candidates);
|
skippedSegmentStats.increment(candidates.getStats());
|
||||||
log.warn(
|
log.warn(
|
||||||
"Skipping compaction for datasource[%s], interval[%s] as total segment size[%d]"
|
"Skipping compaction for datasource[%s], interval[%s] as total segment size[%d]"
|
||||||
+ " is larger than allowed inputSegmentSize[%d].",
|
+ " is larger than allowed inputSegmentSize[%d].",
|
||||||
dataSourceName, interval, candidates.getTotalBytes(), inputSegmentSize
|
dataSource, interval, candidates.getTotalBytes(), inputSegmentSize
|
||||||
);
|
);
|
||||||
} else if (config.getGranularitySpec() != null
|
} else if (config.getGranularitySpec() != null
|
||||||
&& config.getGranularitySpec().getSegmentGranularity() != null) {
|
&& config.getGranularitySpec().getSegmentGranularity() != null) {
|
||||||
Set<Interval> compactedIntervals = intervalCompactedForDatasource
|
|
||||||
.computeIfAbsent(dataSourceName, k -> new HashSet<>());
|
|
||||||
|
|
||||||
if (compactedIntervals.contains(interval)) {
|
if (compactedIntervals.contains(interval)) {
|
||||||
// Skip these candidate segments as we have already compacted this interval
|
// Skip these candidate segments as we have already compacted this interval
|
||||||
} else {
|
} else {
|
||||||
compactedIntervals.add(interval);
|
compactedIntervals.add(interval);
|
||||||
return candidates;
|
queue.add(candidates);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
return candidates;
|
queue.add(candidates);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
log.debug("No more segments to compact for datasource[%s].", dataSourceName);
|
log.debug("No more segments to compact for datasource[%s].", dataSource);
|
||||||
return SegmentsToCompact.empty();
|
|
||||||
}
|
|
||||||
|
|
||||||
private void addSegmentStatsTo(
|
|
||||||
Map<String, CompactionStatistics> statisticsMap,
|
|
||||||
String dataSourceName,
|
|
||||||
SegmentsToCompact segments
|
|
||||||
)
|
|
||||||
{
|
|
||||||
statisticsMap.computeIfAbsent(dataSourceName, v -> CompactionStatistics.create())
|
|
||||||
.addFrom(segments);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -428,7 +375,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
|
||||||
final List<DataSegment> segments = new ArrayList<>(
|
final List<DataSegment> segments = new ArrayList<>(
|
||||||
timeline.findNonOvershadowedObjectsInInterval(skipInterval, Partitions.ONLY_COMPLETE)
|
timeline.findNonOvershadowedObjectsInInterval(skipInterval, Partitions.ONLY_COMPLETE)
|
||||||
);
|
);
|
||||||
addSegmentStatsTo(skippedSegmentStats, dataSourceName, SegmentsToCompact.from(segments));
|
skippedSegmentStats.increment(SegmentsToCompact.from(segments).getStats());
|
||||||
}
|
}
|
||||||
|
|
||||||
final Interval totalInterval = new Interval(first.getInterval().getStart(), last.getInterval().getEnd());
|
final Interval totalInterval = new Interval(first.getInterval().getStart(), last.getInterval().getEnd());
|
|
@ -21,6 +21,7 @@ package org.apache.druid.server.coordinator.compact;
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
|
import org.apache.druid.java.util.common.guava.Comparators;
|
||||||
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
|
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
|
||||||
import org.apache.druid.timeline.SegmentTimeline;
|
import org.apache.druid.timeline.SegmentTimeline;
|
||||||
import org.joda.time.Interval;
|
import org.joda.time.Interval;
|
||||||
|
@ -29,7 +30,7 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This policy searches segments for compaction from the newest one to oldest one.
|
* This policy searches segments for compaction from newest to oldest.
|
||||||
*/
|
*/
|
||||||
public class NewestSegmentFirstPolicy implements CompactionSegmentSearchPolicy
|
public class NewestSegmentFirstPolicy implements CompactionSegmentSearchPolicy
|
||||||
{
|
{
|
||||||
|
@ -42,12 +43,20 @@ public class NewestSegmentFirstPolicy implements CompactionSegmentSearchPolicy
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CompactionSegmentIterator reset(
|
public CompactionSegmentIterator createIterator(
|
||||||
Map<String, DataSourceCompactionConfig> compactionConfigs,
|
Map<String, DataSourceCompactionConfig> compactionConfigs,
|
||||||
Map<String, SegmentTimeline> dataSources,
|
Map<String, SegmentTimeline> dataSources,
|
||||||
Map<String, List<Interval>> skipIntervals
|
Map<String, List<Interval>> skipIntervals
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
return new NewestSegmentFirstIterator(objectMapper, compactionConfigs, dataSources, skipIntervals);
|
return new PriorityBasedCompactionSegmentIterator(
|
||||||
|
compactionConfigs,
|
||||||
|
dataSources,
|
||||||
|
skipIntervals,
|
||||||
|
(o1, o2) -> Comparators.intervalsByStartThenEnd()
|
||||||
|
.compare(o2.getUmbrellaInterval(), o1.getUmbrellaInterval()),
|
||||||
|
objectMapper
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,135 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.druid.server.coordinator.compact;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import com.google.common.collect.Maps;
|
||||||
|
import org.apache.druid.error.DruidException;
|
||||||
|
import org.apache.druid.java.util.common.logger.Logger;
|
||||||
|
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
|
||||||
|
import org.apache.druid.timeline.SegmentTimeline;
|
||||||
|
import org.apache.druid.utils.CollectionUtils;
|
||||||
|
import org.joda.time.Interval;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.NoSuchElementException;
|
||||||
|
import java.util.PriorityQueue;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Implementation of {@link CompactionSegmentIterator} that returns segments in
|
||||||
|
* order of their priority.
|
||||||
|
*/
|
||||||
|
public class PriorityBasedCompactionSegmentIterator implements CompactionSegmentIterator
|
||||||
|
{
|
||||||
|
private static final Logger log = new Logger(PriorityBasedCompactionSegmentIterator.class);
|
||||||
|
|
||||||
|
private final PriorityQueue<SegmentsToCompact> queue;
|
||||||
|
private final Map<String, DataSourceCompactibleSegmentIterator> datasourceIterators;
|
||||||
|
|
||||||
|
public PriorityBasedCompactionSegmentIterator(
|
||||||
|
Map<String, DataSourceCompactionConfig> compactionConfigs,
|
||||||
|
Map<String, SegmentTimeline> datasourceToTimeline,
|
||||||
|
Map<String, List<Interval>> skipIntervals,
|
||||||
|
Comparator<SegmentsToCompact> segmentPriority,
|
||||||
|
ObjectMapper objectMapper
|
||||||
|
)
|
||||||
|
{
|
||||||
|
this.queue = new PriorityQueue<>(segmentPriority);
|
||||||
|
this.datasourceIterators = Maps.newHashMapWithExpectedSize(datasourceToTimeline.size());
|
||||||
|
compactionConfigs.forEach((datasource, config) -> {
|
||||||
|
if (config == null) {
|
||||||
|
throw DruidException.defensive("Invalid null compaction config for dataSource[%s].", datasource);
|
||||||
|
}
|
||||||
|
final SegmentTimeline timeline = datasourceToTimeline.get(datasource);
|
||||||
|
if (timeline == null) {
|
||||||
|
log.warn("Skipping compaction for datasource[%s] as it has no timeline.", datasource);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
datasourceIterators.put(
|
||||||
|
datasource,
|
||||||
|
new DataSourceCompactibleSegmentIterator(
|
||||||
|
compactionConfigs.get(datasource),
|
||||||
|
timeline,
|
||||||
|
skipIntervals.getOrDefault(datasource, Collections.emptyList()),
|
||||||
|
segmentPriority,
|
||||||
|
objectMapper
|
||||||
|
)
|
||||||
|
);
|
||||||
|
addNextItemForDatasourceToQueue(datasource);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, CompactionStatistics> totalCompactedStatistics()
|
||||||
|
{
|
||||||
|
return CollectionUtils.mapValues(
|
||||||
|
datasourceIterators,
|
||||||
|
DataSourceCompactibleSegmentIterator::totalCompactedStatistics
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, CompactionStatistics> totalSkippedStatistics()
|
||||||
|
{
|
||||||
|
return CollectionUtils.mapValues(
|
||||||
|
datasourceIterators,
|
||||||
|
DataSourceCompactibleSegmentIterator::totalSkippedStatistics
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean hasNext()
|
||||||
|
{
|
||||||
|
return !queue.isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SegmentsToCompact next()
|
||||||
|
{
|
||||||
|
if (!hasNext()) {
|
||||||
|
throw new NoSuchElementException();
|
||||||
|
}
|
||||||
|
|
||||||
|
final SegmentsToCompact entry = queue.poll();
|
||||||
|
if (entry == null) {
|
||||||
|
throw new NoSuchElementException();
|
||||||
|
}
|
||||||
|
Preconditions.checkState(!entry.isEmpty(), "Queue entry must not be empty");
|
||||||
|
|
||||||
|
addNextItemForDatasourceToQueue(entry.getFirst().getDataSource());
|
||||||
|
return entry;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void addNextItemForDatasourceToQueue(String dataSourceName)
|
||||||
|
{
|
||||||
|
final DataSourceCompactibleSegmentIterator iterator = datasourceIterators.get(dataSourceName);
|
||||||
|
if (iterator.hasNext()) {
|
||||||
|
final SegmentsToCompact segmentsToCompact = iterator.next();
|
||||||
|
if (!segmentsToCompact.isEmpty()) {
|
||||||
|
queue.add(segmentsToCompact);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -107,9 +107,9 @@ public class SegmentsToCompact
|
||||||
return umbrellaInterval;
|
return umbrellaInterval;
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getNumIntervals()
|
public CompactionStatistics getStats()
|
||||||
{
|
{
|
||||||
return numIntervals;
|
return CompactionStatistics.create(totalBytes, size(), numIntervals);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -54,7 +54,6 @@ import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
|
||||||
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
|
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
|
||||||
import org.apache.druid.server.coordinator.compact.CompactionSegmentIterator;
|
import org.apache.druid.server.coordinator.compact.CompactionSegmentIterator;
|
||||||
import org.apache.druid.server.coordinator.compact.CompactionSegmentSearchPolicy;
|
import org.apache.druid.server.coordinator.compact.CompactionSegmentSearchPolicy;
|
||||||
import org.apache.druid.server.coordinator.compact.CompactionStatistics;
|
|
||||||
import org.apache.druid.server.coordinator.compact.SegmentsToCompact;
|
import org.apache.druid.server.coordinator.compact.SegmentsToCompact;
|
||||||
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
|
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
|
||||||
import org.apache.druid.server.coordinator.stats.Dimension;
|
import org.apache.druid.server.coordinator.stats.Dimension;
|
||||||
|
@ -87,6 +86,7 @@ public class CompactSegments implements CoordinatorCustomDuty
|
||||||
|
|
||||||
private static final Logger LOG = new Logger(CompactSegments.class);
|
private static final Logger LOG = new Logger(CompactSegments.class);
|
||||||
|
|
||||||
|
private static final String TASK_ID_PREFIX = "coordinator-issued";
|
||||||
private static final Predicate<TaskStatusPlus> IS_COMPACTION_TASK =
|
private static final Predicate<TaskStatusPlus> IS_COMPACTION_TASK =
|
||||||
status -> null != status && COMPACTION_TASK_TYPE.equals(status.getType());
|
status -> null != status && COMPACTION_TASK_TYPE.equals(status.getType());
|
||||||
|
|
||||||
|
@ -196,7 +196,7 @@ public class CompactSegments implements CoordinatorCustomDuty
|
||||||
// Get iterator over segments to compact and submit compaction tasks
|
// Get iterator over segments to compact and submit compaction tasks
|
||||||
Map<String, SegmentTimeline> dataSources = params.getUsedSegmentsTimelinesPerDataSource();
|
Map<String, SegmentTimeline> dataSources = params.getUsedSegmentsTimelinesPerDataSource();
|
||||||
final CompactionSegmentIterator iterator =
|
final CompactionSegmentIterator iterator =
|
||||||
policy.reset(compactionConfigs, dataSources, intervalsToSkipCompaction);
|
policy.createIterator(compactionConfigs, dataSources, intervalsToSkipCompaction);
|
||||||
|
|
||||||
final int compactionTaskCapacity = getCompactionTaskCapacity(dynamicConfig);
|
final int compactionTaskCapacity = getCompactionTaskCapacity(dynamicConfig);
|
||||||
final int availableCompactionTaskSlots
|
final int availableCompactionTaskSlots
|
||||||
|
@ -215,7 +215,7 @@ public class CompactSegments implements CoordinatorCustomDuty
|
||||||
stats.add(Stats.Compaction.MAX_SLOTS, compactionTaskCapacity);
|
stats.add(Stats.Compaction.MAX_SLOTS, compactionTaskCapacity);
|
||||||
stats.add(Stats.Compaction.AVAILABLE_SLOTS, availableCompactionTaskSlots);
|
stats.add(Stats.Compaction.AVAILABLE_SLOTS, availableCompactionTaskSlots);
|
||||||
stats.add(Stats.Compaction.SUBMITTED_TASKS, numSubmittedCompactionTasks);
|
stats.add(Stats.Compaction.SUBMITTED_TASKS, numSubmittedCompactionTasks);
|
||||||
addCompactionSnapshotStats(currentRunAutoCompactionSnapshotBuilders, iterator, stats);
|
updateCompactionSnapshotStats(currentRunAutoCompactionSnapshotBuilders, iterator, stats);
|
||||||
|
|
||||||
return params;
|
return params;
|
||||||
}
|
}
|
||||||
|
@ -392,28 +392,19 @@ public class CompactSegments implements CoordinatorCustomDuty
|
||||||
|
|
||||||
while (iterator.hasNext() && totalTaskSlotsAssigned < numAvailableCompactionTaskSlots) {
|
while (iterator.hasNext() && totalTaskSlotsAssigned < numAvailableCompactionTaskSlots) {
|
||||||
final SegmentsToCompact entry = iterator.next();
|
final SegmentsToCompact entry = iterator.next();
|
||||||
final List<DataSegment> segmentsToCompact = entry.getSegments();
|
if (entry.isEmpty()) {
|
||||||
if (segmentsToCompact.isEmpty()) {
|
|
||||||
throw new ISE("segmentsToCompact is empty?");
|
throw new ISE("segmentsToCompact is empty?");
|
||||||
}
|
}
|
||||||
|
|
||||||
final String dataSourceName = segmentsToCompact.get(0).getDataSource();
|
final String dataSourceName = entry.getFirst().getDataSource();
|
||||||
|
|
||||||
// As these segments will be compacted, we will aggregate the statistic to the Compacted statistics
|
// As these segments will be compacted, we will aggregate the statistic to the Compacted statistics
|
||||||
AutoCompactionSnapshot.Builder snapshotBuilder = currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
|
currentRunAutoCompactionSnapshotBuilders
|
||||||
dataSourceName,
|
.computeIfAbsent(dataSourceName, AutoCompactionSnapshot::builder)
|
||||||
AutoCompactionSnapshot::builder
|
.incrementCompactedStats(entry.getStats());
|
||||||
);
|
|
||||||
snapshotBuilder
|
|
||||||
.incrementBytesCompacted(
|
|
||||||
segmentsToCompact.stream().mapToLong(DataSegment::getSize).sum()
|
|
||||||
)
|
|
||||||
.incrementIntervalCountCompacted(
|
|
||||||
segmentsToCompact.stream().map(DataSegment::getInterval).distinct().count()
|
|
||||||
)
|
|
||||||
.incrementSegmentCountCompacted(segmentsToCompact.size());
|
|
||||||
|
|
||||||
final DataSourceCompactionConfig config = compactionConfigs.get(dataSourceName);
|
final DataSourceCompactionConfig config = compactionConfigs.get(dataSourceName);
|
||||||
|
final List<DataSegment> segmentsToCompact = entry.getSegments();
|
||||||
|
|
||||||
// Create granularitySpec to send to compaction task
|
// Create granularitySpec to send to compaction task
|
||||||
ClientCompactionTaskGranularitySpec granularitySpec;
|
ClientCompactionTaskGranularitySpec granularitySpec;
|
||||||
|
@ -514,7 +505,6 @@ public class CompactSegments implements CoordinatorCustomDuty
|
||||||
}
|
}
|
||||||
|
|
||||||
final String taskId = compactSegments(
|
final String taskId = compactSegments(
|
||||||
"coordinator-issued",
|
|
||||||
segmentsToCompact,
|
segmentsToCompact,
|
||||||
config.getTaskPriority(),
|
config.getTaskPriority(),
|
||||||
ClientCompactionTaskQueryTuningConfig.from(
|
ClientCompactionTaskQueryTuningConfig.from(
|
||||||
|
@ -536,7 +526,6 @@ public class CompactSegments implements CoordinatorCustomDuty
|
||||||
taskId, segmentsToCompact.size(), dataSourceName, entry.getUmbrellaInterval()
|
taskId, segmentsToCompact.size(), dataSourceName, entry.getUmbrellaInterval()
|
||||||
);
|
);
|
||||||
LOG.debugSegments(segmentsToCompact, "Compacting segments");
|
LOG.debugSegments(segmentsToCompact, "Compacting segments");
|
||||||
// Count the compaction task itself + its sub tasks
|
|
||||||
numSubmittedTasks++;
|
numSubmittedTasks++;
|
||||||
totalTaskSlotsAssigned += slotsRequiredForCurrentTask;
|
totalTaskSlotsAssigned += slotsRequiredForCurrentTask;
|
||||||
}
|
}
|
||||||
|
@ -554,7 +543,7 @@ public class CompactSegments implements CoordinatorCustomDuty
|
||||||
return newContext;
|
return newContext;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addCompactionSnapshotStats(
|
private void updateCompactionSnapshotStats(
|
||||||
Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders,
|
Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders,
|
||||||
CompactionSegmentIterator iterator,
|
CompactionSegmentIterator iterator,
|
||||||
CoordinatorRunStats stats
|
CoordinatorRunStats stats
|
||||||
|
@ -563,77 +552,45 @@ public class CompactSegments implements CoordinatorCustomDuty
|
||||||
// Mark all the segments remaining in the iterator as "awaiting compaction"
|
// Mark all the segments remaining in the iterator as "awaiting compaction"
|
||||||
while (iterator.hasNext()) {
|
while (iterator.hasNext()) {
|
||||||
final SegmentsToCompact entry = iterator.next();
|
final SegmentsToCompact entry = iterator.next();
|
||||||
final List<DataSegment> segmentsToCompact = entry.getSegments();
|
if (!entry.isEmpty()) {
|
||||||
if (!segmentsToCompact.isEmpty()) {
|
final String dataSourceName = entry.getFirst().getDataSource();
|
||||||
final String dataSourceName = segmentsToCompact.get(0).getDataSource();
|
currentRunAutoCompactionSnapshotBuilders
|
||||||
AutoCompactionSnapshot.Builder snapshotBuilder = currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
|
.computeIfAbsent(dataSourceName, AutoCompactionSnapshot::builder)
|
||||||
dataSourceName,
|
.incrementWaitingStats(entry.getStats());
|
||||||
AutoCompactionSnapshot::builder
|
|
||||||
);
|
|
||||||
snapshotBuilder
|
|
||||||
.incrementBytesAwaitingCompaction(
|
|
||||||
segmentsToCompact.stream().mapToLong(DataSegment::getSize).sum()
|
|
||||||
)
|
|
||||||
.incrementIntervalCountAwaitingCompaction(
|
|
||||||
segmentsToCompact.stream().map(DataSegment::getInterval).distinct().count()
|
|
||||||
)
|
|
||||||
.incrementSegmentCountAwaitingCompaction(segmentsToCompact.size());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Statistics of all segments considered compacted after this run
|
// Statistics of all segments considered compacted after this run
|
||||||
Map<String, CompactionStatistics> allCompactedStatistics = iterator.totalCompactedStatistics();
|
iterator.totalCompactedStatistics().forEach((dataSource, compactedStats) -> {
|
||||||
for (Map.Entry<String, CompactionStatistics> compactionStatisticsEntry : allCompactedStatistics.entrySet()) {
|
currentRunAutoCompactionSnapshotBuilders
|
||||||
final String dataSource = compactionStatisticsEntry.getKey();
|
.computeIfAbsent(dataSource, AutoCompactionSnapshot::builder)
|
||||||
final CompactionStatistics dataSourceCompactedStatistics = compactionStatisticsEntry.getValue();
|
.incrementCompactedStats(compactedStats);
|
||||||
AutoCompactionSnapshot.Builder builder = currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
|
});
|
||||||
dataSource,
|
|
||||||
AutoCompactionSnapshot::builder
|
|
||||||
);
|
|
||||||
builder.incrementBytesCompacted(dataSourceCompactedStatistics.getTotalBytes());
|
|
||||||
builder.incrementSegmentCountCompacted(dataSourceCompactedStatistics.getNumSegments());
|
|
||||||
builder.incrementIntervalCountCompacted(dataSourceCompactedStatistics.getNumIntervals());
|
|
||||||
}
|
|
||||||
|
|
||||||
// Statistics of all segments considered skipped after this run
|
// Statistics of all segments considered skipped after this run
|
||||||
Map<String, CompactionStatistics> allSkippedStatistics = iterator.totalSkippedStatistics();
|
iterator.totalSkippedStatistics().forEach((dataSource, dataSourceSkippedStatistics) -> {
|
||||||
for (Map.Entry<String, CompactionStatistics> compactionStatisticsEntry : allSkippedStatistics.entrySet()) {
|
currentRunAutoCompactionSnapshotBuilders
|
||||||
final String dataSource = compactionStatisticsEntry.getKey();
|
.computeIfAbsent(dataSource, AutoCompactionSnapshot::builder)
|
||||||
final CompactionStatistics dataSourceSkippedStatistics = compactionStatisticsEntry.getValue();
|
.incrementSkippedStats(dataSourceSkippedStatistics);
|
||||||
AutoCompactionSnapshot.Builder builder = currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
|
});
|
||||||
dataSource,
|
|
||||||
AutoCompactionSnapshot::builder
|
|
||||||
);
|
|
||||||
builder.incrementBytesSkipped(dataSourceSkippedStatistics.getTotalBytes())
|
|
||||||
.incrementSegmentCountSkipped(dataSourceSkippedStatistics.getNumSegments())
|
|
||||||
.incrementIntervalCountSkipped(dataSourceSkippedStatistics.getNumIntervals());
|
|
||||||
}
|
|
||||||
|
|
||||||
final Map<String, AutoCompactionSnapshot> currentAutoCompactionSnapshotPerDataSource = new HashMap<>();
|
final Map<String, AutoCompactionSnapshot> currentAutoCompactionSnapshotPerDataSource = new HashMap<>();
|
||||||
for (Map.Entry<String, AutoCompactionSnapshot.Builder> autoCompactionSnapshotBuilderEntry
|
currentRunAutoCompactionSnapshotBuilders.forEach((dataSource, builder) -> {
|
||||||
: currentRunAutoCompactionSnapshotBuilders.entrySet()) {
|
final AutoCompactionSnapshot autoCompactionSnapshot = builder.build();
|
||||||
final String dataSource = autoCompactionSnapshotBuilderEntry.getKey();
|
|
||||||
final AutoCompactionSnapshot.Builder builder = autoCompactionSnapshotBuilderEntry.getValue();
|
|
||||||
|
|
||||||
// Build the complete snapshot for the datasource
|
|
||||||
AutoCompactionSnapshot autoCompactionSnapshot = builder.build();
|
|
||||||
currentAutoCompactionSnapshotPerDataSource.put(dataSource, autoCompactionSnapshot);
|
currentAutoCompactionSnapshotPerDataSource.put(dataSource, autoCompactionSnapshot);
|
||||||
|
collectSnapshotStats(autoCompactionSnapshot, stats);
|
||||||
// Use the complete snapshot to emit metrics
|
});
|
||||||
addStatsForDatasource(dataSource, autoCompactionSnapshot, stats);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Atomic update of autoCompactionSnapshotPerDataSource with the latest from this coordinator run
|
// Atomic update of autoCompactionSnapshotPerDataSource with the latest from this coordinator run
|
||||||
autoCompactionSnapshotPerDataSource.set(currentAutoCompactionSnapshotPerDataSource);
|
autoCompactionSnapshotPerDataSource.set(currentAutoCompactionSnapshotPerDataSource);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addStatsForDatasource(
|
private void collectSnapshotStats(
|
||||||
String dataSource,
|
|
||||||
AutoCompactionSnapshot autoCompactionSnapshot,
|
AutoCompactionSnapshot autoCompactionSnapshot,
|
||||||
CoordinatorRunStats stats
|
CoordinatorRunStats stats
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
final RowKey rowKey = RowKey.of(Dimension.DATASOURCE, dataSource);
|
final RowKey rowKey = RowKey.of(Dimension.DATASOURCE, autoCompactionSnapshot.getDataSource());
|
||||||
|
|
||||||
stats.add(Stats.Compaction.PENDING_BYTES, rowKey, autoCompactionSnapshot.getBytesAwaitingCompaction());
|
stats.add(Stats.Compaction.PENDING_BYTES, rowKey, autoCompactionSnapshot.getBytesAwaitingCompaction());
|
||||||
stats.add(Stats.Compaction.PENDING_SEGMENTS, rowKey, autoCompactionSnapshot.getSegmentCountAwaitingCompaction());
|
stats.add(Stats.Compaction.PENDING_SEGMENTS, rowKey, autoCompactionSnapshot.getSegmentCountAwaitingCompaction());
|
||||||
|
@ -668,7 +625,6 @@ public class CompactSegments implements CoordinatorCustomDuty
|
||||||
}
|
}
|
||||||
|
|
||||||
private String compactSegments(
|
private String compactSegments(
|
||||||
String idPrefix,
|
|
||||||
List<DataSegment> segments,
|
List<DataSegment> segments,
|
||||||
int compactionTaskPriority,
|
int compactionTaskPriority,
|
||||||
@Nullable ClientCompactionTaskQueryTuningConfig tuningConfig,
|
@Nullable ClientCompactionTaskQueryTuningConfig tuningConfig,
|
||||||
|
@ -692,7 +648,7 @@ public class CompactSegments implements CoordinatorCustomDuty
|
||||||
context = context == null ? new HashMap<>() : context;
|
context = context == null ? new HashMap<>() : context;
|
||||||
context.put("priority", compactionTaskPriority);
|
context.put("priority", compactionTaskPriority);
|
||||||
|
|
||||||
final String taskId = IdUtils.newTaskId(idPrefix, ClientCompactionTaskQuery.TYPE, dataSource, null);
|
final String taskId = IdUtils.newTaskId(TASK_ID_PREFIX, ClientCompactionTaskQuery.TYPE, dataSource, null);
|
||||||
final Granularity segmentGranularity = granularitySpec == null ? null : granularitySpec.getSegmentGranularity();
|
final Granularity segmentGranularity = granularitySpec == null ? null : granularitySpec.getSegmentGranularity();
|
||||||
final ClientTaskQuery taskPayload = new ClientCompactionTaskQuery(
|
final ClientTaskQuery taskPayload = new ClientCompactionTaskQuery(
|
||||||
taskId,
|
taskId,
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
|
|
||||||
package org.apache.druid.server.coordinator;
|
package org.apache.druid.server.coordinator;
|
||||||
|
|
||||||
|
import org.apache.druid.server.coordinator.compact.CompactionStatistics;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -30,17 +31,11 @@ public class AutoCompactionSnapshotTest
|
||||||
final String expectedDataSource = "data";
|
final String expectedDataSource = "data";
|
||||||
final AutoCompactionSnapshot.Builder builder = AutoCompactionSnapshot.builder(expectedDataSource);
|
final AutoCompactionSnapshot.Builder builder = AutoCompactionSnapshot.builder(expectedDataSource);
|
||||||
|
|
||||||
// Increment every stats twice
|
// Increment every stat twice
|
||||||
for (int i = 0; i < 2; i++) {
|
for (int i = 0; i < 2; i++) {
|
||||||
builder.incrementIntervalCountSkipped(13)
|
builder.incrementSkippedStats(CompactionStatistics.create(13, 13, 13));
|
||||||
.incrementBytesSkipped(13)
|
builder.incrementWaitingStats(CompactionStatistics.create(13, 13, 13));
|
||||||
.incrementSegmentCountSkipped(13)
|
builder.incrementCompactedStats(CompactionStatistics.create(13, 13, 13));
|
||||||
.incrementIntervalCountCompacted(13)
|
|
||||||
.incrementBytesCompacted(13)
|
|
||||||
.incrementSegmentCountCompacted(13)
|
|
||||||
.incrementIntervalCountAwaitingCompaction(13)
|
|
||||||
.incrementBytesAwaitingCompaction(13)
|
|
||||||
.incrementSegmentCountAwaitingCompaction(13);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
final AutoCompactionSnapshot actual = builder.build();
|
final AutoCompactionSnapshot actual = builder.build();
|
||||||
|
|
|
@ -0,0 +1,178 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.druid.server.coordinator.compact;
|
||||||
|
|
||||||
|
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
|
||||||
|
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
|
||||||
|
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
|
||||||
|
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
|
||||||
|
import org.apache.druid.indexer.partitions.PartitionsSpec;
|
||||||
|
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
|
||||||
|
import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
|
||||||
|
public class CompactionStatusTest
|
||||||
|
{
|
||||||
|
private static final String DS_WIKI = "wiki";
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFindPartitionsSpecWhenGivenIsNull()
|
||||||
|
{
|
||||||
|
final ClientCompactionTaskQueryTuningConfig tuningConfig
|
||||||
|
= ClientCompactionTaskQueryTuningConfig.from(null);
|
||||||
|
Assert.assertEquals(
|
||||||
|
new DynamicPartitionsSpec(null, Long.MAX_VALUE),
|
||||||
|
CompactionStatus.findPartitionsSpecFromConfig(tuningConfig)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFindPartitionsSpecWhenGivenIsDynamicWithNullMaxTotalRows()
|
||||||
|
{
|
||||||
|
final PartitionsSpec partitionsSpec = new DynamicPartitionsSpec(null, null);
|
||||||
|
final ClientCompactionTaskQueryTuningConfig tuningConfig
|
||||||
|
= ClientCompactionTaskQueryTuningConfig.from(createCompactionConfig(partitionsSpec));
|
||||||
|
Assert.assertEquals(
|
||||||
|
new DynamicPartitionsSpec(null, Long.MAX_VALUE),
|
||||||
|
CompactionStatus.findPartitionsSpecFromConfig(tuningConfig)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFindPartitionsSpecWhenGivenIsDynamicWithMaxTotalRows()
|
||||||
|
{
|
||||||
|
final PartitionsSpec partitionsSpec = new DynamicPartitionsSpec(null, 1000L);
|
||||||
|
final ClientCompactionTaskQueryTuningConfig tuningConfig
|
||||||
|
= ClientCompactionTaskQueryTuningConfig.from(createCompactionConfig(partitionsSpec));
|
||||||
|
Assert.assertEquals(
|
||||||
|
partitionsSpec,
|
||||||
|
CompactionStatus.findPartitionsSpecFromConfig(tuningConfig)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFindPartitionsSpecWhenGivenIsDynamicWithMaxRowsPerSegment()
|
||||||
|
{
|
||||||
|
final PartitionsSpec partitionsSpec = new DynamicPartitionsSpec(100, 1000L);
|
||||||
|
final ClientCompactionTaskQueryTuningConfig tuningConfig
|
||||||
|
= ClientCompactionTaskQueryTuningConfig.from(createCompactionConfig(partitionsSpec));
|
||||||
|
Assert.assertEquals(
|
||||||
|
partitionsSpec,
|
||||||
|
CompactionStatus.findPartitionsSpecFromConfig(tuningConfig)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFindPartitionsSpecFromConfigWithDeprecatedMaxRowsPerSegmentAndMaxTotalRowsReturnGivenValues()
|
||||||
|
{
|
||||||
|
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
|
||||||
|
"datasource",
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
100,
|
||||||
|
null,
|
||||||
|
new UserCompactionTaskQueryTuningConfig(
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
1000L,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null
|
||||||
|
),
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null
|
||||||
|
);
|
||||||
|
Assert.assertEquals(
|
||||||
|
new DynamicPartitionsSpec(100, 1000L),
|
||||||
|
CompactionStatus.findPartitionsSpecFromConfig(
|
||||||
|
ClientCompactionTaskQueryTuningConfig.from(config)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFindPartitionsSpecWhenGivenIsHashed()
|
||||||
|
{
|
||||||
|
final PartitionsSpec partitionsSpec =
|
||||||
|
new HashedPartitionsSpec(null, 100, Collections.singletonList("dim"));
|
||||||
|
final ClientCompactionTaskQueryTuningConfig tuningConfig
|
||||||
|
= ClientCompactionTaskQueryTuningConfig.from(createCompactionConfig(partitionsSpec));
|
||||||
|
Assert.assertEquals(
|
||||||
|
partitionsSpec,
|
||||||
|
CompactionStatus.findPartitionsSpecFromConfig(tuningConfig)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFindPartitionsSpecWhenGivenIsRange()
|
||||||
|
{
|
||||||
|
final PartitionsSpec partitionsSpec =
|
||||||
|
new DimensionRangePartitionsSpec(null, 10000, Collections.singletonList("dim"), false);
|
||||||
|
final ClientCompactionTaskQueryTuningConfig tuningConfig
|
||||||
|
= ClientCompactionTaskQueryTuningConfig.from(createCompactionConfig(partitionsSpec));
|
||||||
|
Assert.assertEquals(
|
||||||
|
partitionsSpec,
|
||||||
|
CompactionStatus.findPartitionsSpecFromConfig(tuningConfig)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static DataSourceCompactionConfig createCompactionConfig(
|
||||||
|
PartitionsSpec partitionsSpec
|
||||||
|
)
|
||||||
|
{
|
||||||
|
return new DataSourceCompactionConfig(
|
||||||
|
DS_WIKI,
|
||||||
|
null, null, null, null, createTuningConfig(partitionsSpec),
|
||||||
|
null, null, null, null, null, null, null
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static UserCompactionTaskQueryTuningConfig createTuningConfig(
|
||||||
|
PartitionsSpec partitionsSpec
|
||||||
|
)
|
||||||
|
{
|
||||||
|
return new UserCompactionTaskQueryTuningConfig(
|
||||||
|
null,
|
||||||
|
null, null, null, null, partitionsSpec, null, null, null,
|
||||||
|
null, null, null, null, null, null, null, null, null, null
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,77 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.druid.server.coordinator.compact;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
import org.apache.druid.java.util.common.DateTimes;
|
||||||
|
import org.apache.druid.java.util.common.Intervals;
|
||||||
|
import org.joda.time.Interval;
|
||||||
|
import org.joda.time.Period;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
public class DataSourceCompactibleSegmentIteratorTest
|
||||||
|
{
|
||||||
|
@Test
|
||||||
|
public void testFilterSkipIntervals()
|
||||||
|
{
|
||||||
|
final Interval totalInterval = Intervals.of("2018-01-01/2019-01-01");
|
||||||
|
final List<Interval> expectedSkipIntervals = ImmutableList.of(
|
||||||
|
Intervals.of("2018-01-15/2018-03-02"),
|
||||||
|
Intervals.of("2018-07-23/2018-10-01"),
|
||||||
|
Intervals.of("2018-10-02/2018-12-25"),
|
||||||
|
Intervals.of("2018-12-31/2019-01-01")
|
||||||
|
);
|
||||||
|
final List<Interval> skipIntervals = DataSourceCompactibleSegmentIterator.filterSkipIntervals(
|
||||||
|
totalInterval,
|
||||||
|
Lists.newArrayList(
|
||||||
|
Intervals.of("2017-12-01/2018-01-15"),
|
||||||
|
Intervals.of("2018-03-02/2018-07-23"),
|
||||||
|
Intervals.of("2018-10-01/2018-10-02"),
|
||||||
|
Intervals.of("2018-12-25/2018-12-31")
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
Assert.assertEquals(expectedSkipIntervals, skipIntervals);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddSkipIntervalFromLatestAndSort()
|
||||||
|
{
|
||||||
|
final List<Interval> expectedIntervals = ImmutableList.of(
|
||||||
|
Intervals.of("2018-12-24/2018-12-25"),
|
||||||
|
Intervals.of("2018-12-29/2019-01-01")
|
||||||
|
);
|
||||||
|
final List<Interval> fullSkipIntervals = DataSourceCompactibleSegmentIterator.sortAndAddSkipIntervalFromLatest(
|
||||||
|
DateTimes.of("2019-01-01"),
|
||||||
|
new Period(72, 0, 0, 0),
|
||||||
|
null,
|
||||||
|
ImmutableList.of(
|
||||||
|
Intervals.of("2018-12-30/2018-12-31"),
|
||||||
|
Intervals.of("2018-12-24/2018-12-25")
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
Assert.assertEquals(expectedIntervals, fullSkipIntervals);
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,477 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing,
|
|
||||||
* software distributed under the License is distributed on an
|
|
||||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
||||||
* KIND, either express or implied. See the License for the
|
|
||||||
* specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.druid.server.coordinator.compact;
|
|
||||||
|
|
||||||
import com.google.common.collect.ImmutableList;
|
|
||||||
import com.google.common.collect.Lists;
|
|
||||||
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
|
|
||||||
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
|
|
||||||
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
|
|
||||||
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
|
|
||||||
import org.apache.druid.java.util.common.DateTimes;
|
|
||||||
import org.apache.druid.java.util.common.Intervals;
|
|
||||||
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
|
|
||||||
import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
|
|
||||||
import org.joda.time.Interval;
|
|
||||||
import org.joda.time.Period;
|
|
||||||
import org.junit.Assert;
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
public class NewestSegmentFirstIteratorTest
|
|
||||||
{
|
|
||||||
@Test
|
|
||||||
public void testFilterSkipIntervals()
|
|
||||||
{
|
|
||||||
final Interval totalInterval = Intervals.of("2018-01-01/2019-01-01");
|
|
||||||
final List<Interval> expectedSkipIntervals = ImmutableList.of(
|
|
||||||
Intervals.of("2018-01-15/2018-03-02"),
|
|
||||||
Intervals.of("2018-07-23/2018-10-01"),
|
|
||||||
Intervals.of("2018-10-02/2018-12-25"),
|
|
||||||
Intervals.of("2018-12-31/2019-01-01")
|
|
||||||
);
|
|
||||||
final List<Interval> skipIntervals = NewestSegmentFirstIterator.filterSkipIntervals(
|
|
||||||
totalInterval,
|
|
||||||
Lists.newArrayList(
|
|
||||||
Intervals.of("2017-12-01/2018-01-15"),
|
|
||||||
Intervals.of("2018-03-02/2018-07-23"),
|
|
||||||
Intervals.of("2018-10-01/2018-10-02"),
|
|
||||||
Intervals.of("2018-12-25/2018-12-31")
|
|
||||||
)
|
|
||||||
);
|
|
||||||
|
|
||||||
Assert.assertEquals(expectedSkipIntervals, skipIntervals);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testAddSkipIntervalFromLatestAndSort()
|
|
||||||
{
|
|
||||||
final List<Interval> expectedIntervals = ImmutableList.of(
|
|
||||||
Intervals.of("2018-12-24/2018-12-25"),
|
|
||||||
Intervals.of("2018-12-29/2019-01-01")
|
|
||||||
);
|
|
||||||
final List<Interval> fullSkipIntervals = NewestSegmentFirstIterator.sortAndAddSkipIntervalFromLatest(
|
|
||||||
DateTimes.of("2019-01-01"),
|
|
||||||
new Period(72, 0, 0, 0),
|
|
||||||
null,
|
|
||||||
ImmutableList.of(
|
|
||||||
Intervals.of("2018-12-30/2018-12-31"),
|
|
||||||
Intervals.of("2018-12-24/2018-12-25")
|
|
||||||
)
|
|
||||||
);
|
|
||||||
|
|
||||||
Assert.assertEquals(expectedIntervals, fullSkipIntervals);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testFindPartitionsSpecFromConfigWithNullTuningConfigReturnDynamicPartitinosSpecWithMaxTotalRowsOfLongMax()
|
|
||||||
{
|
|
||||||
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
|
|
||||||
"datasource",
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null
|
|
||||||
);
|
|
||||||
Assert.assertEquals(
|
|
||||||
new DynamicPartitionsSpec(null, Long.MAX_VALUE),
|
|
||||||
CompactionStatus.findPartitionsSpecFromConfig(
|
|
||||||
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
|
|
||||||
)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testFindPartitionsSpecFromConfigWithNullMaxTotalRowsReturnLongMaxValue()
|
|
||||||
{
|
|
||||||
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
|
|
||||||
"datasource",
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
new UserCompactionTaskQueryTuningConfig(
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
new DynamicPartitionsSpec(null, null),
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null
|
|
||||||
),
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null
|
|
||||||
);
|
|
||||||
Assert.assertEquals(
|
|
||||||
new DynamicPartitionsSpec(null, Long.MAX_VALUE),
|
|
||||||
CompactionStatus.findPartitionsSpecFromConfig(
|
|
||||||
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
|
|
||||||
)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testFindPartitionsSpecFromConfigWithNonNullMaxTotalRowsReturnGivenValue()
|
|
||||||
{
|
|
||||||
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
|
|
||||||
"datasource",
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
new UserCompactionTaskQueryTuningConfig(
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
new DynamicPartitionsSpec(null, 1000L),
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null
|
|
||||||
),
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null
|
|
||||||
);
|
|
||||||
Assert.assertEquals(
|
|
||||||
new DynamicPartitionsSpec(null, 1000L),
|
|
||||||
CompactionStatus.findPartitionsSpecFromConfig(
|
|
||||||
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
|
|
||||||
)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testFindPartitionsSpecFromConfigWithNonNullMaxRowsPerSegmentReturnGivenValue()
|
|
||||||
{
|
|
||||||
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
|
|
||||||
"datasource",
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
new UserCompactionTaskQueryTuningConfig(
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
new DynamicPartitionsSpec(100, 1000L),
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null
|
|
||||||
),
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null
|
|
||||||
);
|
|
||||||
Assert.assertEquals(
|
|
||||||
new DynamicPartitionsSpec(100, 1000L),
|
|
||||||
CompactionStatus.findPartitionsSpecFromConfig(
|
|
||||||
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
|
|
||||||
)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testFindPartitionsSpecFromConfigWithDeprecatedMaxRowsPerSegmentAndMaxTotalRowsReturnGivenValues()
|
|
||||||
{
|
|
||||||
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
|
|
||||||
"datasource",
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
100,
|
|
||||||
null,
|
|
||||||
new UserCompactionTaskQueryTuningConfig(
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
1000L,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null
|
|
||||||
),
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null
|
|
||||||
);
|
|
||||||
Assert.assertEquals(
|
|
||||||
new DynamicPartitionsSpec(100, 1000L),
|
|
||||||
CompactionStatus.findPartitionsSpecFromConfig(
|
|
||||||
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
|
|
||||||
)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testFindPartitionsSpecFromConfigWithDeprecatedMaxRowsPerSegmentAndPartitionsSpecIgnoreDeprecatedOne()
|
|
||||||
{
|
|
||||||
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
|
|
||||||
"datasource",
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
100,
|
|
||||||
null,
|
|
||||||
new UserCompactionTaskQueryTuningConfig(
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
new DynamicPartitionsSpec(null, null),
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null
|
|
||||||
),
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null
|
|
||||||
);
|
|
||||||
Assert.assertEquals(
|
|
||||||
new DynamicPartitionsSpec(null, Long.MAX_VALUE),
|
|
||||||
CompactionStatus.findPartitionsSpecFromConfig(
|
|
||||||
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
|
|
||||||
)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testFindPartitionsSpecFromConfigWithDeprecatedMaxTotalRowsAndPartitionsSpecIgnoreDeprecatedOne()
|
|
||||||
{
|
|
||||||
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
|
|
||||||
"datasource",
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
new UserCompactionTaskQueryTuningConfig(
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
1000L,
|
|
||||||
null,
|
|
||||||
new DynamicPartitionsSpec(null, null),
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null
|
|
||||||
),
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null
|
|
||||||
);
|
|
||||||
Assert.assertEquals(
|
|
||||||
new DynamicPartitionsSpec(null, Long.MAX_VALUE),
|
|
||||||
CompactionStatus.findPartitionsSpecFromConfig(
|
|
||||||
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
|
|
||||||
)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testFindPartitionsSpecFromConfigWithHashPartitionsSpec()
|
|
||||||
{
|
|
||||||
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
|
|
||||||
"datasource",
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
new UserCompactionTaskQueryTuningConfig(
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
new HashedPartitionsSpec(null, 10, ImmutableList.of("dim")),
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null
|
|
||||||
),
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null
|
|
||||||
);
|
|
||||||
Assert.assertEquals(
|
|
||||||
new HashedPartitionsSpec(null, 10, ImmutableList.of("dim")),
|
|
||||||
CompactionStatus.findPartitionsSpecFromConfig(
|
|
||||||
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
|
|
||||||
)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testFindPartitionsSpecFromConfigWithRangePartitionsSpec()
|
|
||||||
{
|
|
||||||
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
|
|
||||||
"datasource",
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
new UserCompactionTaskQueryTuningConfig(
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
new SingleDimensionPartitionsSpec(10000, null, "dim", false),
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null
|
|
||||||
),
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null
|
|
||||||
);
|
|
||||||
Assert.assertEquals(
|
|
||||||
new SingleDimensionPartitionsSpec(10000, null, "dim", false),
|
|
||||||
CompactionStatus.findPartitionsSpecFromConfig(
|
|
||||||
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
|
|
||||||
)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -88,7 +88,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
public void testLargeOffsetAndSmallSegmentInterval()
|
public void testLargeOffsetAndSmallSegmentInterval()
|
||||||
{
|
{
|
||||||
final Period segmentPeriod = new Period("PT1H");
|
final Period segmentPeriod = new Period("PT1H");
|
||||||
final CompactionSegmentIterator iterator = policy.reset(
|
final CompactionSegmentIterator iterator = policy.createIterator(
|
||||||
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P2D"), null)),
|
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P2D"), null)),
|
||||||
ImmutableMap.of(
|
ImmutableMap.of(
|
||||||
DATA_SOURCE,
|
DATA_SOURCE,
|
||||||
|
@ -113,7 +113,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
public void testSmallOffsetAndLargeSegmentInterval()
|
public void testSmallOffsetAndLargeSegmentInterval()
|
||||||
{
|
{
|
||||||
final Period segmentPeriod = new Period("PT1H");
|
final Period segmentPeriod = new Period("PT1H");
|
||||||
final CompactionSegmentIterator iterator = policy.reset(
|
final CompactionSegmentIterator iterator = policy.createIterator(
|
||||||
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1M"), null)),
|
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1M"), null)),
|
||||||
ImmutableMap.of(
|
ImmutableMap.of(
|
||||||
DATA_SOURCE,
|
DATA_SOURCE,
|
||||||
|
@ -146,7 +146,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
public void testLargeGapInData()
|
public void testLargeGapInData()
|
||||||
{
|
{
|
||||||
final Period segmentPeriod = new Period("PT1H");
|
final Period segmentPeriod = new Period("PT1H");
|
||||||
final CompactionSegmentIterator iterator = policy.reset(
|
final CompactionSegmentIterator iterator = policy.createIterator(
|
||||||
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1H1M"), null)),
|
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1H1M"), null)),
|
||||||
ImmutableMap.of(
|
ImmutableMap.of(
|
||||||
DATA_SOURCE,
|
DATA_SOURCE,
|
||||||
|
@ -179,7 +179,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
@Test
|
@Test
|
||||||
public void testHugeShard()
|
public void testHugeShard()
|
||||||
{
|
{
|
||||||
final CompactionSegmentIterator iterator = policy.reset(
|
final CompactionSegmentIterator iterator = policy.createIterator(
|
||||||
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P1D"), null)),
|
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P1D"), null)),
|
||||||
ImmutableMap.of(
|
ImmutableMap.of(
|
||||||
DATA_SOURCE,
|
DATA_SOURCE,
|
||||||
|
@ -229,7 +229,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
@Test
|
@Test
|
||||||
public void testManySegmentsPerShard()
|
public void testManySegmentsPerShard()
|
||||||
{
|
{
|
||||||
final CompactionSegmentIterator iterator = policy.reset(
|
final CompactionSegmentIterator iterator = policy.createIterator(
|
||||||
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(800000, new Period("P1D"), null)),
|
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(800000, new Period("P1D"), null)),
|
||||||
ImmutableMap.of(
|
ImmutableMap.of(
|
||||||
DATA_SOURCE,
|
DATA_SOURCE,
|
||||||
|
@ -287,7 +287,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
{
|
{
|
||||||
final String unknownDataSource = "unknown";
|
final String unknownDataSource = "unknown";
|
||||||
final Period segmentPeriod = new Period("PT1H");
|
final Period segmentPeriod = new Period("PT1H");
|
||||||
final CompactionSegmentIterator iterator = policy.reset(
|
final CompactionSegmentIterator iterator = policy.createIterator(
|
||||||
ImmutableMap.of(
|
ImmutableMap.of(
|
||||||
unknownDataSource,
|
unknownDataSource,
|
||||||
createCompactionConfig(10000, new Period("P2D"), null),
|
createCompactionConfig(10000, new Period("P2D"), null),
|
||||||
|
@ -337,7 +337,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
2
|
2
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
final CompactionSegmentIterator iterator = policy.reset(
|
final CompactionSegmentIterator iterator = policy.createIterator(
|
||||||
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(inputSegmentSizeBytes, new Period("P0D"), null)),
|
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(inputSegmentSizeBytes, new Period("P0D"), null)),
|
||||||
ImmutableMap.of(DATA_SOURCE, timeline),
|
ImmutableMap.of(DATA_SOURCE, timeline),
|
||||||
Collections.emptyMap()
|
Collections.emptyMap()
|
||||||
|
@ -374,7 +374,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
|
|
||||||
final CompactionSegmentIterator iterator = policy.reset(
|
final CompactionSegmentIterator iterator = policy.createIterator(
|
||||||
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), null)),
|
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), null)),
|
||||||
ImmutableMap.of(DATA_SOURCE, timeline),
|
ImmutableMap.of(DATA_SOURCE, timeline),
|
||||||
Collections.emptyMap()
|
Collections.emptyMap()
|
||||||
|
@ -395,7 +395,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
|
|
||||||
final CompactionSegmentIterator iterator = policy.reset(
|
final CompactionSegmentIterator iterator = policy.createIterator(
|
||||||
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), null)),
|
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), null)),
|
||||||
ImmutableMap.of(DATA_SOURCE, timeline),
|
ImmutableMap.of(DATA_SOURCE, timeline),
|
||||||
Collections.emptyMap()
|
Collections.emptyMap()
|
||||||
|
@ -412,7 +412,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
new SegmentGenerateSpec(Intervals.of("2017-10-14T00:00:00/2017-10-15T00:00:00"), new Period("P1D"))
|
new SegmentGenerateSpec(Intervals.of("2017-10-14T00:00:00/2017-10-15T00:00:00"), new Period("P1D"))
|
||||||
);
|
);
|
||||||
|
|
||||||
final CompactionSegmentIterator iterator = policy.reset(
|
final CompactionSegmentIterator iterator = policy.createIterator(
|
||||||
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.DAY, null, null))),
|
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.DAY, null, null))),
|
||||||
ImmutableMap.of(DATA_SOURCE, timeline),
|
ImmutableMap.of(DATA_SOURCE, timeline),
|
||||||
Collections.emptyMap()
|
Collections.emptyMap()
|
||||||
|
@ -445,7 +445,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
new SegmentGenerateSpec(Intervals.of("2017-10-14T00:00:00/2017-10-15T00:00:00"), new Period("PT5H"))
|
new SegmentGenerateSpec(Intervals.of("2017-10-14T00:00:00/2017-10-15T00:00:00"), new Period("PT5H"))
|
||||||
);
|
);
|
||||||
|
|
||||||
final CompactionSegmentIterator iterator = policy.reset(
|
final CompactionSegmentIterator iterator = policy.createIterator(
|
||||||
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null))),
|
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null))),
|
||||||
ImmutableMap.of(DATA_SOURCE, timeline),
|
ImmutableMap.of(DATA_SOURCE, timeline),
|
||||||
Collections.emptyMap()
|
Collections.emptyMap()
|
||||||
|
@ -471,7 +471,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
new SegmentGenerateSpec(Intervals.of("2017-10-14T00:00:00/2017-10-15T00:00:00"), new Period("PT5H"))
|
new SegmentGenerateSpec(Intervals.of("2017-10-14T00:00:00/2017-10-15T00:00:00"), new Period("PT5H"))
|
||||||
);
|
);
|
||||||
|
|
||||||
final CompactionSegmentIterator iterator = policy.reset(
|
final CompactionSegmentIterator iterator = policy.createIterator(
|
||||||
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.MINUTE, null, null))),
|
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.MINUTE, null, null))),
|
||||||
ImmutableMap.of(DATA_SOURCE, timeline),
|
ImmutableMap.of(DATA_SOURCE, timeline),
|
||||||
Collections.emptyMap()
|
Collections.emptyMap()
|
||||||
|
@ -496,7 +496,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
public void testWithSkipIntervals()
|
public void testWithSkipIntervals()
|
||||||
{
|
{
|
||||||
final Period segmentPeriod = new Period("PT1H");
|
final Period segmentPeriod = new Period("PT1H");
|
||||||
final CompactionSegmentIterator iterator = policy.reset(
|
final CompactionSegmentIterator iterator = policy.createIterator(
|
||||||
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P1D"), null)),
|
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P1D"), null)),
|
||||||
ImmutableMap.of(
|
ImmutableMap.of(
|
||||||
DATA_SOURCE,
|
DATA_SOURCE,
|
||||||
|
@ -536,7 +536,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
public void testHoleInSearchInterval()
|
public void testHoleInSearchInterval()
|
||||||
{
|
{
|
||||||
final Period segmentPeriod = new Period("PT1H");
|
final Period segmentPeriod = new Period("PT1H");
|
||||||
final CompactionSegmentIterator iterator = policy.reset(
|
final CompactionSegmentIterator iterator = policy.createIterator(
|
||||||
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1H"), null)),
|
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1H"), null)),
|
||||||
ImmutableMap.of(
|
ImmutableMap.of(
|
||||||
DATA_SOURCE,
|
DATA_SOURCE,
|
||||||
|
@ -586,7 +586,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
new SegmentGenerateSpec(Intervals.of("2017-10-01T00:00:00/2017-12-31T00:00:00"), new Period("P1D"))
|
new SegmentGenerateSpec(Intervals.of("2017-10-01T00:00:00/2017-12-31T00:00:00"), new Period("P1D"))
|
||||||
);
|
);
|
||||||
|
|
||||||
final CompactionSegmentIterator iterator = policy.reset(
|
final CompactionSegmentIterator iterator = policy.createIterator(
|
||||||
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null))),
|
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null))),
|
||||||
ImmutableMap.of(DATA_SOURCE, timeline),
|
ImmutableMap.of(DATA_SOURCE, timeline),
|
||||||
Collections.emptyMap()
|
Collections.emptyMap()
|
||||||
|
@ -635,7 +635,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
new SegmentGenerateSpec(Intervals.of("2020-02-08/2020-02-15"), new Period("P7D"))
|
new SegmentGenerateSpec(Intervals.of("2020-02-08/2020-02-15"), new Period("P7D"))
|
||||||
);
|
);
|
||||||
|
|
||||||
final CompactionSegmentIterator iterator = policy.reset(
|
final CompactionSegmentIterator iterator = policy.createIterator(
|
||||||
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null))),
|
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null))),
|
||||||
ImmutableMap.of(DATA_SOURCE, timeline),
|
ImmutableMap.of(DATA_SOURCE, timeline),
|
||||||
Collections.emptyMap()
|
Collections.emptyMap()
|
||||||
|
@ -670,7 +670,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
new SegmentGenerateSpec(Intervals.of("2017-12-01T00:00:00/2017-12-02T00:00:00"), new Period("P1D"))
|
new SegmentGenerateSpec(Intervals.of("2017-12-01T00:00:00/2017-12-02T00:00:00"), new Period("P1D"))
|
||||||
);
|
);
|
||||||
|
|
||||||
final CompactionSegmentIterator iterator = policy.reset(
|
final CompactionSegmentIterator iterator = policy.createIterator(
|
||||||
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MINUTE, null, null))),
|
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MINUTE, null, null))),
|
||||||
ImmutableMap.of(DATA_SOURCE, timeline),
|
ImmutableMap.of(DATA_SOURCE, timeline),
|
||||||
Collections.emptyMap()
|
Collections.emptyMap()
|
||||||
|
@ -696,7 +696,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
new SegmentGenerateSpec(Intervals.of("2017-10-01T01:00:00/2017-10-01T02:00:00"), new Period("PT1H"), "1994-04-30T00:00:00.000Z", null)
|
new SegmentGenerateSpec(Intervals.of("2017-10-01T01:00:00/2017-10-01T02:00:00"), new Period("PT1H"), "1994-04-30T00:00:00.000Z", null)
|
||||||
);
|
);
|
||||||
|
|
||||||
final CompactionSegmentIterator iterator = policy.reset(
|
final CompactionSegmentIterator iterator = policy.createIterator(
|
||||||
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null))),
|
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null))),
|
||||||
ImmutableMap.of(DATA_SOURCE, timeline),
|
ImmutableMap.of(DATA_SOURCE, timeline),
|
||||||
Collections.emptyMap()
|
Collections.emptyMap()
|
||||||
|
@ -721,7 +721,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
// Same indexSpec as what is set in the auto compaction config
|
// Same indexSpec as what is set in the auto compaction config
|
||||||
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
|
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
|
||||||
// Same partitionsSpec as what is set in the auto compaction config
|
// Same partitionsSpec as what is set in the auto compaction config
|
||||||
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
|
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));
|
||||||
|
|
||||||
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
|
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
|
||||||
final SegmentTimeline timeline = createTimeline(
|
final SegmentTimeline timeline = createTimeline(
|
||||||
|
@ -740,7 +740,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
);
|
);
|
||||||
|
|
||||||
// Auto compaction config sets segmentGranularity=DAY
|
// Auto compaction config sets segmentGranularity=DAY
|
||||||
final CompactionSegmentIterator iterator = policy.reset(
|
final CompactionSegmentIterator iterator = policy.createIterator(
|
||||||
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.DAY, null, null))),
|
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.DAY, null, null))),
|
||||||
ImmutableMap.of(DATA_SOURCE, timeline),
|
ImmutableMap.of(DATA_SOURCE, timeline),
|
||||||
Collections.emptyMap()
|
Collections.emptyMap()
|
||||||
|
@ -754,7 +754,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
// Same indexSpec as what is set in the auto compaction config
|
// Same indexSpec as what is set in the auto compaction config
|
||||||
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
|
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
|
||||||
// Same partitionsSpec as what is set in the auto compaction config
|
// Same partitionsSpec as what is set in the auto compaction config
|
||||||
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
|
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));
|
||||||
|
|
||||||
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
|
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
|
||||||
final SegmentTimeline timeline = createTimeline(
|
final SegmentTimeline timeline = createTimeline(
|
||||||
|
@ -773,7 +773,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
);
|
);
|
||||||
|
|
||||||
// Auto compaction config sets segmentGranularity=DAY
|
// Auto compaction config sets segmentGranularity=DAY
|
||||||
final CompactionSegmentIterator iterator = policy.reset(
|
final CompactionSegmentIterator iterator = policy.createIterator(
|
||||||
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.DAY, null, null))),
|
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.DAY, null, null))),
|
||||||
ImmutableMap.of(DATA_SOURCE, timeline),
|
ImmutableMap.of(DATA_SOURCE, timeline),
|
||||||
Collections.emptyMap()
|
Collections.emptyMap()
|
||||||
|
@ -787,7 +787,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
// Same indexSpec as what is set in the auto compaction config
|
// Same indexSpec as what is set in the auto compaction config
|
||||||
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
|
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
|
||||||
// Same partitionsSpec as what is set in the auto compaction config
|
// Same partitionsSpec as what is set in the auto compaction config
|
||||||
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
|
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));
|
||||||
|
|
||||||
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
|
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
|
||||||
final SegmentTimeline timeline = createTimeline(
|
final SegmentTimeline timeline = createTimeline(
|
||||||
|
@ -806,7 +806,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
);
|
);
|
||||||
|
|
||||||
// Auto compaction config sets segmentGranularity=YEAR
|
// Auto compaction config sets segmentGranularity=YEAR
|
||||||
final CompactionSegmentIterator iterator = policy.reset(
|
final CompactionSegmentIterator iterator = policy.createIterator(
|
||||||
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null))),
|
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null))),
|
||||||
ImmutableMap.of(DATA_SOURCE, timeline),
|
ImmutableMap.of(DATA_SOURCE, timeline),
|
||||||
Collections.emptyMap()
|
Collections.emptyMap()
|
||||||
|
@ -830,7 +830,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
// Same indexSpec as what is set in the auto compaction config
|
// Same indexSpec as what is set in the auto compaction config
|
||||||
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
|
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
|
||||||
// Same partitionsSpec as what is set in the auto compaction config
|
// Same partitionsSpec as what is set in the auto compaction config
|
||||||
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
|
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));
|
||||||
|
|
||||||
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
|
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
|
||||||
final SegmentTimeline timeline = createTimeline(
|
final SegmentTimeline timeline = createTimeline(
|
||||||
|
@ -849,7 +849,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
);
|
);
|
||||||
|
|
||||||
// Auto compaction config sets segmentGranularity=YEAR
|
// Auto compaction config sets segmentGranularity=YEAR
|
||||||
final CompactionSegmentIterator iterator = policy.reset(
|
final CompactionSegmentIterator iterator = policy.createIterator(
|
||||||
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null))),
|
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null))),
|
||||||
ImmutableMap.of(DATA_SOURCE, timeline),
|
ImmutableMap.of(DATA_SOURCE, timeline),
|
||||||
Collections.emptyMap()
|
Collections.emptyMap()
|
||||||
|
@ -873,7 +873,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
// Same indexSpec as what is set in the auto compaction config
|
// Same indexSpec as what is set in the auto compaction config
|
||||||
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
|
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
|
||||||
// Same partitionsSpec as what is set in the auto compaction config
|
// Same partitionsSpec as what is set in the auto compaction config
|
||||||
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
|
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));
|
||||||
|
|
||||||
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
|
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
|
||||||
final SegmentTimeline timeline = createTimeline(
|
final SegmentTimeline timeline = createTimeline(
|
||||||
|
@ -887,7 +887,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
|
|
||||||
// Duration of new segmentGranularity is the same as before (P1D),
|
// Duration of new segmentGranularity is the same as before (P1D),
|
||||||
// but we changed the timezone from UTC to Bangkok in the auto compaction spec
|
// but we changed the timezone from UTC to Bangkok in the auto compaction spec
|
||||||
final CompactionSegmentIterator iterator = policy.reset(
|
final CompactionSegmentIterator iterator = policy.createIterator(
|
||||||
ImmutableMap.of(DATA_SOURCE,
|
ImmutableMap.of(DATA_SOURCE,
|
||||||
createCompactionConfig(
|
createCompactionConfig(
|
||||||
130000,
|
130000,
|
||||||
|
@ -925,7 +925,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
// Same indexSpec as what is set in the auto compaction config
|
// Same indexSpec as what is set in the auto compaction config
|
||||||
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
|
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
|
||||||
// Same partitionsSpec as what is set in the auto compaction config
|
// Same partitionsSpec as what is set in the auto compaction config
|
||||||
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
|
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));
|
||||||
|
|
||||||
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
|
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
|
||||||
final SegmentTimeline timeline = createTimeline(
|
final SegmentTimeline timeline = createTimeline(
|
||||||
|
@ -938,7 +938,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
);
|
);
|
||||||
|
|
||||||
// Duration of new segmentGranularity is the same as before (P1D), but we changed the origin in the autocompaction spec
|
// Duration of new segmentGranularity is the same as before (P1D), but we changed the origin in the autocompaction spec
|
||||||
final CompactionSegmentIterator iterator = policy.reset(
|
final CompactionSegmentIterator iterator = policy.createIterator(
|
||||||
ImmutableMap.of(DATA_SOURCE,
|
ImmutableMap.of(DATA_SOURCE,
|
||||||
createCompactionConfig(
|
createCompactionConfig(
|
||||||
130000,
|
130000,
|
||||||
|
@ -976,7 +976,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
// Same indexSpec as what is set in the auto compaction config
|
// Same indexSpec as what is set in the auto compaction config
|
||||||
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
|
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
|
||||||
// Same partitionsSpec as what is set in the auto compaction config
|
// Same partitionsSpec as what is set in the auto compaction config
|
||||||
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
|
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));
|
||||||
|
|
||||||
// Create segments that were compacted (CompactionState != null) and have
|
// Create segments that were compacted (CompactionState != null) and have
|
||||||
// rollup=false for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
|
// rollup=false for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
|
||||||
|
@ -1004,7 +1004,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
);
|
);
|
||||||
|
|
||||||
// Auto compaction config sets rollup=true
|
// Auto compaction config sets rollup=true
|
||||||
final CompactionSegmentIterator iterator = policy.reset(
|
final CompactionSegmentIterator iterator = policy.createIterator(
|
||||||
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(null, null, true))),
|
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(null, null, true))),
|
||||||
ImmutableMap.of(DATA_SOURCE, timeline),
|
ImmutableMap.of(DATA_SOURCE, timeline),
|
||||||
Collections.emptyMap()
|
Collections.emptyMap()
|
||||||
|
@ -1036,7 +1036,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
// Same indexSpec as what is set in the auto compaction config
|
// Same indexSpec as what is set in the auto compaction config
|
||||||
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
|
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
|
||||||
// Same partitionsSpec as what is set in the auto compaction config
|
// Same partitionsSpec as what is set in the auto compaction config
|
||||||
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
|
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));
|
||||||
|
|
||||||
// Create segments that were compacted (CompactionState != null) and have
|
// Create segments that were compacted (CompactionState != null) and have
|
||||||
// queryGranularity=DAY for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
|
// queryGranularity=DAY for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
|
||||||
|
@ -1064,7 +1064,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
);
|
);
|
||||||
|
|
||||||
// Auto compaction config sets queryGranularity=MINUTE
|
// Auto compaction config sets queryGranularity=MINUTE
|
||||||
final CompactionSegmentIterator iterator = policy.reset(
|
final CompactionSegmentIterator iterator = policy.createIterator(
|
||||||
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(null, Granularities.MINUTE, null))),
|
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(null, Granularities.MINUTE, null))),
|
||||||
ImmutableMap.of(DATA_SOURCE, timeline),
|
ImmutableMap.of(DATA_SOURCE, timeline),
|
||||||
Collections.emptyMap()
|
Collections.emptyMap()
|
||||||
|
@ -1096,7 +1096,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
// Same indexSpec as what is set in the auto compaction config
|
// Same indexSpec as what is set in the auto compaction config
|
||||||
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
|
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
|
||||||
// Same partitionsSpec as what is set in the auto compaction config
|
// Same partitionsSpec as what is set in the auto compaction config
|
||||||
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
|
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));
|
||||||
|
|
||||||
// Create segments that were compacted (CompactionState != null) and have
|
// Create segments that were compacted (CompactionState != null) and have
|
||||||
// Dimensions=["foo", "bar"] for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
|
// Dimensions=["foo", "bar"] for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
|
||||||
|
@ -1131,7 +1131,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
);
|
);
|
||||||
|
|
||||||
// Auto compaction config sets Dimensions=["foo"]
|
// Auto compaction config sets Dimensions=["foo"]
|
||||||
CompactionSegmentIterator iterator = policy.reset(
|
CompactionSegmentIterator iterator = policy.createIterator(
|
||||||
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
|
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
|
||||||
130000,
|
130000,
|
||||||
new Period("P0D"),
|
new Period("P0D"),
|
||||||
|
@ -1172,7 +1172,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
Assert.assertFalse(iterator.hasNext());
|
Assert.assertFalse(iterator.hasNext());
|
||||||
|
|
||||||
// Auto compaction config sets Dimensions=null
|
// Auto compaction config sets Dimensions=null
|
||||||
iterator = policy.reset(
|
iterator = policy.createIterator(
|
||||||
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
|
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
|
||||||
130000,
|
130000,
|
||||||
new Period("P0D"),
|
new Period("P0D"),
|
||||||
|
@ -1195,7 +1195,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
// Same indexSpec as what is set in the auto compaction config
|
// Same indexSpec as what is set in the auto compaction config
|
||||||
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
|
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
|
||||||
// Same partitionsSpec as what is set in the auto compaction config
|
// Same partitionsSpec as what is set in the auto compaction config
|
||||||
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
|
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));
|
||||||
|
|
||||||
// Create segments that were compacted (CompactionState != null) and have
|
// Create segments that were compacted (CompactionState != null) and have
|
||||||
// filter=SelectorDimFilter("dim1", "foo", null) for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
|
// filter=SelectorDimFilter("dim1", "foo", null) for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
|
||||||
|
@ -1251,7 +1251,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
);
|
);
|
||||||
|
|
||||||
// Auto compaction config sets filter=SelectorDimFilter("dim1", "bar", null)
|
// Auto compaction config sets filter=SelectorDimFilter("dim1", "bar", null)
|
||||||
CompactionSegmentIterator iterator = policy.reset(
|
CompactionSegmentIterator iterator = policy.createIterator(
|
||||||
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
|
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
|
||||||
130000,
|
130000,
|
||||||
new Period("P0D"),
|
new Period("P0D"),
|
||||||
|
@ -1292,7 +1292,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
Assert.assertFalse(iterator.hasNext());
|
Assert.assertFalse(iterator.hasNext());
|
||||||
|
|
||||||
// Auto compaction config sets filter=null
|
// Auto compaction config sets filter=null
|
||||||
iterator = policy.reset(
|
iterator = policy.createIterator(
|
||||||
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
|
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
|
||||||
130000,
|
130000,
|
||||||
new Period("P0D"),
|
new Period("P0D"),
|
||||||
|
@ -1319,7 +1319,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
// Same indexSpec as what is set in the auto compaction config
|
// Same indexSpec as what is set in the auto compaction config
|
||||||
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
|
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
|
||||||
// Same partitionsSpec as what is set in the auto compaction config
|
// Same partitionsSpec as what is set in the auto compaction config
|
||||||
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
|
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));
|
||||||
|
|
||||||
// Create segments that were compacted (CompactionState != null) and have
|
// Create segments that were compacted (CompactionState != null) and have
|
||||||
// metricsSpec={CountAggregatorFactory("cnt")} for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
|
// metricsSpec={CountAggregatorFactory("cnt")} for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
|
||||||
|
@ -1375,7 +1375,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
);
|
);
|
||||||
|
|
||||||
// Auto compaction config sets metricsSpec={CountAggregatorFactory("cnt"), LongSumAggregatorFactory("val", "val")}
|
// Auto compaction config sets metricsSpec={CountAggregatorFactory("cnt"), LongSumAggregatorFactory("val", "val")}
|
||||||
CompactionSegmentIterator iterator = policy.reset(
|
CompactionSegmentIterator iterator = policy.createIterator(
|
||||||
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
|
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
|
||||||
130000,
|
130000,
|
||||||
new Period("P0D"),
|
new Period("P0D"),
|
||||||
|
@ -1416,7 +1416,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
Assert.assertFalse(iterator.hasNext());
|
Assert.assertFalse(iterator.hasNext());
|
||||||
|
|
||||||
// Auto compaction config sets metricsSpec=null
|
// Auto compaction config sets metricsSpec=null
|
||||||
iterator = policy.reset(
|
iterator = policy.createIterator(
|
||||||
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
|
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
|
||||||
130000,
|
130000,
|
||||||
new Period("P0D"),
|
new Period("P0D"),
|
||||||
|
@ -1440,7 +1440,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
new SegmentGenerateSpec(Intervals.of("2017-10-01T01:00:00/2017-10-01T02:00:00"), new Period("PT1H"), "1994-04-30T00:00:00.000Z", null)
|
new SegmentGenerateSpec(Intervals.of("2017-10-01T01:00:00/2017-10-01T02:00:00"), new Period("PT1H"), "1994-04-30T00:00:00.000Z", null)
|
||||||
);
|
);
|
||||||
|
|
||||||
final CompactionSegmentIterator iterator = policy.reset(
|
final CompactionSegmentIterator iterator = policy.createIterator(
|
||||||
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null))),
|
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null))),
|
||||||
ImmutableMap.of(DATA_SOURCE, timeline),
|
ImmutableMap.of(DATA_SOURCE, timeline),
|
||||||
Collections.emptyMap()
|
Collections.emptyMap()
|
||||||
|
@ -1468,7 +1468,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
// Different indexSpec as what is set in the auto compaction config
|
// Different indexSpec as what is set in the auto compaction config
|
||||||
IndexSpec newIndexSpec = IndexSpec.builder().withBitmapSerdeFactory(new ConciseBitmapSerdeFactory()).build();
|
IndexSpec newIndexSpec = IndexSpec.builder().withBitmapSerdeFactory(new ConciseBitmapSerdeFactory()).build();
|
||||||
Map<String, Object> newIndexSpecMap = mapper.convertValue(newIndexSpec, new TypeReference<Map<String, Object>>() {});
|
Map<String, Object> newIndexSpecMap = mapper.convertValue(newIndexSpec, new TypeReference<Map<String, Object>>() {});
|
||||||
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
|
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));
|
||||||
|
|
||||||
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
|
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
|
||||||
final SegmentTimeline timeline = createTimeline(
|
final SegmentTimeline timeline = createTimeline(
|
||||||
|
@ -1481,7 +1481,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
);
|
);
|
||||||
|
|
||||||
// Duration of new segmentGranularity is the same as before (P1D)
|
// Duration of new segmentGranularity is the same as before (P1D)
|
||||||
final CompactionSegmentIterator iterator = policy.reset(
|
final CompactionSegmentIterator iterator = policy.createIterator(
|
||||||
ImmutableMap.of(DATA_SOURCE,
|
ImmutableMap.of(DATA_SOURCE,
|
||||||
createCompactionConfig(
|
createCompactionConfig(
|
||||||
130000,
|
130000,
|
||||||
|
@ -1517,7 +1517,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
public void testIteratorDoesNotReturnSegmentWithChangingAppendableIndexSpec()
|
public void testIteratorDoesNotReturnSegmentWithChangingAppendableIndexSpec()
|
||||||
{
|
{
|
||||||
NullHandling.initializeForTests();
|
NullHandling.initializeForTests();
|
||||||
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
|
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null));
|
||||||
final SegmentTimeline timeline = createTimeline(
|
final SegmentTimeline timeline = createTimeline(
|
||||||
new SegmentGenerateSpec(
|
new SegmentGenerateSpec(
|
||||||
Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"),
|
Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"),
|
||||||
|
@ -1534,7 +1534,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
|
|
||||||
CompactionSegmentIterator iterator = policy.reset(
|
CompactionSegmentIterator iterator = policy.createIterator(
|
||||||
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
|
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
|
||||||
130000,
|
130000,
|
||||||
new Period("P0D"),
|
new Period("P0D"),
|
||||||
|
@ -1569,7 +1569,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
);
|
);
|
||||||
Assert.assertFalse(iterator.hasNext());
|
Assert.assertFalse(iterator.hasNext());
|
||||||
|
|
||||||
iterator = policy.reset(
|
iterator = policy.createIterator(
|
||||||
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
|
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
|
||||||
130000,
|
130000,
|
||||||
new Period("P0D"),
|
new Period("P0D"),
|
||||||
|
@ -1608,7 +1608,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
@Test
|
@Test
|
||||||
public void testSkipAllGranularityToDefault()
|
public void testSkipAllGranularityToDefault()
|
||||||
{
|
{
|
||||||
CompactionSegmentIterator iterator = policy.reset(
|
CompactionSegmentIterator iterator = policy.createIterator(
|
||||||
ImmutableMap.of(DATA_SOURCE,
|
ImmutableMap.of(DATA_SOURCE,
|
||||||
createCompactionConfig(10000,
|
createCompactionConfig(10000,
|
||||||
new Period("P0D"),
|
new Period("P0D"),
|
||||||
|
@ -1640,7 +1640,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
@Test
|
@Test
|
||||||
public void testSkipFirstHalfEternityToDefault()
|
public void testSkipFirstHalfEternityToDefault()
|
||||||
{
|
{
|
||||||
CompactionSegmentIterator iterator = policy.reset(
|
CompactionSegmentIterator iterator = policy.createIterator(
|
||||||
ImmutableMap.of(DATA_SOURCE,
|
ImmutableMap.of(DATA_SOURCE,
|
||||||
createCompactionConfig(10000,
|
createCompactionConfig(10000,
|
||||||
new Period("P0D"),
|
new Period("P0D"),
|
||||||
|
@ -1672,7 +1672,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
@Test
|
@Test
|
||||||
public void testSkipSecondHalfOfEternityToDefault()
|
public void testSkipSecondHalfOfEternityToDefault()
|
||||||
{
|
{
|
||||||
CompactionSegmentIterator iterator = policy.reset(
|
CompactionSegmentIterator iterator = policy.createIterator(
|
||||||
ImmutableMap.of(DATA_SOURCE,
|
ImmutableMap.of(DATA_SOURCE,
|
||||||
createCompactionConfig(10000,
|
createCompactionConfig(10000,
|
||||||
new Period("P0D"),
|
new Period("P0D"),
|
||||||
|
@ -1704,7 +1704,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
@Test
|
@Test
|
||||||
public void testSkipAllToAllGranularity()
|
public void testSkipAllToAllGranularity()
|
||||||
{
|
{
|
||||||
CompactionSegmentIterator iterator = policy.reset(
|
CompactionSegmentIterator iterator = policy.createIterator(
|
||||||
ImmutableMap.of(DATA_SOURCE,
|
ImmutableMap.of(DATA_SOURCE,
|
||||||
createCompactionConfig(10000,
|
createCompactionConfig(10000,
|
||||||
new Period("P0D"),
|
new Period("P0D"),
|
||||||
|
@ -1736,7 +1736,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
@Test
|
@Test
|
||||||
public void testSkipAllToFinerGranularity()
|
public void testSkipAllToFinerGranularity()
|
||||||
{
|
{
|
||||||
CompactionSegmentIterator iterator = policy.reset(
|
CompactionSegmentIterator iterator = policy.createIterator(
|
||||||
ImmutableMap.of(DATA_SOURCE,
|
ImmutableMap.of(DATA_SOURCE,
|
||||||
createCompactionConfig(10000,
|
createCompactionConfig(10000,
|
||||||
new Period("P0D"),
|
new Period("P0D"),
|
||||||
|
@ -1799,7 +1799,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
0,
|
0,
|
||||||
1);
|
1);
|
||||||
|
|
||||||
CompactionSegmentIterator iterator = policy.reset(
|
CompactionSegmentIterator iterator = policy.createIterator(
|
||||||
ImmutableMap.of(DATA_SOURCE,
|
ImmutableMap.of(DATA_SOURCE,
|
||||||
createCompactionConfig(10000,
|
createCompactionConfig(10000,
|
||||||
new Period("P0D"),
|
new Period("P0D"),
|
||||||
|
@ -1850,7 +1850,7 @@ public class NewestSegmentFirstPolicyTest
|
||||||
TombstoneShardSpec.INSTANCE,
|
TombstoneShardSpec.INSTANCE,
|
||||||
0,
|
0,
|
||||||
1);
|
1);
|
||||||
iterator = policy.reset(
|
iterator = policy.createIterator(
|
||||||
ImmutableMap.of(DATA_SOURCE,
|
ImmutableMap.of(DATA_SOURCE,
|
||||||
createCompactionConfig(10000,
|
createCompactionConfig(10000,
|
||||||
new Period("P0D"),
|
new Period("P0D"),
|
||||||
|
|
Loading…
Reference in New Issue