Add support keepSegmentGranularity for automatic compaction (#6407)

* Add support keepSegmentGranularity for automatic compaction

* skip unknown dataSource

* ignore single segment to compact

* add doc

* address comments

* address comment
Jihoon Son 2018-10-07 16:48:58 -07:00 committed by Fangjin Yang
parent 45aa51a00c
commit 88d23b77b7
15 changed files with 531 additions and 344 deletions

View File

@ -89,8 +89,10 @@ public class NewestSegmentFirstPolicyBenchmark
dataSource,
new DataSourceCompactionConfig(
dataSource,
false,
0,
targetCompactionSizeBytes,
targetCompactionSizeBytes,
null,
null,
null,

View File

@ -782,9 +782,11 @@ A description of the compaction config is:
|Property|Description|Required|
|--------|-----------|--------|
|`dataSource`|dataSource name to be compacted.|yes|
|`keepSegmentGranularity`|Value of [keepSegmentGranularity](../ingestion/compaction.html) passed to the generated compaction tasks.|no (default = true)|
|`taskPriority`|[Priority](../ingestion/tasks.html#task-priorities) of compact task.|no (default = 25)|
|`targetCompactionSizeBytes`|The target segment size of compaction. The actual size of a compact segment might be slightly larger or smaller than this value.|no (default = 838860800)|
|`numTargetCompactionSegments`|Max number of segments to compact together.|no (default = 150)|
|`inputSegmentSizeBytes`|Total input segment size of a compactionTask.|no (default = 419430400)|
|`targetCompactionSizeBytes`|The target segment size of compaction. The actual size of a compact segment might be slightly larger or smaller than this value.|no (default = 419430400)|
|`maxNumSegmentsToCompact`|Max number of segments to compact together.|no (default = 150)|
|`skipOffsetFromLatest`|The offset for searching segments to be compacted. Strongly recommended to set for realtime dataSources.|no (default = "P1D")|
|`tuningConfig`|Tuning config for compact tasks. See below [Compact Task TuningConfig](#compact-task-tuningconfig).|no|
|`taskContext`|[Task context](../ingestion/tasks.html#task-context) for compact tasks.|no|
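For reference, a minimal sketch (not part of this patch) of how these properties map onto the DataSourceCompactionConfig constructor changed later in this diff; the dataSource name is a placeholder and the explicit values simply restate the documented defaults.

import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.joda.time.Period;

class CompactionConfigExample
{
  static DataSourceCompactionConfig withDocumentedDefaults()
  {
    // Argument order follows the constructor added in this patch.
    return new DataSourceCompactionConfig(
        "wikipedia",        // dataSource (placeholder)
        true,               // keepSegmentGranularity
        25,                 // taskPriority
        419430400L,         // inputSegmentSizeBytes
        419430400L,         // targetCompactionSizeBytes
        150,                // maxNumSegmentsToCompact
        new Period("P1D"),  // skipOffsetFromLatest
        null,               // tuningConfig
        null                // taskContext
    );
  }
}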

View File

@ -13,6 +13,8 @@ Compaction tasks merge all segments of the given interval. The syntax is:
"dataSource": <task_datasource>,
"interval": <interval to specify segments to be merged>,
"dimensions": <custom dimensionsSpec>,
"keepSegmentGranularity": <true or false>,
"targetCompactionSizeBytes": <target size of compacted segments>,
"tuningConfig": <index task tuningConfig>,
"context": <task context>
}
@ -22,9 +24,11 @@ Compaction tasks merge all segments of the given interval. The syntax is:
|-----|-----------|--------|
|`type`|Task type. Should be `compact`|Yes|
|`id`|Task id|No|
|`dataSource`|dataSource name to be compacted|Yes|
|`interval`|interval of segments to be compacted|Yes|
|`dimensions`|custom dimensionsSpec. compaction task will use this dimensionsSpec if exist instead of generating one. See below for more details.|No|
|`dataSource`|DataSource name to be compacted|Yes|
|`interval`|Interval of segments to be compacted|Yes|
|`dimensions`|Custom dimensionsSpec. The compaction task will use this dimensionsSpec, if specified, instead of generating one. See below for more details.|No|
|`keepSegmentGranularity`|If set to true, the compaction task will keep the time chunk boundaries and merge segments only if they fall into the same time chunk.|No (default = true)|
|`targetCompactionSizeBytes`|Target segment size after compaction. Cannot be used with `targetPartitionSize`, `maxTotalRows`, or `numShards` in tuningConfig.|No|
|`tuningConfig`|[Index task tuningConfig](../ingestion/native_tasks.html#tuningconfig)|No|
|`context`|[Task context](../ingestion/locking-and-priority.html#task-context)|No|
@ -62,4 +66,3 @@ your own ordering and types, you can specify a custom `dimensionsSpec` in the co
- Roll-up: the output segment is rolled up only when `rollup` is set for all input segments.
See [Roll-up](../ingestion/index.html#rollup) for more details.
You can check whether your segments are rolled up by using [Segment Metadata Queries](../querying/segmentmetadataquery.html#analysistypes).
- Partitioning: The compaction task is a special form of native batch indexing task, so it always uses hash-based partitioning on the full set of dimensions.

View File

@ -161,7 +161,7 @@ public class CompactionTask extends AbstractTask
this.tuningConfig = tuningConfig;
this.jsonMapper = jsonMapper;
this.segmentProvider = segments == null ? new SegmentProvider(dataSource, interval) : new SegmentProvider(segments);
this.partitionConfigurationManager = new PartitionConfigurationManager(targetCompactionSizeBytes, tuningConfig);
this.partitionConfigurationManager = new PartitionConfigurationManager(this.targetCompactionSizeBytes, tuningConfig);
this.authorizerMapper = authorizerMapper;
this.chatHandlerProvider = chatHandlerProvider;
this.rowIngestionMetersFactory = rowIngestionMetersFactory;

View File

@ -38,6 +38,11 @@ public final class Intervals
return new Interval(interval, ISOChronology.getInstanceUTC());
}
public static Interval of(String format, Object... formatArgs)
{
return of(StringUtils.format(format, formatArgs));
}
public static boolean isEmpty(Interval interval)
{
return interval.getStart().equals(interval.getEnd());
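The new overload is a convenience wrapper around StringUtils.format; a usage sketch with arbitrary values:

// Equivalent to Intervals.of(StringUtils.format("2017-01-%02d/2017-01-%02d", 8, 9))
Interval interval = Intervals.of("2017-01-%02d/2017-01-%02d", 8, 9);  // 2017-01-08/2017-01-09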

View File

@ -31,6 +31,7 @@ public class ClientCompactQuery
private final String dataSource;
private final List<DataSegment> segments;
private final boolean keepSegmentGranularity;
private final Long targetCompactionSizeBytes;
private final ClientCompactQueryTuningConfig tuningConfig;
private final Map<String, Object> context;
@ -39,6 +40,7 @@ public class ClientCompactQuery
@JsonProperty("dataSource") String dataSource,
@JsonProperty("segments") List<DataSegment> segments,
@JsonProperty("keepSegmentGranularity") boolean keepSegmentGranularity,
@JsonProperty("targetCompactionSizeBytes") Long targetCompactionSizeBytes,
@JsonProperty("tuningConfig") ClientCompactQueryTuningConfig tuningConfig,
@JsonProperty("context") Map<String, Object> context
)
@ -46,6 +48,7 @@ public class ClientCompactQuery
this.dataSource = dataSource;
this.segments = segments;
this.keepSegmentGranularity = keepSegmentGranularity;
this.targetCompactionSizeBytes = targetCompactionSizeBytes;
this.tuningConfig = tuningConfig;
this.context = context;
}
@ -74,6 +77,12 @@ public class ClientCompactQuery
return keepSegmentGranularity;
}
@JsonProperty
public Long getTargetCompactionSizeBytes()
{
return targetCompactionSizeBytes;
}
@JsonProperty
public ClientCompactQueryTuningConfig getTuningConfig()
{
@ -90,10 +99,12 @@ public class ClientCompactQuery
public String toString()
{
return "ClientCompactQuery{" +
"dataSource=" + dataSource + "'" +
"dataSource='" + dataSource + '\'' +
", segments=" + segments +
", keepSegmentGranularity=" + keepSegmentGranularity +
", targetCompactionSizeBytes=" + targetCompactionSizeBytes +
", tuningConfig=" + tuningConfig +
", contexts=" + context +
"}";
", context=" + context +
'}';
}
}
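A sketch (not from the patch) of how the extended constructor might be invoked; the dataSource name and values are placeholders, and segmentsToCompact is assumed to be the List<DataSegment> selected by the coordinator. The real call site is in HttpIndexingServiceClient below.

import com.google.common.collect.ImmutableMap;
import org.apache.druid.timeline.DataSegment;
import java.util.List;

ClientCompactQuery buildQuery(List<DataSegment> segmentsToCompact)
{
  return new ClientCompactQuery(
      "wikipedia",                                     // dataSource (placeholder)
      segmentsToCompact,                               // segments
      true,                                            // keepSegmentGranularity
      419430400L,                                      // targetCompactionSizeBytes
      null,                                            // tuningConfig
      ImmutableMap.<String, Object>of("priority", 25)  // context, as the client populates it
  );
}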

View File

@ -91,6 +91,7 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
public String compactSegments(
List<DataSegment> segments,
boolean keepSegmentGranularity,
long targetCompactionSizeBytes,
int compactionTaskPriority,
@Nullable ClientCompactQueryTuningConfig tuningConfig,
@Nullable Map<String, Object> context
@ -107,7 +108,16 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
context = context == null ? new HashMap<>() : context;
context.put("priority", compactionTaskPriority);
return runTask(new ClientCompactQuery(dataSource, segments, keepSegmentGranularity, tuningConfig, context));
return runTask(
new ClientCompactQuery(
dataSource,
segments,
keepSegmentGranularity,
targetCompactionSizeBytes,
tuningConfig,
context
)
);
}
@Override

View File

@ -39,6 +39,7 @@ public interface IndexingServiceClient
String compactSegments(
List<DataSegment> segments,
boolean keepSegmentGranularity,
long targetCompactionSizeBytes,
int compactionTaskPriority,
@Nullable ClientCompactQueryTuningConfig tuningConfig,
@Nullable Map<String, Object> context

View File

@ -35,15 +35,19 @@ public class DataSourceCompactionConfig
// should be synchronized with Tasks.DEFAULT_MERGE_TASK_PRIORITY
private static final int DEFAULT_COMPACTION_TASK_PRIORITY = 25;
private static final int DEFAULT_NUM_TARGET_COMPACTION_SEGMENTS = 150;
private static final boolean DEFAULT_KEEP_SEGMENT_GRANULARITY = true;
private static final long DEFAULT_INPUT_SEGMENT_SIZE_BYTES = 400 * 1024 * 1024;
private static final int DEFAULT_NUM_INPUT_SEGMENTS = 150;
private static final Period DEFAULT_SKIP_OFFSET_FROM_LATEST = new Period("P1D");
private final String dataSource;
private final boolean keepSegmentGranularity;
private final int taskPriority;
private final long inputSegmentSizeBytes;
private final long targetCompactionSizeBytes;
// The number of compaction segments is limited because the byte size of a serialized task spec is limited by
// The number of input segments is limited because the byte size of a serialized task spec is limited by
// RemoteTaskRunnerConfig.maxZnodeBytes.
private final int numTargetCompactionSegments;
private final int maxNumSegmentsToCompact;
private final Period skipOffsetFromLatest;
private final ClientCompactQueryTuningConfig tuningConfig;
private final Map<String, Object> taskContext;
@ -51,30 +55,38 @@ public class DataSourceCompactionConfig
@JsonCreator
public DataSourceCompactionConfig(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("keepSegmentGranularity") Boolean keepSegmentGranularity,
@JsonProperty("taskPriority") @Nullable Integer taskPriority,
@JsonProperty("inputSegmentSizeBytes") @Nullable Long inputSegmentSizeBytes,
@JsonProperty("targetCompactionSizeBytes") @Nullable Long targetCompactionSizeBytes,
@JsonProperty("numTargetCompactionSegments") @Nullable Integer numTargetCompactionSegments,
@JsonProperty("maxNumSegmentsToCompact") @Nullable Integer maxNumSegmentsToCompact,
@JsonProperty("skipOffsetFromLatest") @Nullable Period skipOffsetFromLatest,
@JsonProperty("tuningConfig") @Nullable ClientCompactQueryTuningConfig tuningConfig,
@JsonProperty("taskContext") @Nullable Map<String, Object> taskContext
)
{
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
this.taskPriority = taskPriority == null ?
DEFAULT_COMPACTION_TASK_PRIORITY :
taskPriority;
this.targetCompactionSizeBytes = targetCompactionSizeBytes == null ?
DEFAULT_TARGET_COMPACTION_SIZE_BYTES :
targetCompactionSizeBytes;
this.numTargetCompactionSegments = numTargetCompactionSegments == null ?
DEFAULT_NUM_TARGET_COMPACTION_SEGMENTS :
numTargetCompactionSegments;
this.keepSegmentGranularity = keepSegmentGranularity == null
? DEFAULT_KEEP_SEGMENT_GRANULARITY
: keepSegmentGranularity;
this.taskPriority = taskPriority == null
? DEFAULT_COMPACTION_TASK_PRIORITY
: taskPriority;
this.inputSegmentSizeBytes = inputSegmentSizeBytes == null
? DEFAULT_INPUT_SEGMENT_SIZE_BYTES
: inputSegmentSizeBytes;
this.targetCompactionSizeBytes = targetCompactionSizeBytes == null
? DEFAULT_TARGET_COMPACTION_SIZE_BYTES
: targetCompactionSizeBytes;
this.maxNumSegmentsToCompact = maxNumSegmentsToCompact == null
? DEFAULT_NUM_INPUT_SEGMENTS
: maxNumSegmentsToCompact;
this.skipOffsetFromLatest = skipOffsetFromLatest == null ? DEFAULT_SKIP_OFFSET_FROM_LATEST : skipOffsetFromLatest;
this.tuningConfig = tuningConfig;
this.taskContext = taskContext;
Preconditions.checkArgument(
this.numTargetCompactionSegments > 1,
this.maxNumSegmentsToCompact > 1,
"numTargetCompactionSegments should be larger than 1"
);
}
@ -85,6 +97,12 @@ public class DataSourceCompactionConfig
return dataSource;
}
@JsonProperty
public boolean isKeepSegmentGranularity()
{
return keepSegmentGranularity;
}
@JsonProperty
public int getTaskPriority()
{
@ -92,15 +110,21 @@ public class DataSourceCompactionConfig
}
@JsonProperty
public long getTargetCompactionSizeBytes()
public long getInputSegmentSizeBytes()
{
return targetCompactionSizeBytes;
return inputSegmentSizeBytes;
}
@JsonProperty
public int getNumTargetCompactionSegments()
public int getMaxNumSegmentsToCompact()
{
return numTargetCompactionSegments;
return maxNumSegmentsToCompact;
}
@JsonProperty
public long getTargetCompactionSizeBytes()
{
return targetCompactionSizeBytes;
}
@JsonProperty
@ -140,15 +164,23 @@ public class DataSourceCompactionConfig
return false;
}
if (keepSegmentGranularity != that.keepSegmentGranularity) {
return false;
}
if (taskPriority != that.taskPriority) {
return false;
}
if (targetCompactionSizeBytes != that.targetCompactionSizeBytes) {
if (inputSegmentSizeBytes != that.inputSegmentSizeBytes) {
return false;
}
if (numTargetCompactionSegments != that.numTargetCompactionSegments) {
if (maxNumSegmentsToCompact != that.maxNumSegmentsToCompact) {
return false;
}
if (targetCompactionSizeBytes != that.targetCompactionSizeBytes) {
return false;
}
@ -168,9 +200,11 @@ public class DataSourceCompactionConfig
{
return Objects.hash(
dataSource,
keepSegmentGranularity,
taskPriority,
inputSegmentSizeBytes,
maxNumSegmentsToCompact,
targetCompactionSizeBytes,
numTargetCompactionSegments,
skipOffsetFromLatest,
tuningConfig,
taskContext

View File

@ -154,7 +154,8 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper
// find segments to be compacted.
final String taskId = indexingServiceClient.compactSegments(
segmentsToCompact,
false,
config.isKeepSegmentGranularity(),
config.getTargetCompactionSizeBytes(),
config.getTaskPriority(),
config.getTuningConfig(),
config.getTaskContext()

View File

@ -19,14 +19,10 @@
package org.apache.druid.server.coordinator.helper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
@ -34,14 +30,13 @@ import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@ -63,15 +58,8 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
// dataSource -> intervalToFind
// searchIntervals keeps track of the current state of which interval should be considered to search segments to
// compact.
private final Map<String, Interval> searchIntervals;
private final Map<String, CompactibleTimelineObjectHolderCursor> timelineIterators;
// dataSource -> end dateTime of the initial searchInterval
// searchEndDates keeps the endDate of the initial searchInterval (the entire searchInterval). It's immutable and not
// changed once it's initialized.
// This is used to determine that we can expect more segments to be added for an interval in the future. If the end of
// the interval is same with searchEndDate, we can expect more segments to be added and discard the found segments for
// compaction in this run to further optimize the size of compact segments. See checkCompactableSizeForLastSegmentOrReturn().
private final Map<String, DateTime> searchEndDates;
private final PriorityQueue<QueueEntry> queue = new PriorityQueue<>(
(o1, o2) -> Comparators.intervalsByStartThenEnd().compare(o2.interval, o1.interval)
);
@ -83,8 +71,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
{
this.compactionConfigs = compactionConfigs;
this.dataSources = dataSources;
this.searchIntervals = new HashMap<>(dataSources.size());
this.searchEndDates = new HashMap<>(dataSources.size());
this.timelineIterators = new HashMap<>(dataSources.size());
for (Entry<String, VersionedIntervalTimeline<String, DataSegment>> entry : dataSources.entrySet()) {
final String dataSource = entry.getKey();
@ -93,8 +80,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
if (config != null && !timeline.isEmpty()) {
final Interval searchInterval = findInitialSearchInterval(timeline, config.getSkipOffsetFromLatest());
searchIntervals.put(dataSource, searchInterval);
searchEndDates.put(dataSource, searchInterval.getEnd());
timelineIterators.put(dataSource, new CompactibleTimelineObjectHolderCursor(timeline, searchInterval));
}
}
@ -115,9 +101,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
{
final Object2LongOpenHashMap<String> resultMap = new Object2LongOpenHashMap<>();
resultMap.defaultReturnValue(UNKNOWN_REMAINING_SEGMENT_SIZE);
final Iterator<QueueEntry> iterator = queue.iterator();
while (iterator.hasNext()) {
final QueueEntry entry = iterator.next();
for (QueueEntry entry : queue) {
final VersionedIntervalTimeline<String, DataSegment> timeline = dataSources.get(entry.getDataSource());
final Interval interval = new Interval(timeline.first().getInterval().getStart(), entry.interval.getEnd());
@ -165,41 +149,82 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
/**
* Find the next segments to compact for the given dataSource and add them to the queue.
* {@link #searchIntervals} is updated according to the found segments. That is, the interval of the found segments
* are removed from the searchInterval of the given dataSource.
* {@link #timelineIterators} is updated according to the found segments. That is, the found segments are removed from
* the timeline of the given dataSource.
*/
private void updateQueue(String dataSourceName, DataSourceCompactionConfig config)
{
VersionedIntervalTimeline<String, DataSegment> timeline = dataSources.get(dataSourceName);
final CompactibleTimelineObjectHolderCursor compactibleTimelineObjectHolderCursor = timelineIterators.get(
dataSourceName
);
if (timeline == null || timeline.isEmpty()) {
log.warn("Cannot find timeline for dataSource[%s]. Continue to the next dataSource", dataSourceName);
if (compactibleTimelineObjectHolderCursor == null) {
log.warn("Cannot find timeline for dataSource[%s]. Skip this dataSource", dataSourceName);
return;
}
final Interval searchInterval = Preconditions.checkNotNull(
searchIntervals.get(dataSourceName),
"Cannot find intervals to find for dataSource[%s]",
dataSourceName
);
final DateTime searchEnd = Preconditions.checkNotNull(
searchEndDates.get(dataSourceName),
"searchEndDate for dataSource[%s]",
dataSourceName
);
final Pair<Interval, SegmentsToCompact> pair = findSegmentsToCompact(
timeline,
searchInterval,
searchEnd,
final SegmentsToCompact segmentsToCompact = findSegmentsToCompact(
compactibleTimelineObjectHolderCursor,
config
);
final List<DataSegment> segmentsToCompact = pair.rhs.getSegments();
final Interval remainingSearchInterval = pair.lhs;
searchIntervals.put(dataSourceName, remainingSearchInterval);
if (!segmentsToCompact.isEmpty()) {
queue.add(new QueueEntry(segmentsToCompact));
if (segmentsToCompact.getSize() > 1) {
queue.add(new QueueEntry(segmentsToCompact.segments));
}
}
/**
* Iterates the given {@link VersionedIntervalTimeline}. Only compactible {@link TimelineObjectHolder}s are returned,
* which means the holder always has at least one {@link DataSegment} and the total size of segments is larger than 0.
*/
private static class CompactibleTimelineObjectHolderCursor
{
private final List<TimelineObjectHolder<String, DataSegment>> holders;
CompactibleTimelineObjectHolderCursor(
VersionedIntervalTimeline<String, DataSegment> timeline,
Interval totalIntervalToSearch
)
{
this.holders = timeline
.lookup(totalIntervalToSearch)
.stream()
.filter(holder -> {
final List<PartitionChunk<DataSegment>> chunks = Lists.newArrayList(holder.getObject().iterator());
final long partitionBytes = chunks.stream().mapToLong(chunk -> chunk.getObject().getSize()).sum();
return chunks.size() > 0
&& partitionBytes > 0
&& totalIntervalToSearch.contains(chunks.get(0).getObject().getInterval());
})
.collect(Collectors.toList());
}
boolean hasNext()
{
return !holders.isEmpty();
}
/**
* Returns the latest holder.
*/
@Nullable
TimelineObjectHolder<String, DataSegment> get()
{
if (holders.isEmpty()) {
return null;
} else {
return holders.get(holders.size() - 1);
}
}
/**
* Removes the latest holder, so that {@link #get()} returns the next one.
*/
void next()
{
if (!holders.isEmpty()) {
holders.remove(holders.size() - 1);
}
}
}
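The javadoc above states the cursor contract; a sketch of the latest-first iteration pattern that findSegmentsToCompact builds on (cursor is assumed to be constructed from a timeline and a search interval as shown above):

while (cursor.hasNext()) {
  // Latest remaining time chunk; never null while hasNext() is true.
  TimelineObjectHolder<String, DataSegment> holder = cursor.get();
  // ... decide whether this chunk's segments fit the current compaction batch ...
  cursor.next();  // drop the latest holder so get() returns the next-older one
}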
@ -209,179 +234,97 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
* looked up for the last one day of the given intervalToSearch, and the next day is searched again if the size of
* found segments are not enough to compact. This is repeated until enough amount of segments are found.
*
* @param timeline timeline of a dataSource
* @param intervalToSearch interval to search
* @param searchEnd the end of the whole searchInterval
* @param compactibleTimelineObjectHolderCursor timeline iterator
* @param config compaction config
*
* @return a pair of the reduced interval of (intervalToSearch - interval of found segments) and segments to compact
* @return segments to compact
*/
@VisibleForTesting
static Pair<Interval, SegmentsToCompact> findSegmentsToCompact(
final VersionedIntervalTimeline<String, DataSegment> timeline,
final Interval intervalToSearch,
final DateTime searchEnd,
private static SegmentsToCompact findSegmentsToCompact(
final CompactibleTimelineObjectHolderCursor compactibleTimelineObjectHolderCursor,
final DataSourceCompactionConfig config
)
{
final long targetCompactionSize = config.getTargetCompactionSizeBytes();
final int numTargetSegments = config.getNumTargetCompactionSegments();
final boolean keepSegmentGranularity = config.isKeepSegmentGranularity();
final long inputSegmentSize = config.getInputSegmentSizeBytes();
final int maxNumSegmentsToCompact = config.getMaxNumSegmentsToCompact();
final List<DataSegment> segmentsToCompact = new ArrayList<>();
Interval searchInterval = intervalToSearch;
long totalSegmentsToCompactBytes = 0;
// Finds segments to compact together while iterating searchInterval from latest to oldest
while (!Intervals.isEmpty(searchInterval)
&& totalSegmentsToCompactBytes < targetCompactionSize
&& segmentsToCompact.size() < numTargetSegments) {
final Interval lookupInterval = SegmentCompactorUtil.getNextLoopupInterval(searchInterval);
// holders are sorted by their interval
final List<TimelineObjectHolder<String, DataSegment>> holders = timeline.lookup(lookupInterval);
// Finds segments to compact together while iterating timeline from latest to oldest
while (compactibleTimelineObjectHolderCursor.hasNext()
&& totalSegmentsToCompactBytes < inputSegmentSize
&& segmentsToCompact.size() < maxNumSegmentsToCompact) {
final TimelineObjectHolder<String, DataSegment> timeChunkHolder = Preconditions.checkNotNull(
compactibleTimelineObjectHolderCursor.get(),
"timelineObjectHolder"
);
final List<PartitionChunk<DataSegment>> chunks = Lists.newArrayList(timeChunkHolder.getObject().iterator());
final long timeChunkSizeBytes = chunks.stream().mapToLong(chunk -> chunk.getObject().getSize()).sum();
if (holders.isEmpty()) {
// We found nothing. Continue to the next interval.
searchInterval = SegmentCompactorUtil.removeIntervalFromEnd(searchInterval, lookupInterval);
continue;
}
for (int i = holders.size() - 1; i >= 0; i--) {
final TimelineObjectHolder<String, DataSegment> holder = holders.get(i);
final List<PartitionChunk<DataSegment>> chunks = Lists.newArrayList(holder.getObject().iterator());
final long partitionBytes = chunks.stream().mapToLong(chunk -> chunk.getObject().getSize()).sum();
if (chunks.size() == 0 || partitionBytes == 0) {
log.warn("Skip empty shard[%s]", holder);
continue;
}
if (!intervalToSearch.contains(chunks.get(0).getObject().getInterval())) {
searchInterval = SegmentCompactorUtil.removeIntervalFromEnd(
searchInterval,
new Interval(chunks.get(0).getObject().getInterval().getStart(), searchInterval.getEnd())
);
continue;
}
// Addition of the segments of a partition should be atomic.
if (SegmentCompactorUtil.isCompactible(targetCompactionSize, totalSegmentsToCompactBytes, partitionBytes) &&
segmentsToCompact.size() + chunks.size() <= numTargetSegments) {
chunks.forEach(chunk -> segmentsToCompact.add(chunk.getObject()));
totalSegmentsToCompactBytes += partitionBytes;
// The segments in a holder should be added all together or not.
if (SegmentCompactorUtil.isCompactibleSize(inputSegmentSize, totalSegmentsToCompactBytes, timeChunkSizeBytes)
&& SegmentCompactorUtil.isCompactibleNum(maxNumSegmentsToCompact, segmentsToCompact.size(), chunks.size())
&& (!keepSegmentGranularity || segmentsToCompact.size() == 0)) {
chunks.forEach(chunk -> segmentsToCompact.add(chunk.getObject()));
totalSegmentsToCompactBytes += timeChunkSizeBytes;
} else {
if (segmentsToCompact.size() > 1) {
// We found some segmens to compact and cannot add more. End here.
return new SegmentsToCompact(segmentsToCompact);
} else {
if (segmentsToCompact.size() > 1) {
// We found some segmens to compact and cannot add more. End here.
return checkCompactableSizeForLastSegmentOrReturn(
segmentsToCompact,
totalSegmentsToCompactBytes,
timeline,
searchInterval,
searchEnd,
config
// (*) Discard segments found so far because we can't compact them anyway.
final int numSegmentsToCompact = segmentsToCompact.size();
segmentsToCompact.clear();
if (!SegmentCompactorUtil.isCompactibleSize(inputSegmentSize, 0, timeChunkSizeBytes)) {
final DataSegment segment = chunks.get(0).getObject();
log.warn(
"shardSize[%d] for dataSource[%s] and interval[%s] is larger than inputSegmentSize[%d]."
+ " Continue to the next shard.",
timeChunkSizeBytes,
segment.getDataSource(),
segment.getInterval(),
inputSegmentSize
);
} else if (maxNumSegmentsToCompact < chunks.size()) {
final DataSegment segment = chunks.get(0).getObject();
log.warn(
"The number of segments[%d] for dataSource[%s] and interval[%s] is larger than "
+ "numTargetCompactSegments[%d]. If you see lots of shards are being skipped due to too many "
+ "segments, consider increasing 'numTargetCompactionSegments' and "
+ "'druid.indexer.runner.maxZnodeBytes'. Continue to the next shard.",
chunks.size(),
segment.getDataSource(),
segment.getInterval(),
maxNumSegmentsToCompact
);
} else {
// (*) Discard segments found so far because we can't compact it anyway.
final int numSegmentsToCompact = segmentsToCompact.size();
segmentsToCompact.clear();
if (!SegmentCompactorUtil.isCompactible(targetCompactionSize, 0, partitionBytes)) {
// TODO: this should be changed to compact many small segments into a few large segments
final DataSegment segment = chunks.get(0).getObject();
log.warn(
"shardSize[%d] for dataSource[%s] and interval[%s] is larger than targetCompactionSize[%d]."
+ " Contitnue to the next shard.",
partitionBytes,
segment.getDataSource(),
segment.getInterval(),
targetCompactionSize
);
} else if (numTargetSegments < chunks.size()) {
final DataSegment segment = chunks.get(0).getObject();
log.warn(
"The number of segments[%d] for dataSource[%s] and interval[%s] is larger than "
+ "numTargetCompactSegments[%d]. If you see lots of shards are being skipped due to too many "
+ "segments, consider increasing 'numTargetCompactionSegments' and "
+ "'druid.indexer.runner.maxZnodeBytes'. Contitnue to the next shard.",
chunks.size(),
segment.getDataSource(),
segment.getInterval(),
numTargetSegments
);
if (numSegmentsToCompact == 1) {
// We found a segment which is smaller than targetCompactionSize but too large to compact with other
// segments. Skip this one.
// Note that segmentsToCompact is already cleared at (*).
chunks.forEach(chunk -> segmentsToCompact.add(chunk.getObject()));
totalSegmentsToCompactBytes = timeChunkSizeBytes;
} else {
if (numSegmentsToCompact == 1) {
// We found a segment which is smaller than targetCompactionSize but too large to compact with other
// segments. Skip this one.
// Note that segmentsToCompact is already cleared at (*).
chunks.forEach(chunk -> segmentsToCompact.add(chunk.getObject()));
totalSegmentsToCompactBytes = partitionBytes;
} else {
throw new ISE(
"Cannot compact segments[%s]. shardBytes[%s], numSegments[%s]",
chunks.stream().map(PartitionChunk::getObject).collect(Collectors.toList()),
partitionBytes,
chunks.size()
);
}
throw new ISE(
"Cannot compact segments[%s]. shardBytes[%s], numSegments[%s]",
chunks.stream().map(PartitionChunk::getObject).collect(Collectors.toList()),
timeChunkSizeBytes,
chunks.size()
);
}
}
}
// Update searchInterval
searchInterval = SegmentCompactorUtil.removeIntervalFromEnd(
searchInterval,
new Interval(chunks.get(0).getObject().getInterval().getStart(), searchInterval.getEnd())
);
}
compactibleTimelineObjectHolderCursor.next();
}
if (segmentsToCompact.size() == 0 || segmentsToCompact.size() == 1) {
if (Intervals.isEmpty(searchInterval)) {
// We found nothing to compact. End here.
return Pair.of(intervalToSearch, new SegmentsToCompact(ImmutableList.of()));
} else {
// We found only 1 segment. Further find segments for the remaining interval.
return findSegmentsToCompact(timeline, searchInterval, searchEnd, config);
}
if (segmentsToCompact.size() > 1) {
return new SegmentsToCompact(segmentsToCompact);
} else {
return new SegmentsToCompact(Collections.emptyList());
}
return checkCompactableSizeForLastSegmentOrReturn(
segmentsToCompact,
totalSegmentsToCompactBytes,
timeline,
searchInterval,
searchEnd,
config
);
}
/**
* Check the found segments are enough to compact. If it's expected that more data will be added in the future for the
* interval of found segments, the found segments are skipped and remained to be considered again in the next
* coordinator run. Otherwise, simply returns a pair of the given searchInterval and found segments.
*/
private static Pair<Interval, SegmentsToCompact> checkCompactableSizeForLastSegmentOrReturn(
final List<DataSegment> segmentsToCompact,
final long totalSegmentsToCompactBytes,
final VersionedIntervalTimeline<String, DataSegment> timeline,
final Interval searchInterval,
final DateTime searchEnd,
final DataSourceCompactionConfig config
)
{
if (segmentsToCompact.size() > 0) {
// Check we have enough segments to compact. For realtime dataSources, we can expect more data to be added in the
// future, so we skip compaction for segments in this run if their size is not sufficiently large.
final DataSegment lastSegment = segmentsToCompact.get(segmentsToCompact.size() - 1);
if (lastSegment.getInterval().getEnd().equals(searchEnd) &&
!SegmentCompactorUtil.isProperCompactionSize(
config.getTargetCompactionSizeBytes(),
totalSegmentsToCompactBytes
) &&
config.getNumTargetCompactionSegments() > segmentsToCompact.size()) {
// Ignore found segments and find again for the remaininig searchInterval.
return findSegmentsToCompact(timeline, searchInterval, searchEnd, config);
}
}
return Pair.of(searchInterval, new SegmentsToCompact(segmentsToCompact));
}
/**
@ -432,7 +375,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
private final Interval interval; // whole interval for all segments
private final List<DataSegment> segments;
QueueEntry(List<DataSegment> segments)
private QueueEntry(List<DataSegment> segments)
{
Preconditions.checkArgument(segments != null && !segments.isEmpty());
Collections.sort(segments);
@ -443,7 +386,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
this.segments = segments;
}
String getDataSource()
private String getDataSource()
{
return segments.get(0).getDataSource();
}
@ -453,14 +396,14 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
{
private final List<DataSegment> segments;
SegmentsToCompact(List<DataSegment> segments)
private SegmentsToCompact(List<DataSegment> segments)
{
this.segments = segments;
}
public List<DataSegment> getSegments()
private int getSize()
{
return segments;
return segments.size();
}
}
}

View File

@ -20,42 +20,21 @@
package org.apache.druid.server.coordinator.helper;
import com.google.common.base.Preconditions;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.joda.time.Period;
/**
* Util class used by {@link DruidCoordinatorSegmentCompactor} and {@link CompactionSegmentSearchPolicy}.
*/
class SegmentCompactorUtil
{
private static final Period LOOKUP_PERIOD = new Period("P1D");
private static final Duration LOOKUP_DURATION = LOOKUP_PERIOD.toStandardDuration();
// Allow compaction of segments if totalSize(segments) <= remainingBytes * ALLOWED_MARGIN_OF_COMPACTION_SIZE
private static final double ALLOWED_MARGIN_OF_COMPACTION_SIZE = .1;
static boolean isCompactible(long remainingBytes, long currentTotalBytes, long additionalBytes)
static boolean isCompactibleSize(long targetBytes, long currentTotalBytes, long additionalBytes)
{
return remainingBytes * (1 + ALLOWED_MARGIN_OF_COMPACTION_SIZE) >= currentTotalBytes + additionalBytes;
return currentTotalBytes + additionalBytes <= targetBytes;
}
static boolean isProperCompactionSize(long targetCompactionSizeBytes, long totalBytesOfSegmentsToCompact)
static boolean isCompactibleNum(int numTargetSegments, int numCurrentSegments, int numAdditionalSegments)
{
return targetCompactionSizeBytes * (1 - ALLOWED_MARGIN_OF_COMPACTION_SIZE) <= totalBytesOfSegmentsToCompact &&
targetCompactionSizeBytes * (1 + ALLOWED_MARGIN_OF_COMPACTION_SIZE) >= totalBytesOfSegmentsToCompact;
}
/**
* Return an interval for looking up for timeline.
* If {@code totalInterval} is larger than {@link #LOOKUP_PERIOD}, it returns an interval of {@link #LOOKUP_PERIOD}
* and the end of {@code totalInterval}.
*/
static Interval getNextLoopupInterval(Interval totalInterval)
{
final Duration givenDuration = totalInterval.toDuration();
return givenDuration.isLongerThan(LOOKUP_DURATION) ?
new Interval(LOOKUP_PERIOD, totalInterval.getEnd()) :
totalInterval;
return numCurrentSegments + numAdditionalSegments <= numTargetSegments;
}
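A quick worked example of the two checks above, using arbitrary numbers to show that both comparisons are inclusive:

// With a 400MB input limit and 350MB already collected:
assert !SegmentCompactorUtil.isCompactibleSize(400_000_000L, 350_000_000L, 60_000_000L); // 410MB > 400MB
assert SegmentCompactorUtil.isCompactibleSize(400_000_000L, 350_000_000L, 40_000_000L);  // 390MB <= 400MB
// With at most 150 segments per task and 148 already collected:
assert !SegmentCompactorUtil.isCompactibleNum(150, 148, 3);
assert SegmentCompactorUtil.isCompactibleNum(150, 148, 2);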
/**

View File

@ -52,6 +52,7 @@ public class NoopIndexingServiceClient implements IndexingServiceClient
public String compactSegments(
List<DataSegment> segments,
boolean keepSegmentGranularity,
long targetCompactionSizeBytes,
int compactionTaskPriority,
@Nullable ClientCompactQueryTuningConfig tuningConfig,
@Nullable Map<String, Object> context

View File

@ -65,6 +65,7 @@ public class DruidCoordinatorSegmentCompactorTest
public String compactSegments(
List<DataSegment> segments,
boolean keepSegmentGranularity,
long targetCompactionSizeBytes,
int compactionTaskPriority,
ClientCompactQueryTuningConfig tuningConfig,
Map<String, Object> context
@ -129,28 +130,11 @@ public class DruidCoordinatorSegmentCompactorTest
}
};
private List<DataSourceCompactionConfig> compactionConfigs;
private Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources;
@Before
public void setup()
{
compactionConfigs = new ArrayList<>();
for (int i = 0; i < 3; i++) {
final String dataSource = DATA_SOURCE_PREFIX + i;
compactionConfigs.add(
new DataSourceCompactionConfig(
dataSource,
0,
50L,
null,
new Period("PT1H"), // smaller than segment interval
null,
null
)
);
}
dataSources = new HashMap<>();
for (int i = 0; i < 3; i++) {
final String dataSource = DATA_SOURCE_PREFIX + i;
@ -213,8 +197,9 @@ public class DruidCoordinatorSegmentCompactorTest
}
@Test
public void testRun()
public void testRunWithoutKeepSegmentGranularity()
{
final boolean keepSegmentGranularity = false;
final DruidCoordinatorSegmentCompactor compactor = new DruidCoordinatorSegmentCompactor(indexingServiceClient);
final Supplier<String> expectedVersionSupplier = new Supplier<String>()
@ -233,7 +218,8 @@ public class DruidCoordinatorSegmentCompactorTest
// compact for 2017-01-08T12:00:00.000Z/2017-01-09T12:00:00.000Z
assertCompactSegments(
compactor,
Intervals.of(StringUtils.format("2017-01-%02dT12:00:00/2017-01-%02dT12:00:00", 8, 9)),
keepSegmentGranularity,
Intervals.of("2017-01-%02dT12:00:00/2017-01-%02dT12:00:00", 8, 9),
expectedRemainingSegments,
expectedCompactTaskCount,
expectedVersionSupplier
@ -243,7 +229,8 @@ public class DruidCoordinatorSegmentCompactorTest
expectedRemainingSegments -= 40;
assertCompactSegments(
compactor,
Intervals.of(StringUtils.format("2017-01-%02dT12:00:00/2017-01-%02dT12:00:00", 4, 8)),
keepSegmentGranularity,
Intervals.of("2017-01-%02dT12:00:00/2017-01-%02dT12:00:00", 4, 8),
expectedRemainingSegments,
expectedCompactTaskCount,
expectedVersionSupplier
@ -253,61 +240,112 @@ public class DruidCoordinatorSegmentCompactorTest
expectedRemainingSegments -= 40;
assertCompactSegments(
compactor,
Intervals.of(StringUtils.format("2017-01-%02dT12:00:00/2017-01-%02dT12:00:00", endDay - 1, endDay)),
keepSegmentGranularity,
Intervals.of("2017-01-%02dT12:00:00/2017-01-%02dT12:00:00", endDay - 1, endDay),
expectedRemainingSegments,
expectedCompactTaskCount,
expectedVersionSupplier
);
}
// Segments of the latest interval should not be compacted
for (int i = 0; i < 3; i++) {
final String dataSource = DATA_SOURCE_PREFIX + i;
final Interval interval = Intervals.of(StringUtils.format("2017-01-09T12:00:00/2017-01-10"));
List<TimelineObjectHolder<String, DataSegment>> holders = dataSources.get(dataSource).lookup(interval);
Assert.assertEquals(1, holders.size());
for (TimelineObjectHolder<String, DataSegment> holder : holders) {
List<PartitionChunk<DataSegment>> chunks = Lists.newArrayList(holder.getObject());
Assert.assertEquals(2, chunks.size());
for (PartitionChunk<DataSegment> chunk : chunks) {
DataSegment segment = chunk.getObject();
Assert.assertEquals(interval, segment.getInterval());
Assert.assertEquals("version", segment.getVersion());
}
}
}
// Emulating realtime dataSource
final String dataSource = DATA_SOURCE_PREFIX + 0;
addMoreData(dataSource, 9);
CoordinatorStats stats = runCompactor(compactor);
Assert.assertEquals(
1,
stats.getGlobalStat(DruidCoordinatorSegmentCompactor.COMPACT_TASK_COUNT)
);
addMoreData(dataSource, 10);
stats = runCompactor(compactor);
Assert.assertEquals(
1,
stats.getGlobalStat(DruidCoordinatorSegmentCompactor.COMPACT_TASK_COUNT)
);
assertLastSegmentNotCompacted(compactor, keepSegmentGranularity);
}
private CoordinatorStats runCompactor(DruidCoordinatorSegmentCompactor compactor)
@Test
public void testRunWithKeepSegmentGranularity()
{
final boolean keepSegmentGranularity = true;
final DruidCoordinatorSegmentCompactor compactor = new DruidCoordinatorSegmentCompactor(indexingServiceClient);
final Supplier<String> expectedVersionSupplier = new Supplier<String>()
{
private int i = 0;
@Override
public String get()
{
return "newVersion_" + i++;
}
};
int expectedCompactTaskCount = 1;
int expectedRemainingSegments = 200;
// compact for 2017-01-08T12:00:00.000Z/2017-01-09T12:00:00.000Z
assertCompactSegments(
compactor,
keepSegmentGranularity,
Intervals.of("2017-01-%02dT00:00:00/2017-01-%02dT12:00:00", 9, 9),
expectedRemainingSegments,
expectedCompactTaskCount,
expectedVersionSupplier
);
expectedRemainingSegments -= 20;
assertCompactSegments(
compactor,
keepSegmentGranularity,
Intervals.of("2017-01-%02dT12:00:00/2017-01-%02dT00:00:00", 8, 9),
expectedRemainingSegments,
expectedCompactTaskCount,
expectedVersionSupplier
);
// compact for 2017-01-07T12:00:00.000Z/2017-01-08T12:00:00.000Z
expectedRemainingSegments -= 20;
assertCompactSegments(
compactor,
keepSegmentGranularity,
Intervals.of("2017-01-%02dT00:00:00/2017-01-%02dT12:00:00", 8, 8),
expectedRemainingSegments,
expectedCompactTaskCount,
expectedVersionSupplier
);
expectedRemainingSegments -= 20;
assertCompactSegments(
compactor,
keepSegmentGranularity,
Intervals.of("2017-01-%02dT12:00:00/2017-01-%02dT00:00:00", 4, 5),
expectedRemainingSegments,
expectedCompactTaskCount,
expectedVersionSupplier
);
for (int endDay = 4; endDay > 1; endDay -= 1) {
expectedRemainingSegments -= 20;
assertCompactSegments(
compactor,
keepSegmentGranularity,
Intervals.of("2017-01-%02dT00:00:00/2017-01-%02dT12:00:00", endDay, endDay),
expectedRemainingSegments,
expectedCompactTaskCount,
expectedVersionSupplier
);
expectedRemainingSegments -= 20;
assertCompactSegments(
compactor,
keepSegmentGranularity,
Intervals.of("2017-01-%02dT12:00:00/2017-01-%02dT00:00:00", endDay - 1, endDay),
expectedRemainingSegments,
expectedCompactTaskCount,
expectedVersionSupplier
);
}
assertLastSegmentNotCompacted(compactor, keepSegmentGranularity);
}
private CoordinatorStats runCompactor(DruidCoordinatorSegmentCompactor compactor, boolean keepSegmentGranularity)
{
DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams
.newBuilder()
.withDataSources(dataSources)
.withCompactionConfig(CoordinatorCompactionConfig.from(compactionConfigs))
.withCompactionConfig(CoordinatorCompactionConfig.from(createCompactionConfigs(keepSegmentGranularity)))
.build();
return compactor.run(params).getCoordinatorStats();
}
private void assertCompactSegments(
DruidCoordinatorSegmentCompactor compactor,
boolean keepSegmentGranularity,
Interval expectedInterval,
int expectedRemainingSegments,
int expectedCompactTaskCount,
@ -315,7 +353,7 @@ public class DruidCoordinatorSegmentCompactorTest
)
{
for (int i = 0; i < 3; i++) {
final CoordinatorStats stats = runCompactor(compactor);
final CoordinatorStats stats = runCompactor(compactor, keepSegmentGranularity);
Assert.assertEquals(
expectedCompactTaskCount,
stats.getGlobalStat(DruidCoordinatorSegmentCompactor.COMPACT_TASK_COUNT)
@ -356,6 +394,44 @@ public class DruidCoordinatorSegmentCompactorTest
}
}
private void assertLastSegmentNotCompacted(DruidCoordinatorSegmentCompactor compactor, boolean keepSegmentGranularity)
{
// Segments of the latest interval should not be compacted
for (int i = 0; i < 3; i++) {
final String dataSource = DATA_SOURCE_PREFIX + i;
final Interval interval = Intervals.of(StringUtils.format("2017-01-09T12:00:00/2017-01-10"));
List<TimelineObjectHolder<String, DataSegment>> holders = dataSources.get(dataSource).lookup(interval);
Assert.assertEquals(1, holders.size());
for (TimelineObjectHolder<String, DataSegment> holder : holders) {
List<PartitionChunk<DataSegment>> chunks = Lists.newArrayList(holder.getObject());
Assert.assertEquals(2, chunks.size());
for (PartitionChunk<DataSegment> chunk : chunks) {
DataSegment segment = chunk.getObject();
Assert.assertEquals(interval, segment.getInterval());
Assert.assertEquals("version", segment.getVersion());
}
}
}
// Emulating realtime dataSource
final String dataSource = DATA_SOURCE_PREFIX + 0;
addMoreData(dataSource, 9);
CoordinatorStats stats = runCompactor(compactor, keepSegmentGranularity);
Assert.assertEquals(
1,
stats.getGlobalStat(DruidCoordinatorSegmentCompactor.COMPACT_TASK_COUNT)
);
addMoreData(dataSource, 10);
stats = runCompactor(compactor, keepSegmentGranularity);
Assert.assertEquals(
1,
stats.getGlobalStat(DruidCoordinatorSegmentCompactor.COMPACT_TASK_COUNT)
);
}
private void addMoreData(String dataSource, int day)
{
for (int i = 0; i < 2; i++) {
@ -373,4 +449,26 @@ public class DruidCoordinatorSegmentCompactorTest
);
}
}
private static List<DataSourceCompactionConfig> createCompactionConfigs(boolean keepSegmentGranularity)
{
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
for (int i = 0; i < 3; i++) {
final String dataSource = DATA_SOURCE_PREFIX + i;
compactionConfigs.add(
new DataSourceCompactionConfig(
dataSource,
keepSegmentGranularity,
0,
50L,
50L,
null,
new Period("PT1H"), // smaller than segment interval
null,
null
)
);
}
return compactionConfigs;
}
}

View File

@ -34,6 +34,8 @@ import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.ArrayList;
import java.util.Arrays;
@ -41,13 +43,26 @@ import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
@RunWith(Parameterized.class)
public class NewestSegmentFirstPolicyTest
{
private static final String DATA_SOURCE = "dataSource";
private static final long DEFAULT_SEGMENT_SIZE = 1000;
private static final int DEFAULT_NUM_SEGMENTS_PER_SHARD = 4;
@Parameterized.Parameters(name = "keepSegmentGranularity = {0}")
public static Iterable<Object[]> constructorFeeder()
{
return ImmutableList.of(new Object[]{false}, new Object[]{true});
}
private final NewestSegmentFirstPolicy policy = new NewestSegmentFirstPolicy();
private final boolean keepSegmentGranularity;
public NewestSegmentFirstPolicyTest(boolean keepSegmentGranularity)
{
this.keepSegmentGranularity = keepSegmentGranularity;
}
@Test
public void testLargeOffsetAndSmallSegmentInterval()
@ -98,29 +113,54 @@ public class NewestSegmentFirstPolicyTest
final List<DataSegment> segments = iterator.next();
Assert.assertNotNull(segments);
Assert.assertEquals(8, segments.size());
final List<Interval> expectedIntervals = new ArrayList<>(segments.size());
for (int i = 0; i < 4; i++) {
expectedIntervals.add(Intervals.of("2017-11-16T06:00:00/2017-11-16T07:00:00"));
if (keepSegmentGranularity) {
// If keepSegmentGranularity = true, the iterator returns the segments of only the next time chunk.
Assert.assertEquals(4, segments.size());
final List<Interval> expectedIntervals = new ArrayList<>(segments.size());
for (int i = 0; i < 4; i++) {
expectedIntervals.add(Intervals.of("2017-11-16T20:00:00/2017-11-16T21:00:00"));
}
Assert.assertEquals(
expectedIntervals,
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList())
);
assertCompactSegmentIntervals(
iterator,
segmentPeriod,
Intervals.of("2017-11-16T06:00:00/2017-11-16T07:00:00"),
Intervals.of("2017-11-16T06:00:00/2017-11-16T07:00:00"),
false
);
} else {
// If keepSegmentGranularity = false, the returned segments can span over multiple time chunks.
Assert.assertEquals(8, segments.size());
final List<Interval> expectedIntervals = new ArrayList<>(segments.size());
for (int i = 0; i < 4; i++) {
expectedIntervals.add(Intervals.of("2017-11-16T06:00:00/2017-11-16T07:00:00"));
}
for (int i = 0; i < 4; i++) {
expectedIntervals.add(Intervals.of("2017-11-16T20:00:00/2017-11-16T21:00:00"));
}
expectedIntervals.sort(Comparators.intervalsByStartThenEnd());
Assert.assertEquals(
expectedIntervals,
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList())
);
assertCompactSegmentIntervals(
iterator,
segmentPeriod,
Intervals.of("2017-11-14T00:00:00/2017-11-14T01:00:00"),
Intervals.of("2017-11-16T05:00:00/2017-11-16T06:00:00"),
true
);
}
for (int i = 0; i < 4; i++) {
expectedIntervals.add(Intervals.of("2017-11-16T20:00:00/2017-11-16T21:00:00"));
}
expectedIntervals.sort(Comparators.intervalsByStartThenEnd());
Assert.assertEquals(
expectedIntervals,
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList())
);
assertCompactSegmentIntervals(
iterator,
segmentPeriod,
Intervals.of("2017-11-14T00:00:00/2017-11-14T01:00:00"),
Intervals.of("2017-11-16T05:00:00/2017-11-16T06:00:00"),
true
);
}
@Test
@ -176,9 +216,7 @@ public class NewestSegmentFirstPolicyTest
iterator,
segmentPeriod,
Intervals.of("2017-11-16T20:00:00/2017-11-16T21:00:00"),
// The last interval is not "2017-11-17T01:00:00/2017-11-17T02:00:00". This is because more segments are
// expected to be added for that interval. See NewestSegmentFirstIterator.returnIfCompactibleSize().
Intervals.of("2017-11-17T00:00:00/2017-11-17T01:00:00"),
Intervals.of("2017-11-17T01:00:00/2017-11-17T02:00:00"),
false
);
@ -358,6 +396,63 @@ public class NewestSegmentFirstPolicyTest
Assert.assertFalse(iterator.hasNext());
}
@Test
public void testSkipUnknownDataSource()
{
final String unknownDataSource = "unknown";
final Period segmentPeriod = new Period("PT1H");
final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(
unknownDataSource,
createCompactionConfig(10000, 100, new Period("P2D")),
DATA_SOURCE,
createCompactionConfig(10000, 100, new Period("P2D"))
),
ImmutableMap.of(
DATA_SOURCE,
createTimeline(
new SegmentGenerateSpec(Intervals.of("2017-11-16T20:00:00/2017-11-17T04:00:00"), segmentPeriod),
new SegmentGenerateSpec(Intervals.of("2017-11-14T00:00:00/2017-11-16T07:00:00"), segmentPeriod)
)
)
);
assertCompactSegmentIntervals(
iterator,
segmentPeriod,
Intervals.of("2017-11-14T00:00:00/2017-11-14T01:00:00"),
Intervals.of("2017-11-15T03:00:00/2017-11-15T04:00:00"),
true
);
}
@Test
public void testIgnoreSingleSegmentToCompact()
{
final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(DATA_SOURCE, createCompactionConfig(800000, 100, new Period("P1D"))),
ImmutableMap.of(
DATA_SOURCE,
createTimeline(
new SegmentGenerateSpec(
Intervals.of("2017-12-02T00:00:00/2017-12-03T00:00:00"),
new Period("P1D"),
200,
1
),
new SegmentGenerateSpec(
Intervals.of("2017-12-01T00:00:00/2017-12-02T00:00:00"),
new Period("P1D"),
200,
1
)
)
)
);
Assert.assertFalse(iterator.hasNext());
}
private static void assertCompactSegmentIntervals(
CompactionSegmentIterator iterator,
Period segmentPeriod,
@ -447,7 +542,7 @@ public class NewestSegmentFirstPolicyTest
return timeline;
}
private static DataSourceCompactionConfig createCompactionConfig(
private DataSourceCompactionConfig createCompactionConfig(
long targetCompactionSizeBytes,
int numTargetCompactionSegments,
Period skipOffsetFromLatest
@ -455,8 +550,10 @@ public class NewestSegmentFirstPolicyTest
{
return new DataSourceCompactionConfig(
DATA_SOURCE,
keepSegmentGranularity,
0,
targetCompactionSizeBytes,
targetCompactionSizeBytes,
numTargetCompactionSegments,
skipOffsetFromLatest,
null,