Clean up compaction logs on coordinator (#14875)

Changes:
- Move logic of `NewestSegmentFirstIterator.needsCompaction` to `CompactionStatus`
to improve testability and readability
- Capture the list of checks performed to determine if compaction is needed in a readable
manner in `CompactionStatus.CHECKS`
- Make `CompactionSegmentIterator` iterate over instances of `SegmentsToCompact`
instead of `List<DataSegment>`. This allows use of the `umbrellaInterval` later.
- Replace usages of `QueueEntry` with `SegmentsToCompact`
- Move `SegmentsToCompact` out of `NewestSegmentFirstIterator`
- Simplify `CompactionStatistics`
- Reduce level of less important logs to debug
- No change made to tests to ensure correctness
This commit is contained in:
Kashif Faraz 2023-08-21 17:30:41 +05:30 committed by GitHub
parent 07a193a142
commit c211dcc4b3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 741 additions and 553 deletions

View File

@ -24,9 +24,9 @@ import com.google.common.collect.ImmutableMap;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.server.coordinator.duty.CompactionSegmentIterator;
import org.apache.druid.server.coordinator.duty.CompactionSegmentSearchPolicy;
import org.apache.druid.server.coordinator.duty.NewestSegmentFirstPolicy;
import org.apache.druid.server.coordinator.compact.CompactionSegmentIterator;
import org.apache.druid.server.coordinator.compact.CompactionSegmentSearchPolicy;
import org.apache.druid.server.coordinator.compact.NewestSegmentFirstPolicy;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.partition.NumberedShardSpec;
@ -142,8 +142,7 @@ public class NewestSegmentFirstPolicyBenchmark
{
final CompactionSegmentIterator iterator = policy.reset(compactionConfigs, dataSources, Collections.emptyMap());
for (int i = 0; i < numCompactionTaskSlots && iterator.hasNext(); i++) {
final List<DataSegment> segments = iterator.next();
blackhole.consume(segments);
blackhole.consume(iterator.next());
}
}
}

View File

@ -68,7 +68,7 @@ public class Tasks
* This context is used in compaction. When it is set in the context, the segments created by the task
* will fill 'lastCompactionState' in its metadata. This will be used to track what segments are compacted or not.
* See {@link org.apache.druid.timeline.DataSegment} and {@link
* org.apache.druid.server.coordinator.duty.NewestSegmentFirstIterator} for more details.
* org.apache.druid.server.coordinator.compact.NewestSegmentFirstIterator} for more details.
*/
public static final String STORE_COMPACTION_STATE_KEY = "storeCompactionState";

View File

@ -1,73 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator;
public class CompactionStatistics
{
private long byteSum;
private long segmentNumberCountSum;
private long segmentIntervalCountSum;
public CompactionStatistics(
long byteSum,
long segmentNumberCountSum,
long segmentIntervalCountSum
)
{
this.byteSum = byteSum;
this.segmentNumberCountSum = segmentNumberCountSum;
this.segmentIntervalCountSum = segmentIntervalCountSum;
}
public static CompactionStatistics initializeCompactionStatistics()
{
return new CompactionStatistics(0, 0, 0);
}
public long getByteSum()
{
return byteSum;
}
public long getSegmentNumberCountSum()
{
return segmentNumberCountSum;
}
public long getSegmentIntervalCountSum()
{
return segmentIntervalCountSum;
}
public void incrementCompactedByte(long incrementValue)
{
byteSum = byteSum + incrementValue;
}
public void incrementCompactedSegments(long incrementValue)
{
segmentNumberCountSum = segmentNumberCountSum + incrementValue;
}
public void incrementCompactedIntervals(long incrementValue)
{
segmentIntervalCountSum = segmentIntervalCountSum + incrementValue;
}
}

View File

@ -62,10 +62,10 @@ import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordinator.balancer.BalancerStrategy;
import org.apache.druid.server.coordinator.balancer.BalancerStrategyFactory;
import org.apache.druid.server.coordinator.compact.CompactionSegmentSearchPolicy;
import org.apache.druid.server.coordinator.duty.BalanceSegments;
import org.apache.druid.server.coordinator.duty.CollectSegmentAndServerStats;
import org.apache.druid.server.coordinator.duty.CompactSegments;
import org.apache.druid.server.coordinator.duty.CompactionSegmentSearchPolicy;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroup;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups;
import org.apache.druid.server.coordinator.duty.CoordinatorDuty;

View File

@ -17,20 +17,18 @@
* under the License.
*/
package org.apache.druid.server.coordinator.duty;
package org.apache.druid.server.coordinator.compact;
import org.apache.druid.server.coordinator.CompactionStatistics;
import org.apache.druid.timeline.DataSegment;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
* Segments in the lists which are the elements of this iterator are sorted according to the natural segment order
* (see {@link DataSegment#compareTo}).
*/
public interface CompactionSegmentIterator extends Iterator<List<DataSegment>>
public interface CompactionSegmentIterator extends Iterator<SegmentsToCompact>
{
/**
* Return a map of dataSourceName to CompactionStatistics.

View File

@ -17,9 +17,10 @@
* under the License.
*/
package org.apache.druid.server.coordinator.duty;
package org.apache.druid.server.coordinator.compact;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.duty.CompactSegments;
import org.apache.druid.timeline.SegmentTimeline;
import org.joda.time.Interval;

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator.compact;
/**
* Used to track statistics for segments in different states of compaction.
*/
public class CompactionStatistics
{
private long totalBytes;
private long numSegments;
private long numIntervals;
public static CompactionStatistics create()
{
return new CompactionStatistics();
}
public long getTotalBytes()
{
return totalBytes;
}
public long getNumSegments()
{
return numSegments;
}
public long getNumIntervals()
{
return numIntervals;
}
public void addFrom(SegmentsToCompact segments)
{
totalBytes += segments.getTotalBytes();
numIntervals += segments.getNumIntervals();
numSegments += segments.size();
}
}

View File

@ -0,0 +1,352 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator.compact;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import org.apache.commons.lang.ArrayUtils;
import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec;
import org.apache.druid.common.config.Configs;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
import org.apache.druid.timeline.CompactionState;
import org.apache.druid.utils.CollectionUtils;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
/**
* Represents the status of compaction for a given list of candidate segments.
*/
public class CompactionStatus
{
private static final CompactionStatus COMPLETE = new CompactionStatus(true, null);
/**
* List of checks performed to determine if compaction is already complete.
* <p>
* The order of the checks must be honored while evaluating them.
*/
private static final List<Function<Evaluator, CompactionStatus>> CHECKS = Arrays.asList(
Evaluator::segmentsHaveBeenCompactedAtLeastOnce,
Evaluator::allCandidatesHaveSameCompactionState,
Evaluator::partitionsSpecIsUpToDate,
Evaluator::indexSpecIsUpToDate,
Evaluator::segmentGranularityIsUpToDate,
Evaluator::queryGranularityIsUpToDate,
Evaluator::rollupIsUpToDate,
Evaluator::dimensionsSpecIsUpToDate,
Evaluator::metricsSpecIsUpToDate,
Evaluator::transformSpecFilterIsUpToDate
);
private final boolean complete;
private final String reasonToCompact;
private CompactionStatus(boolean complete, String reason)
{
this.complete = complete;
this.reasonToCompact = reason;
}
public boolean isComplete()
{
return complete;
}
public String getReasonToCompact()
{
return reasonToCompact;
}
private static CompactionStatus incomplete(String reasonFormat, Object... args)
{
return new CompactionStatus(false, StringUtils.format(reasonFormat, args));
}
private static CompactionStatus completeIfEqual(String field, Object configured, Object current)
{
if (configured == null || configured.equals(current)) {
return COMPLETE;
} else {
return configChanged(field, configured, current);
}
}
private static CompactionStatus configChanged(String field, Object configured, Object current)
{
return CompactionStatus.incomplete(
"Configured %s[%s] is different from current %s[%s]",
field, configured, field, current
);
}
/**
* Determines the CompactionStatus of the given candidate segments by evaluating
* the {@link #CHECKS} one by one. If any check returns an incomplete status,
* further checks are not performed and the incomplete status is returned.
*/
static CompactionStatus of(
SegmentsToCompact candidateSegments,
DataSourceCompactionConfig config,
ObjectMapper objectMapper
)
{
final Evaluator evaluator = new Evaluator(candidateSegments, config, objectMapper);
return CHECKS.stream().map(f -> f.apply(evaluator))
.filter(status -> !status.isComplete())
.findFirst().orElse(COMPLETE);
}
static PartitionsSpec findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig tuningConfig)
{
final PartitionsSpec partitionsSpecFromTuningConfig = tuningConfig.getPartitionsSpec();
if (partitionsSpecFromTuningConfig == null) {
final long maxTotalRows = Configs.valueOrDefault(tuningConfig.getMaxTotalRows(), Long.MAX_VALUE);
return new DynamicPartitionsSpec(tuningConfig.getMaxRowsPerSegment(), maxTotalRows);
} else if (partitionsSpecFromTuningConfig instanceof DynamicPartitionsSpec) {
return new DynamicPartitionsSpec(
partitionsSpecFromTuningConfig.getMaxRowsPerSegment(),
((DynamicPartitionsSpec) partitionsSpecFromTuningConfig).getMaxTotalRowsOr(Long.MAX_VALUE)
);
} else {
return partitionsSpecFromTuningConfig;
}
}
/**
* Evaluates {@link #CHECKS} to determine the compaction status.
*/
private static class Evaluator
{
private final ObjectMapper objectMapper;
private final DataSourceCompactionConfig compactionConfig;
private final SegmentsToCompact candidateSegments;
private final CompactionState lastCompactionState;
private final ClientCompactionTaskQueryTuningConfig tuningConfig;
private final ClientCompactionTaskGranularitySpec existingGranularitySpec;
private final UserCompactionTaskGranularityConfig configuredGranularitySpec;
private Evaluator(
SegmentsToCompact candidateSegments,
DataSourceCompactionConfig compactionConfig,
ObjectMapper objectMapper
)
{
Preconditions.checkArgument(!candidateSegments.isEmpty(), "Empty candidates");
this.candidateSegments = candidateSegments;
this.objectMapper = objectMapper;
this.lastCompactionState = candidateSegments.getFirst().getLastCompactionState();
this.compactionConfig = compactionConfig;
this.tuningConfig = ClientCompactionTaskQueryTuningConfig.from(
compactionConfig.getTuningConfig(),
compactionConfig.getMaxRowsPerSegment(),
null
);
this.configuredGranularitySpec = compactionConfig.getGranularitySpec();
if (lastCompactionState == null) {
this.existingGranularitySpec = null;
} else {
this.existingGranularitySpec = convertIfNotNull(
lastCompactionState.getGranularitySpec(),
ClientCompactionTaskGranularitySpec.class
);
}
}
private CompactionStatus segmentsHaveBeenCompactedAtLeastOnce()
{
if (lastCompactionState == null) {
return CompactionStatus.incomplete("Not compacted yet");
} else {
return COMPLETE;
}
}
private CompactionStatus allCandidatesHaveSameCompactionState()
{
final boolean allHaveSameCompactionState = candidateSegments.getSegments().stream().allMatch(
segment -> lastCompactionState.equals(segment.getLastCompactionState())
);
if (allHaveSameCompactionState) {
return COMPLETE;
} else {
return CompactionStatus.incomplete("Candidate segments have different last compaction states.");
}
}
private CompactionStatus partitionsSpecIsUpToDate()
{
return CompactionStatus.completeIfEqual(
"partitionsSpec",
findPartitionsSpecFromConfig(tuningConfig),
lastCompactionState.getPartitionsSpec()
);
}
private CompactionStatus indexSpecIsUpToDate()
{
return CompactionStatus.completeIfEqual(
"indexSpec",
Configs.valueOrDefault(tuningConfig.getIndexSpec(), IndexSpec.DEFAULT),
objectMapper.convertValue(lastCompactionState.getIndexSpec(), IndexSpec.class)
);
}
private CompactionStatus segmentGranularityIsUpToDate()
{
if (configuredGranularitySpec == null
|| configuredGranularitySpec.getSegmentGranularity() == null) {
return COMPLETE;
}
final Granularity configuredSegmentGranularity = configuredGranularitySpec.getSegmentGranularity();
final Granularity existingSegmentGranularity
= existingGranularitySpec == null ? null : existingGranularitySpec.getSegmentGranularity();
if (configuredSegmentGranularity.equals(existingSegmentGranularity)) {
return COMPLETE;
} else if (existingSegmentGranularity == null) {
// Candidate segments were compacted without segment granularity specified
// Check if the segments already have the desired segment granularity
boolean needsCompaction = candidateSegments.getSegments().stream().anyMatch(
segment -> !configuredSegmentGranularity.isAligned(segment.getInterval())
);
if (needsCompaction) {
return CompactionStatus.incomplete(
"Configured segmentGranularity[%s] does not align with segment intervals.",
configuredSegmentGranularity
);
}
} else {
return CompactionStatus.configChanged(
"segmentGranularity",
configuredSegmentGranularity,
existingSegmentGranularity
);
}
return COMPLETE;
}
private CompactionStatus rollupIsUpToDate()
{
if (configuredGranularitySpec == null) {
return COMPLETE;
} else {
return CompactionStatus.completeIfEqual(
"rollup",
configuredGranularitySpec.isRollup(),
existingGranularitySpec == null ? null : existingGranularitySpec.isRollup()
);
}
}
private CompactionStatus queryGranularityIsUpToDate()
{
if (configuredGranularitySpec == null) {
return COMPLETE;
} else {
return CompactionStatus.completeIfEqual(
"queryGranularity",
configuredGranularitySpec.getQueryGranularity(),
existingGranularitySpec == null ? null : existingGranularitySpec.getQueryGranularity()
);
}
}
private CompactionStatus dimensionsSpecIsUpToDate()
{
if (compactionConfig.getDimensionsSpec() == null) {
return COMPLETE;
} else {
final DimensionsSpec existingDimensionsSpec = lastCompactionState.getDimensionsSpec();
return CompactionStatus.completeIfEqual(
"dimensionsSpec",
compactionConfig.getDimensionsSpec().getDimensions(),
existingDimensionsSpec == null ? null : existingDimensionsSpec.getDimensions()
);
}
}
private CompactionStatus metricsSpecIsUpToDate()
{
final AggregatorFactory[] configuredMetricsSpec = compactionConfig.getMetricsSpec();
if (ArrayUtils.isEmpty(configuredMetricsSpec)) {
return COMPLETE;
}
final List<Object> metricSpecList = lastCompactionState.getMetricsSpec();
final AggregatorFactory[] existingMetricsSpec
= CollectionUtils.isNullOrEmpty(metricSpecList)
? null : objectMapper.convertValue(metricSpecList, AggregatorFactory[].class);
if (existingMetricsSpec == null || !Arrays.deepEquals(configuredMetricsSpec, existingMetricsSpec)) {
return CompactionStatus.configChanged(
"metricsSpec",
Arrays.toString(configuredMetricsSpec),
Arrays.toString(existingMetricsSpec)
);
} else {
return COMPLETE;
}
}
private CompactionStatus transformSpecFilterIsUpToDate()
{
if (compactionConfig.getTransformSpec() == null) {
return COMPLETE;
}
ClientCompactionTaskTransformSpec existingTransformSpec = convertIfNotNull(
lastCompactionState.getTransformSpec(),
ClientCompactionTaskTransformSpec.class
);
return CompactionStatus.completeIfEqual(
"transformSpec filter",
compactionConfig.getTransformSpec().getFilter(),
existingTransformSpec == null ? null : existingTransformSpec.getFilter()
);
}
@Nullable
private <T> T convertIfNotNull(Object object, Class<T> clazz)
{
if (object == null) {
return null;
} else {
return objectMapper.convertValue(object, clazz);
}
}
}
}

View File

@ -17,21 +17,13 @@
* under the License.
*/
package org.apache.druid.server.coordinator.duty;
package org.apache.druid.server.coordinator.compact;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.lang.ArrayUtils;
import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
@ -39,13 +31,7 @@ import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.server.coordinator.CompactionStatistics;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.timeline.CompactionState;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.Partitions;
import org.apache.druid.timeline.SegmentTimeline;
@ -61,7 +47,6 @@ import org.joda.time.Period;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@ -70,7 +55,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.stream.Collectors;
@ -84,21 +68,19 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
private final ObjectMapper objectMapper;
private final Map<String, DataSourceCompactionConfig> compactionConfigs;
private final Map<String, CompactionStatistics> compactedSegments = new HashMap<>();
private final Map<String, CompactionStatistics> skippedSegments = new HashMap<>();
private final Map<String, CompactionStatistics> compactedSegmentStats = new HashMap<>();
private final Map<String, CompactionStatistics> skippedSegmentStats = new HashMap<>();
// dataSource -> intervalToFind
// searchIntervals keeps track of the current state of which interval should be considered to search segments to
// compact.
private final Map<String, CompactibleTimelineObjectHolderCursor> timelineIterators;
// This is needed for datasource that has segmentGranularity configured
// If configured segmentGranularity in config is finer than current segmentGranularity, the same set of segments
// can belong to multiple intervals in the timeline. We keep track of the compacted intervals between each
// run of the compaction job and skip any interval that was already previously compacted.
private final Map<String, Set<Interval>> intervalCompactedForDatasource = new HashMap<>();
private final PriorityQueue<QueueEntry> queue = new PriorityQueue<>(
(o1, o2) -> Comparators.intervalsByStartThenEnd().compare(o2.interval, o1.interval)
private final PriorityQueue<SegmentsToCompact> queue = new PriorityQueue<>(
(o1, o2) -> Comparators.intervalsByStartThenEnd().compare(o2.getUmbrellaInterval(), o1.getUmbrellaInterval())
);
NewestSegmentFirstIterator(
@ -112,11 +94,11 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
this.compactionConfigs = compactionConfigs;
this.timelineIterators = Maps.newHashMapWithExpectedSize(dataSources.size());
dataSources.forEach((String dataSource, SegmentTimeline timeline) -> {
dataSources.forEach((dataSource, timeline) -> {
final DataSourceCompactionConfig config = compactionConfigs.get(dataSource);
Granularity configuredSegmentGranularity = null;
if (config != null && !timeline.isEmpty()) {
VersionedIntervalTimeline<String, DataSegment> originalTimeline = null;
SegmentTimeline originalTimeline = null;
if (config.getGranularitySpec() != null && config.getGranularitySpec().getSegmentGranularity() != null) {
String temporaryVersion = DateTimes.nowUtc().toString();
Map<Interval, Set<DataSegment>> intervalToPartitionMap = new HashMap<>();
@ -175,7 +157,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
}
});
compactionConfigs.forEach((String dataSourceName, DataSourceCompactionConfig config) -> {
compactionConfigs.forEach((dataSourceName, config) -> {
if (config == null) {
throw new ISE("Unknown dataSource[%s]", dataSourceName);
}
@ -186,13 +168,13 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
@Override
public Map<String, CompactionStatistics> totalCompactedStatistics()
{
return compactedSegments;
return compactedSegmentStats;
}
@Override
public Map<String, CompactionStatistics> totalSkippedStatistics()
{
return skippedSegments;
return skippedSegmentStats;
}
@Override
@ -202,27 +184,24 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
}
@Override
public List<DataSegment> next()
public SegmentsToCompact next()
{
if (!hasNext()) {
throw new NoSuchElementException();
}
final QueueEntry entry = queue.poll();
final SegmentsToCompact entry = queue.poll();
if (entry == null) {
throw new NoSuchElementException();
}
final List<DataSegment> resultSegments = entry.segments;
final List<DataSegment> resultSegments = entry.getSegments();
Preconditions.checkState(!resultSegments.isEmpty(), "Queue entry must not be empty");
final String dataSource = resultSegments.get(0).getDataSource();
updateQueue(dataSource, compactionConfigs.get(dataSource));
return resultSegments;
return entry;
}
/**
@ -232,23 +211,9 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
*/
private void updateQueue(String dataSourceName, DataSourceCompactionConfig config)
{
final CompactibleTimelineObjectHolderCursor compactibleTimelineObjectHolderCursor = timelineIterators.get(
dataSourceName
);
if (compactibleTimelineObjectHolderCursor == null) {
log.warn("Cannot find timeline for dataSource[%s]. Skip this dataSource", dataSourceName);
return;
}
final SegmentsToCompact segmentsToCompact = findSegmentsToCompact(
dataSourceName,
compactibleTimelineObjectHolderCursor,
config
);
final SegmentsToCompact segmentsToCompact = findSegmentsToCompact(dataSourceName, config);
if (!segmentsToCompact.isEmpty()) {
queue.add(new QueueEntry(segmentsToCompact.segments));
queue.add(segmentsToCompact);
}
}
@ -260,13 +225,13 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
{
private final List<TimelineObjectHolder<String, DataSegment>> holders;
@Nullable
private final VersionedIntervalTimeline<String, DataSegment> originalTimeline;
private final SegmentTimeline originalTimeline;
CompactibleTimelineObjectHolderCursor(
VersionedIntervalTimeline<String, DataSegment> timeline,
SegmentTimeline timeline,
List<Interval> totalIntervalsToSearch,
// originalTimeline can be nullable if timeline was not modified
@Nullable VersionedIntervalTimeline<String, DataSegment> originalTimeline
// originalTimeline can be null if timeline was not modified
@Nullable SegmentTimeline originalTimeline
)
{
this.holders = totalIntervalsToSearch
@ -313,284 +278,93 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
List<DataSegment> candidates = Streams.sequentialStreamFrom(timelineObjectHolder.getObject())
.map(PartitionChunk::getObject)
.collect(Collectors.toList());
if (originalTimeline != null) {
Interval umbrellaInterval = JodaUtils.umbrellaInterval(candidates.stream().map(DataSegment::getInterval).collect(Collectors.toList()));
return Lists.newArrayList(originalTimeline.findNonOvershadowedObjectsInInterval(umbrellaInterval, Partitions.ONLY_COMPLETE));
}
return candidates;
}
}
@VisibleForTesting
static PartitionsSpec findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig tuningConfig)
{
final PartitionsSpec partitionsSpecFromTuningConfig = tuningConfig.getPartitionsSpec();
if (partitionsSpecFromTuningConfig instanceof DynamicPartitionsSpec) {
return new DynamicPartitionsSpec(
partitionsSpecFromTuningConfig.getMaxRowsPerSegment(),
((DynamicPartitionsSpec) partitionsSpecFromTuningConfig).getMaxTotalRowsOr(Long.MAX_VALUE)
);
} else {
final long maxTotalRows = tuningConfig.getMaxTotalRows() != null
? tuningConfig.getMaxTotalRows()
: Long.MAX_VALUE;
return partitionsSpecFromTuningConfig == null
? new DynamicPartitionsSpec(tuningConfig.getMaxRowsPerSegment(), maxTotalRows)
: partitionsSpecFromTuningConfig;
}
}
private boolean needsCompaction(DataSourceCompactionConfig config, SegmentsToCompact candidates)
{
Preconditions.checkState(!candidates.isEmpty(), "Empty candidates");
final ClientCompactionTaskQueryTuningConfig tuningConfig =
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null);
final PartitionsSpec partitionsSpecFromConfig = findPartitionsSpecFromConfig(tuningConfig);
final CompactionState lastCompactionState = candidates.segments.get(0).getLastCompactionState();
if (lastCompactionState == null) {
log.info("Candidate segment[%s] is not compacted yet. Needs compaction.", candidates.segments.get(0).getId());
return true;
}
final boolean allCandidatesHaveSameLastCompactionState = candidates
.segments
.stream()
.allMatch(segment -> lastCompactionState.equals(segment.getLastCompactionState()));
if (!allCandidatesHaveSameLastCompactionState) {
log.info(
"[%s] Candidate segments were compacted with different partitions spec. Needs compaction.",
candidates.segments.size()
);
log.debugSegments(
candidates.segments,
"Candidate segments compacted with different partiton spec"
);
return true;
}
final PartitionsSpec segmentPartitionsSpec = lastCompactionState.getPartitionsSpec();
final IndexSpec segmentIndexSpec = objectMapper.convertValue(lastCompactionState.getIndexSpec(), IndexSpec.class);
final IndexSpec configuredIndexSpec;
if (tuningConfig.getIndexSpec() == null) {
configuredIndexSpec = IndexSpec.DEFAULT;
} else {
configuredIndexSpec = tuningConfig.getIndexSpec();
}
if (!Objects.equals(partitionsSpecFromConfig, segmentPartitionsSpec)) {
log.info(
"Configured partitionsSpec[%s] is differenet from "
+ "the partitionsSpec[%s] of segments. Needs compaction.",
partitionsSpecFromConfig,
segmentPartitionsSpec
);
return true;
}
// segmentIndexSpec cannot be null.
if (!segmentIndexSpec.equals(configuredIndexSpec)) {
log.info(
"Configured indexSpec[%s] is different from the one[%s] of segments. Needs compaction",
configuredIndexSpec,
segmentIndexSpec
);
return true;
}
if (config.getGranularitySpec() != null) {
final ClientCompactionTaskGranularitySpec existingGranularitySpec = lastCompactionState.getGranularitySpec() != null ?
objectMapper.convertValue(lastCompactionState.getGranularitySpec(), ClientCompactionTaskGranularitySpec.class) :
null;
// Checks for segmentGranularity
if (config.getGranularitySpec().getSegmentGranularity() != null) {
final Granularity existingSegmentGranularity = existingGranularitySpec != null ?
existingGranularitySpec.getSegmentGranularity() :
null;
if (existingSegmentGranularity == null) {
// Candidate segments were all compacted without segment granularity set.
// We need to check if all segments have the same segment granularity as the configured segment granularity.
boolean needsCompaction = candidates.segments.stream()
.anyMatch(segment -> !config.getGranularitySpec().getSegmentGranularity().isAligned(segment.getInterval()));
if (needsCompaction) {
log.info(
"Segments were previously compacted but without segmentGranularity in auto compaction."
+ " Configured segmentGranularity[%s] is different from granularity implied by segment intervals. Needs compaction",
config.getGranularitySpec().getSegmentGranularity()
);
return true;
}
} else if (!config.getGranularitySpec().getSegmentGranularity().equals(existingSegmentGranularity)) {
log.info(
"Configured segmentGranularity[%s] is different from the segmentGranularity[%s] of segments. Needs compaction",
config.getGranularitySpec().getSegmentGranularity(),
existingSegmentGranularity
);
return true;
}
}
// Checks for rollup
if (config.getGranularitySpec().isRollup() != null) {
final Boolean existingRollup = existingGranularitySpec != null ?
existingGranularitySpec.isRollup() :
null;
if (existingRollup == null || !config.getGranularitySpec().isRollup().equals(existingRollup)) {
log.info(
"Configured rollup[%s] is different from the rollup[%s] of segments. Needs compaction",
config.getGranularitySpec().isRollup(),
existingRollup
);
return true;
}
}
// Checks for queryGranularity
if (config.getGranularitySpec().getQueryGranularity() != null) {
final Granularity existingQueryGranularity = existingGranularitySpec != null ?
existingGranularitySpec.getQueryGranularity() :
null;
if (!config.getGranularitySpec().getQueryGranularity().equals(existingQueryGranularity)) {
log.info(
"Configured queryGranularity[%s] is different from the queryGranularity[%s] of segments. Needs compaction",
config.getGranularitySpec().getQueryGranularity(),
existingQueryGranularity
);
return true;
}
}
}
if (config.getDimensionsSpec() != null) {
final DimensionsSpec existingDimensionsSpec = lastCompactionState.getDimensionsSpec();
// Checks for list of dimensions
if (config.getDimensionsSpec().getDimensions() != null) {
final List<DimensionSchema> existingDimensions = existingDimensionsSpec != null ?
existingDimensionsSpec.getDimensions() :
null;
if (!config.getDimensionsSpec().getDimensions().equals(existingDimensions)) {
log.info(
"Configured dimensionsSpec is different from the dimensionsSpec of segments. Needs compaction"
);
return true;
}
}
}
if (config.getTransformSpec() != null) {
final ClientCompactionTaskTransformSpec existingTransformSpec = lastCompactionState.getTransformSpec() != null ?
objectMapper.convertValue(lastCompactionState.getTransformSpec(), ClientCompactionTaskTransformSpec.class) :
null;
// Checks for filters
if (config.getTransformSpec().getFilter() != null) {
final DimFilter existingFilters = existingTransformSpec != null ?
existingTransformSpec.getFilter() :
null;
if (!config.getTransformSpec().getFilter().equals(existingFilters)) {
log.info(
"Configured filter[%s] is different from the filter[%s] of segments. Needs compaction",
config.getTransformSpec().getFilter(),
existingFilters
);
return true;
}
}
}
if (ArrayUtils.isNotEmpty(config.getMetricsSpec())) {
final AggregatorFactory[] existingMetricsSpec = lastCompactionState.getMetricsSpec() == null || lastCompactionState.getMetricsSpec().isEmpty() ?
null :
objectMapper.convertValue(lastCompactionState.getMetricsSpec(), AggregatorFactory[].class);
if (existingMetricsSpec == null || !Arrays.deepEquals(config.getMetricsSpec(), existingMetricsSpec)) {
log.info(
"Configured metricsSpec[%s] is different from the metricsSpec[%s] of segments. Needs compaction",
Arrays.toString(config.getMetricsSpec()),
Arrays.toString(existingMetricsSpec)
if (originalTimeline == null) {
return candidates;
} else {
Interval umbrellaInterval = JodaUtils.umbrellaInterval(
candidates.stream().map(DataSegment::getInterval).collect(Collectors.toList())
);
return Lists.newArrayList(
originalTimeline.findNonOvershadowedObjectsInInterval(umbrellaInterval, Partitions.ONLY_COMPLETE)
);
return true;
}
}
return false;
}
/**
* Find segments to compact together for the given intervalToSearch. It progressively searches the given
* intervalToSearch in time order (latest first). The timeline lookup duration is one day. It means, the timeline is
* looked up for the last one day of the given intervalToSearch, and the next day is searched again if the size of
* found segments are not enough to compact. This is repeated until enough amount of segments are found.
* Finds segments to compact together for the given datasource.
*
* @return segments to compact
* @return An empty {@link SegmentsToCompact} if there are no eligible candidates.
*/
private SegmentsToCompact findSegmentsToCompact(
final String dataSourceName,
final CompactibleTimelineObjectHolderCursor compactibleTimelineObjectHolderCursor,
final DataSourceCompactionConfig config
)
{
final CompactibleTimelineObjectHolderCursor compactibleTimelineObjectHolderCursor
= timelineIterators.get(dataSourceName);
if (compactibleTimelineObjectHolderCursor == null) {
log.warn("Skipping dataSource[%s] as there is no timeline for it.", dataSourceName);
return SegmentsToCompact.empty();
}
final long inputSegmentSize = config.getInputSegmentSizeBytes();
while (compactibleTimelineObjectHolderCursor.hasNext()) {
List<DataSegment> segments = compactibleTimelineObjectHolderCursor.next();
final SegmentsToCompact candidates = new SegmentsToCompact(segments);
if (!candidates.isEmpty()) {
final boolean isCompactibleSize = candidates.getTotalSize() <= inputSegmentSize;
final boolean needsCompaction = needsCompaction(
config,
candidates
);
if (isCompactibleSize && needsCompaction) {
if (config.getGranularitySpec() != null && config.getGranularitySpec().getSegmentGranularity() != null) {
Interval interval = candidates.getUmbrellaInterval();
Set<Interval> intervalsCompacted = intervalCompactedForDatasource.computeIfAbsent(dataSourceName, k -> new HashSet<>());
// Skip this candidates if we have compacted the interval already
if (intervalsCompacted.contains(interval)) {
continue;
}
intervalsCompacted.add(interval);
}
return candidates;
} else {
if (!needsCompaction) {
// Collect statistic for segments that is already compacted
collectSegmentStatistics(compactedSegments, dataSourceName, candidates);
} else {
// Collect statistic for segments that is skipped
// Note that if segments does not need compaction then we do not double count here
collectSegmentStatistics(skippedSegments, dataSourceName, candidates);
log.warn(
"total segment size[%d] for datasource[%s] and interval[%s] is larger than inputSegmentSize[%d]."
+ " Continue to the next interval.",
candidates.getTotalSize(),
candidates.segments.get(0).getDataSource(),
candidates.segments.get(0).getInterval(),
inputSegmentSize
);
}
}
} else {
if (segments.isEmpty()) {
throw new ISE("No segment is found?");
}
final SegmentsToCompact candidates = SegmentsToCompact.from(segments);
final Interval interval = candidates.getUmbrellaInterval();
final CompactionStatus compactionStatus = CompactionStatus.of(candidates, config, objectMapper);
if (!compactionStatus.isComplete()) {
log.debug(
"Datasource[%s], interval[%s] has [%d] segments that need to be compacted because [%s].",
dataSourceName, interval, candidates.size(), compactionStatus.getReasonToCompact()
);
}
if (compactionStatus.isComplete()) {
addSegmentStatsTo(compactedSegmentStats, dataSourceName, candidates);
} else if (candidates.getTotalBytes() > inputSegmentSize) {
addSegmentStatsTo(skippedSegmentStats, dataSourceName, candidates);
log.warn(
"Skipping compaction for datasource[%s], interval[%s] as total segment size[%d]"
+ " is larger than allowed inputSegmentSize[%d].",
dataSourceName, interval, candidates.getTotalBytes(), inputSegmentSize
);
} else if (config.getGranularitySpec() != null
&& config.getGranularitySpec().getSegmentGranularity() != null) {
Set<Interval> compactedIntervals = intervalCompactedForDatasource
.computeIfAbsent(dataSourceName, k -> new HashSet<>());
if (compactedIntervals.contains(interval)) {
// Skip these candidate segments as we have already compacted this interval
} else {
compactedIntervals.add(interval);
return candidates;
}
} else {
return candidates;
}
}
log.info("All segments look good! Nothing to compact");
return new SegmentsToCompact();
log.debug("All segments look good! Nothing to compact");
return SegmentsToCompact.empty();
}
private void collectSegmentStatistics(
private void addSegmentStatsTo(
Map<String, CompactionStatistics> statisticsMap,
String dataSourceName,
SegmentsToCompact segments)
SegmentsToCompact segments
)
{
CompactionStatistics statistics = statisticsMap.computeIfAbsent(
dataSourceName,
v -> CompactionStatistics.initializeCompactionStatistics()
);
statistics.incrementCompactedByte(segments.getTotalSize());
statistics.incrementCompactedIntervals(segments.getNumberOfIntervals());
statistics.incrementCompactedSegments(segments.getNumberOfSegments());
statisticsMap.computeIfAbsent(dataSourceName, v -> CompactionStatistics.create())
.addFrom(segments);
}
/**
@ -621,10 +395,12 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
skipIntervals
);
// Calcuate stats of all skipped segments
// Collect stats for all skipped segments
for (Interval skipInterval : fullSkipIntervals) {
final List<DataSegment> segments = new ArrayList<>(timeline.findNonOvershadowedObjectsInInterval(skipInterval, Partitions.ONLY_COMPLETE));
collectSegmentStatistics(skippedSegments, dataSourceName, new SegmentsToCompact(segments));
final List<DataSegment> segments = new ArrayList<>(
timeline.findNonOvershadowedObjectsInInterval(skipInterval, Partitions.ONLY_COMPLETE)
);
addSegmentStatsTo(skippedSegmentStats, dataSourceName, SegmentsToCompact.from(segments));
}
final Interval totalInterval = new Interval(first.getInterval().getStart(), last.getInterval().getEnd());
@ -749,81 +525,4 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
return filteredIntervals;
}
private static class QueueEntry
{
private final Interval interval; // whole interval for all segments
private final List<DataSegment> segments;
private QueueEntry(List<DataSegment> segments)
{
Preconditions.checkArgument(segments != null && !segments.isEmpty());
DateTime minStart = DateTimes.MAX, maxEnd = DateTimes.MIN;
for (DataSegment segment : segments) {
if (segment.getInterval().getStart().compareTo(minStart) < 0) {
minStart = segment.getInterval().getStart();
}
if (segment.getInterval().getEnd().compareTo(maxEnd) > 0) {
maxEnd = segment.getInterval().getEnd();
}
}
this.interval = new Interval(minStart, maxEnd);
this.segments = segments;
}
private String getDataSource()
{
return segments.get(0).getDataSource();
}
}
private static class SegmentsToCompact
{
private final List<DataSegment> segments;
private final long totalSize;
private SegmentsToCompact()
{
this(Collections.emptyList());
}
private SegmentsToCompact(List<DataSegment> segments)
{
this.segments = segments;
this.totalSize = segments.stream().mapToLong(DataSegment::getSize).sum();
}
private boolean isEmpty()
{
return segments.isEmpty();
}
private long getTotalSize()
{
return totalSize;
}
private long getNumberOfSegments()
{
return segments.size();
}
private Interval getUmbrellaInterval()
{
return JodaUtils.umbrellaInterval(segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()));
}
private long getNumberOfIntervals()
{
return segments.stream().map(DataSegment::getInterval).distinct().count();
}
@Override
public String toString()
{
return "SegmentsToCompact{" +
"segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) +
", totalSize=" + totalSize +
'}';
}
}
}

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.apache.druid.server.coordinator.duty;
package org.apache.druid.server.coordinator.compact;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;

View File

@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator.compact;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
/**
* List of segments to compact.
*/
public class SegmentsToCompact
{
private static final SegmentsToCompact EMPTY_INSTANCE = new SegmentsToCompact();
private final List<DataSegment> segments;
private final Interval umbrellaInterval;
private final long totalBytes;
private final int numIntervals;
static SegmentsToCompact empty()
{
return EMPTY_INSTANCE;
}
public static SegmentsToCompact from(List<DataSegment> segments)
{
if (segments == null || segments.isEmpty()) {
return empty();
} else {
return new SegmentsToCompact(segments);
}
}
private SegmentsToCompact()
{
this.segments = Collections.emptyList();
this.totalBytes = 0L;
this.numIntervals = 0;
this.umbrellaInterval = null;
}
private SegmentsToCompact(List<DataSegment> segments)
{
this.segments = segments;
this.totalBytes = segments.stream().mapToLong(DataSegment::getSize).sum();
this.umbrellaInterval = JodaUtils.umbrellaInterval(
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList())
);
this.numIntervals = (int) segments.stream().map(DataSegment::getInterval).distinct().count();
}
public List<DataSegment> getSegments()
{
return segments;
}
public DataSegment getFirst()
{
if (segments.isEmpty()) {
throw new NoSuchElementException("No segment to compact");
} else {
return segments.get(0);
}
}
public boolean isEmpty()
{
return segments.isEmpty();
}
public long getTotalBytes()
{
return totalBytes;
}
public int size()
{
return segments.size();
}
public Interval getUmbrellaInterval()
{
return umbrellaInterval;
}
public long getNumIntervals()
{
return numIntervals;
}
@Override
public String toString()
{
return "SegmentsToCompact{" +
"segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) +
", totalSize=" + totalBytes +
'}';
}
}

View File

@ -45,10 +45,13 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.coordinator.CompactionStatistics;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.compact.CompactionSegmentIterator;
import org.apache.druid.server.coordinator.compact.CompactionSegmentSearchPolicy;
import org.apache.druid.server.coordinator.compact.CompactionStatistics;
import org.apache.druid.server.coordinator.compact.SegmentsToCompact;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
@ -364,7 +367,8 @@ public class CompactSegments implements CoordinatorCustomDuty
int numCompactionTasksAndSubtasks = 0;
while (iterator.hasNext() && numCompactionTasksAndSubtasks < numAvailableCompactionTaskSlots) {
final List<DataSegment> segmentsToCompact = iterator.next();
final SegmentsToCompact entry = iterator.next();
final List<DataSegment> segmentsToCompact = entry.getSegments();
if (segmentsToCompact.isEmpty()) {
throw new ISE("segmentsToCompact is empty?");
}
@ -403,11 +407,13 @@ public class CompactSegments implements CoordinatorCustomDuty
catch (IllegalArgumentException iae) {
// This case can happen if the existing segment interval result in complicated periods.
// Fall back to setting segmentGranularity as null
LOG.warn("Cannot determine segmentGranularity from interval [%s]", interval);
LOG.warn("Cannot determine segmentGranularity from interval[%s].", interval);
}
} else {
LOG.warn(
"segmentsToCompact does not have the same interval. Fallback to not setting segmentGranularity for auto compaction task");
"Not setting 'segmentGranularity' for auto-compaction task as"
+ " the segments to compact do not have the same interval."
);
}
} else {
segmentGranularityToUse = config.getGranularitySpec().getSegmentGranularity();
@ -478,13 +484,17 @@ public class CompactSegments implements CoordinatorCustomDuty
newAutoCompactionContext(config.getTaskContext())
);
LOG.info("Submitted a compactionTask[%s] for [%d] segments", taskId, segmentsToCompact.size());
LOG.infoSegments(segmentsToCompact, "Compacting segments");
LOG.info(
"Submitted a compaction task[%s] for [%d] segments in datasource[%s], umbrella interval[%s].",
taskId, segmentsToCompact.size(), dataSourceName, entry.getUmbrellaInterval()
);
LOG.debugSegments(segmentsToCompact, "Compacting segments");
// Count the compaction task itself + its sub tasks
numSubmittedTasks++;
numCompactionTasksAndSubtasks += findMaxNumTaskSlotsUsedByOneCompactionTask(config.getTuningConfig());
}
LOG.info("Submitted a total of [%d] compaction tasks.", numSubmittedTasks);
return numSubmittedTasks;
}
@ -505,7 +515,8 @@ public class CompactSegments implements CoordinatorCustomDuty
{
// Mark all the segments remaining in the iterator as "awaiting compaction"
while (iterator.hasNext()) {
final List<DataSegment> segmentsToCompact = iterator.next();
final SegmentsToCompact entry = iterator.next();
final List<DataSegment> segmentsToCompact = entry.getSegments();
if (!segmentsToCompact.isEmpty()) {
final String dataSourceName = segmentsToCompact.get(0).getDataSource();
AutoCompactionSnapshot.Builder snapshotBuilder = currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
@ -536,9 +547,9 @@ public class CompactSegments implements CoordinatorCustomDuty
dataSource,
k -> new AutoCompactionSnapshot.Builder(k, AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING)
);
builder.incrementBytesCompacted(dataSourceCompactedStatistics.getByteSum());
builder.incrementSegmentCountCompacted(dataSourceCompactedStatistics.getSegmentNumberCountSum());
builder.incrementIntervalCountCompacted(dataSourceCompactedStatistics.getSegmentIntervalCountSum());
builder.incrementBytesCompacted(dataSourceCompactedStatistics.getTotalBytes());
builder.incrementSegmentCountCompacted(dataSourceCompactedStatistics.getNumSegments());
builder.incrementIntervalCountCompacted(dataSourceCompactedStatistics.getNumIntervals());
}
// Statistics of all segments considered skipped after this run
@ -550,9 +561,9 @@ public class CompactSegments implements CoordinatorCustomDuty
dataSource,
k -> new AutoCompactionSnapshot.Builder(k, AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING)
);
builder.incrementBytesSkipped(dataSourceSkippedStatistics.getByteSum());
builder.incrementSegmentCountSkipped(dataSourceSkippedStatistics.getSegmentNumberCountSum());
builder.incrementIntervalCountSkipped(dataSourceSkippedStatistics.getSegmentIntervalCountSum());
builder.incrementBytesSkipped(dataSourceSkippedStatistics.getTotalBytes());
builder.incrementSegmentCountSkipped(dataSourceSkippedStatistics.getNumSegments());
builder.incrementIntervalCountSkipped(dataSourceSkippedStatistics.getNumIntervals());
}
final Map<String, AutoCompactionSnapshot> currentAutoCompactionSnapshotPerDataSource = new HashMap<>();

View File

@ -52,13 +52,13 @@ import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.balancer.CostBalancerStrategyFactory;
import org.apache.druid.server.coordinator.balancer.RandomBalancerStrategyFactory;
import org.apache.druid.server.coordinator.compact.NewestSegmentFirstPolicy;
import org.apache.druid.server.coordinator.duty.CompactSegments;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDuty;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroup;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups;
import org.apache.druid.server.coordinator.duty.CoordinatorDuty;
import org.apache.druid.server.coordinator.duty.KillSupervisorsCustomDuty;
import org.apache.druid.server.coordinator.duty.NewestSegmentFirstPolicy;
import org.apache.druid.server.coordinator.loading.CuratorLoadQueuePeon;
import org.apache.druid.server.coordinator.loading.LoadQueuePeon;
import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.apache.druid.server.coordinator.duty;
package org.apache.druid.server.coordinator.compact;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@ -100,7 +100,7 @@ public class NewestSegmentFirstIteratorTest
);
Assert.assertEquals(
new DynamicPartitionsSpec(null, Long.MAX_VALUE),
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
CompactionStatus.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
)
);
@ -145,7 +145,7 @@ public class NewestSegmentFirstIteratorTest
);
Assert.assertEquals(
new DynamicPartitionsSpec(null, Long.MAX_VALUE),
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
CompactionStatus.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
)
);
@ -190,7 +190,7 @@ public class NewestSegmentFirstIteratorTest
);
Assert.assertEquals(
new DynamicPartitionsSpec(null, 1000L),
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
CompactionStatus.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
)
);
@ -235,7 +235,7 @@ public class NewestSegmentFirstIteratorTest
);
Assert.assertEquals(
new DynamicPartitionsSpec(100, 1000L),
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
CompactionStatus.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
)
);
@ -280,7 +280,7 @@ public class NewestSegmentFirstIteratorTest
);
Assert.assertEquals(
new DynamicPartitionsSpec(100, 1000L),
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
CompactionStatus.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
)
);
@ -325,7 +325,7 @@ public class NewestSegmentFirstIteratorTest
);
Assert.assertEquals(
new DynamicPartitionsSpec(null, Long.MAX_VALUE),
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
CompactionStatus.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
)
);
@ -370,7 +370,7 @@ public class NewestSegmentFirstIteratorTest
);
Assert.assertEquals(
new DynamicPartitionsSpec(null, Long.MAX_VALUE),
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
CompactionStatus.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
)
);
@ -415,7 +415,7 @@ public class NewestSegmentFirstIteratorTest
);
Assert.assertEquals(
new HashedPartitionsSpec(null, 10, ImmutableList.of("dim")),
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
CompactionStatus.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
)
);
@ -460,7 +460,7 @@ public class NewestSegmentFirstIteratorTest
);
Assert.assertEquals(
new SingleDimensionPartitionsSpec(10000, null, "dim", false),
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
CompactionStatus.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
)
);

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.apache.druid.server.coordinator.duty;
package org.apache.druid.server.coordinator.compact;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.InjectableValues;
@ -26,7 +26,6 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.impl.DimensionsSpec;
@ -58,7 +57,7 @@ import org.apache.druid.timeline.Partitions;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.assertj.core.api.Assertions;
import org.apache.druid.utils.Streams;
import org.joda.time.DateTimeZone;
import org.joda.time.Interval;
import org.joda.time.Period;
@ -72,6 +71,7 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.stream.Collectors;
@ -208,7 +208,7 @@ public class NewestSegmentFirstPolicyTest
Interval lastInterval = null;
while (iterator.hasNext()) {
final List<DataSegment> segments = iterator.next();
final List<DataSegment> segments = iterator.next().getSegments();
lastInterval = segments.get(0).getInterval();
Interval prevInterval = null;
@ -264,7 +264,7 @@ public class NewestSegmentFirstPolicyTest
Interval lastInterval = null;
while (iterator.hasNext()) {
final List<DataSegment> segments = iterator.next();
final List<DataSegment> segments = iterator.next().getSegments();
lastInterval = segments.get(0).getInterval();
Interval prevInterval = null;
@ -352,9 +352,13 @@ public class NewestSegmentFirstPolicyTest
);
expectedSegmentsToCompact2.sort(Comparator.naturalOrder());
Assertions.assertThat(iterator)
.toIterable()
.containsExactly(expectedSegmentsToCompact, expectedSegmentsToCompact2);
Set<List<DataSegment>> observedSegments = Streams.sequentialStreamFrom(iterator)
.map(SegmentsToCompact::getSegments)
.collect(Collectors.toSet());
Assert.assertEquals(
observedSegments,
ImmutableSet.of(expectedSegmentsToCompact, expectedSegmentsToCompact2)
);
}
@Test
@ -419,7 +423,13 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertTrue(iterator.hasNext());
Assert.assertEquals(ImmutableSet.copyOf(expectedSegmentsToCompact), ImmutableSet.copyOf(Iterables.concat(ImmutableSet.copyOf(iterator))));
Set<DataSegment> observedSegmentsToCompact = Streams.sequentialStreamFrom(iterator)
.flatMap(s -> s.getSegments().stream())
.collect(Collectors.toSet());
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
observedSegmentsToCompact
);
}
@Test
@ -446,7 +456,7 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertTrue(iterator.hasNext());
List<DataSegment> actual = iterator.next();
List<DataSegment> actual = iterator.next().getSegments();
Assert.assertEquals(expectedSegmentsToCompact.size(), actual.size());
Assert.assertEquals(ImmutableSet.copyOf(expectedSegmentsToCompact), ImmutableSet.copyOf(actual));
Assert.assertFalse(iterator.hasNext());
@ -472,7 +482,13 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertTrue(iterator.hasNext());
Assert.assertEquals(ImmutableSet.copyOf(expectedSegmentsToCompact), ImmutableSet.copyOf(Iterables.concat(ImmutableSet.copyOf(iterator))));
Set<DataSegment> observedSegmentsToCompact = Streams.sequentialStreamFrom(iterator)
.flatMap(s -> s.getSegments().stream())
.collect(Collectors.toSet());
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
observedSegmentsToCompact
);
}
@Test
@ -585,7 +601,7 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
ImmutableSet.copyOf(iterator.next())
ImmutableSet.copyOf(iterator.next().getSegments())
);
// Month of Nov
Assert.assertTrue(iterator.hasNext());
@ -594,7 +610,7 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
ImmutableSet.copyOf(iterator.next())
ImmutableSet.copyOf(iterator.next().getSegments())
);
// Month of Oct
Assert.assertTrue(iterator.hasNext());
@ -603,7 +619,7 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
ImmutableSet.copyOf(iterator.next())
ImmutableSet.copyOf(iterator.next().getSegments())
);
// No more
Assert.assertFalse(iterator.hasNext());
@ -631,7 +647,7 @@ public class NewestSegmentFirstPolicyTest
timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2020-01-28/2020-02-15"), Partitions.ONLY_COMPLETE)
);
Assert.assertTrue(iterator.hasNext());
List<DataSegment> actual = iterator.next();
List<DataSegment> actual = iterator.next().getSegments();
Assert.assertEquals(expectedSegmentsToCompact.size(), actual.size());
Assert.assertEquals(ImmutableSet.copyOf(expectedSegmentsToCompact), ImmutableSet.copyOf(actual));
// Month of Jan
@ -639,7 +655,7 @@ public class NewestSegmentFirstPolicyTest
timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2020-01-01/2020-02-03"), Partitions.ONLY_COMPLETE)
);
Assert.assertTrue(iterator.hasNext());
actual = iterator.next();
actual = iterator.next().getSegments();
Assert.assertEquals(expectedSegmentsToCompact.size(), actual.size());
Assert.assertEquals(ImmutableSet.copyOf(expectedSegmentsToCompact), ImmutableSet.copyOf(actual));
// No more
@ -663,7 +679,10 @@ public class NewestSegmentFirstPolicyTest
timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-12-01T00:00:00/2017-12-02T00:00:00"), Partitions.ONLY_COMPLETE)
);
Assert.assertTrue(iterator.hasNext());
Assert.assertEquals(ImmutableSet.copyOf(expectedSegmentsToCompact), ImmutableSet.copyOf(iterator.next()));
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
ImmutableSet.copyOf(iterator.next().getSegments())
);
// Iterator should return only once since all the "minute" interval of the iterator contains the same interval
Assert.assertFalse(iterator.hasNext());
}
@ -689,7 +708,7 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
ImmutableSet.copyOf(iterator.next())
ImmutableSet.copyOf(iterator.next().getSegments())
);
// No more
Assert.assertFalse(iterator.hasNext());
@ -701,7 +720,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
// Same partitionsSpec as what is set in the auto compaction config
PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
final SegmentTimeline timeline = createTimeline(
@ -734,7 +753,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
// Same partitionsSpec as what is set in the auto compaction config
PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
final SegmentTimeline timeline = createTimeline(
@ -767,7 +786,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
// Same partitionsSpec as what is set in the auto compaction config
PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
final SegmentTimeline timeline = createTimeline(
@ -798,7 +817,7 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
ImmutableSet.copyOf(iterator.next())
ImmutableSet.copyOf(iterator.next().getSegments())
);
// No more
Assert.assertFalse(iterator.hasNext());
@ -810,7 +829,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
// Same partitionsSpec as what is set in the auto compaction config
PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
final SegmentTimeline timeline = createTimeline(
@ -841,7 +860,7 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
ImmutableSet.copyOf(iterator.next())
ImmutableSet.copyOf(iterator.next().getSegments())
);
// No more
Assert.assertFalse(iterator.hasNext());
@ -853,7 +872,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
// Same partitionsSpec as what is set in the auto compaction config
PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
final SegmentTimeline timeline = createTimeline(
@ -893,7 +912,7 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
ImmutableSet.copyOf(iterator.next())
ImmutableSet.copyOf(iterator.next().getSegments())
);
// No more
Assert.assertFalse(iterator.hasNext());
@ -905,7 +924,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
// Same partitionsSpec as what is set in the auto compaction config
PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
final SegmentTimeline timeline = createTimeline(
@ -944,7 +963,7 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
ImmutableSet.copyOf(iterator.next())
ImmutableSet.copyOf(iterator.next().getSegments())
);
// No more
Assert.assertFalse(iterator.hasNext());
@ -956,7 +975,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
// Same partitionsSpec as what is set in the auto compaction config
PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
// Create segments that were compacted (CompactionState != null) and have
// rollup=false for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
@ -996,7 +1015,7 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
ImmutableSet.copyOf(iterator.next())
ImmutableSet.copyOf(iterator.next().getSegments())
);
Assert.assertTrue(iterator.hasNext());
expectedSegmentsToCompact = new ArrayList<>(
@ -1004,7 +1023,7 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
ImmutableSet.copyOf(iterator.next())
ImmutableSet.copyOf(iterator.next().getSegments())
);
// No more
Assert.assertFalse(iterator.hasNext());
@ -1016,7 +1035,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
// Same partitionsSpec as what is set in the auto compaction config
PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
// Create segments that were compacted (CompactionState != null) and have
// queryGranularity=DAY for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
@ -1056,7 +1075,7 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
ImmutableSet.copyOf(iterator.next())
ImmutableSet.copyOf(iterator.next().getSegments())
);
Assert.assertTrue(iterator.hasNext());
expectedSegmentsToCompact = new ArrayList<>(
@ -1064,7 +1083,7 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
ImmutableSet.copyOf(iterator.next())
ImmutableSet.copyOf(iterator.next().getSegments())
);
// No more
Assert.assertFalse(iterator.hasNext());
@ -1076,7 +1095,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
// Same partitionsSpec as what is set in the auto compaction config
PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
// Create segments that were compacted (CompactionState != null) and have
// Dimensions=["foo", "bar"] for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
@ -1130,7 +1149,7 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
ImmutableSet.copyOf(iterator.next())
ImmutableSet.copyOf(iterator.next().getSegments())
);
Assert.assertTrue(iterator.hasNext());
expectedSegmentsToCompact = new ArrayList<>(
@ -1138,7 +1157,7 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
ImmutableSet.copyOf(iterator.next())
ImmutableSet.copyOf(iterator.next().getSegments())
);
Assert.assertTrue(iterator.hasNext());
expectedSegmentsToCompact = new ArrayList<>(
@ -1146,7 +1165,7 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
ImmutableSet.copyOf(iterator.next())
ImmutableSet.copyOf(iterator.next().getSegments())
);
// No more
Assert.assertFalse(iterator.hasNext());
@ -1175,7 +1194,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
// Same partitionsSpec as what is set in the auto compaction config
PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
// Create segments that were compacted (CompactionState != null) and have
// filter=SelectorDimFilter("dim1", "foo", null) for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
@ -1250,7 +1269,7 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
ImmutableSet.copyOf(iterator.next())
ImmutableSet.copyOf(iterator.next().getSegments())
);
Assert.assertTrue(iterator.hasNext());
expectedSegmentsToCompact = new ArrayList<>(
@ -1258,7 +1277,7 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
ImmutableSet.copyOf(iterator.next())
ImmutableSet.copyOf(iterator.next().getSegments())
);
Assert.assertTrue(iterator.hasNext());
expectedSegmentsToCompact = new ArrayList<>(
@ -1266,7 +1285,7 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
ImmutableSet.copyOf(iterator.next())
ImmutableSet.copyOf(iterator.next().getSegments())
);
// No more
Assert.assertFalse(iterator.hasNext());
@ -1299,7 +1318,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
// Same partitionsSpec as what is set in the auto compaction config
PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
// Create segments that were compacted (CompactionState != null) and have
// metricsSpec={CountAggregatorFactory("cnt")} for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
@ -1374,7 +1393,7 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
ImmutableSet.copyOf(iterator.next())
ImmutableSet.copyOf(iterator.next().getSegments())
);
Assert.assertTrue(iterator.hasNext());
expectedSegmentsToCompact = new ArrayList<>(
@ -1382,7 +1401,7 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
ImmutableSet.copyOf(iterator.next())
ImmutableSet.copyOf(iterator.next().getSegments())
);
Assert.assertTrue(iterator.hasNext());
expectedSegmentsToCompact = new ArrayList<>(
@ -1390,7 +1409,7 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
ImmutableSet.copyOf(iterator.next())
ImmutableSet.copyOf(iterator.next().getSegments())
);
// No more
Assert.assertFalse(iterator.hasNext());
@ -1436,7 +1455,7 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
ImmutableSet.copyOf(iterator.next())
ImmutableSet.copyOf(iterator.next().getSegments())
);
// No more
Assert.assertFalse(iterator.hasNext());
@ -1448,7 +1467,7 @@ public class NewestSegmentFirstPolicyTest
// Different indexSpec as what is set in the auto compaction config
IndexSpec newIndexSpec = IndexSpec.builder().withBitmapSerdeFactory(new ConciseBitmapSerdeFactory()).build();
Map<String, Object> newIndexSpecMap = mapper.convertValue(newIndexSpec, new TypeReference<Map<String, Object>>() {});
PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
final SegmentTimeline timeline = createTimeline(
@ -1487,7 +1506,7 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
ImmutableSet.copyOf(iterator.next())
ImmutableSet.copyOf(iterator.next().getSegments())
);
// No more
Assert.assertFalse(iterator.hasNext());
@ -1497,7 +1516,7 @@ public class NewestSegmentFirstPolicyTest
public void testIteratorDoesNotReturnSegmentWithChangingAppendableIndexSpec()
{
NullHandling.initializeForTests();
PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
final SegmentTimeline timeline = createTimeline(
new SegmentGenerateSpec(
Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"),
@ -1691,7 +1710,7 @@ public class NewestSegmentFirstPolicyTest
{
Interval expectedSegmentIntervalStart = to;
while (iterator.hasNext()) {
final List<DataSegment> segments = iterator.next();
final List<DataSegment> segments = iterator.next().getSegments();
final Interval firstInterval = segments.get(0).getInterval();
Assert.assertTrue(

View File

@ -75,6 +75,8 @@ import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskIOConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskTransformConfig;
import org.apache.druid.server.coordinator.compact.CompactionSegmentSearchPolicy;
import org.apache.druid.server.coordinator.compact.NewestSegmentFirstPolicy;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.CompactionState;

View File

@ -47,9 +47,9 @@ import org.apache.druid.server.coordinator.balancer.CachingCostBalancerStrategyF
import org.apache.druid.server.coordinator.balancer.CostBalancerStrategyFactory;
import org.apache.druid.server.coordinator.balancer.DiskNormalizedCostBalancerStrategyFactory;
import org.apache.druid.server.coordinator.balancer.RandomBalancerStrategyFactory;
import org.apache.druid.server.coordinator.duty.CompactionSegmentSearchPolicy;
import org.apache.druid.server.coordinator.compact.CompactionSegmentSearchPolicy;
import org.apache.druid.server.coordinator.compact.NewestSegmentFirstPolicy;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups;
import org.apache.druid.server.coordinator.duty.NewestSegmentFirstPolicy;
import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster;
import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
import org.apache.druid.server.coordinator.rules.Rule;

View File

@ -81,7 +81,8 @@ import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
import org.apache.druid.server.coordinator.KillStalePendingSegments;
import org.apache.druid.server.coordinator.balancer.BalancerStrategyFactory;
import org.apache.druid.server.coordinator.balancer.CachingCostBalancerStrategyConfig;
import org.apache.druid.server.coordinator.duty.CompactionSegmentSearchPolicy;
import org.apache.druid.server.coordinator.compact.CompactionSegmentSearchPolicy;
import org.apache.druid.server.coordinator.compact.NewestSegmentFirstPolicy;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDuty;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroup;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups;
@ -92,7 +93,6 @@ import org.apache.druid.server.coordinator.duty.KillDatasourceMetadata;
import org.apache.druid.server.coordinator.duty.KillRules;
import org.apache.druid.server.coordinator.duty.KillSupervisors;
import org.apache.druid.server.coordinator.duty.KillUnusedSegments;
import org.apache.druid.server.coordinator.duty.NewestSegmentFirstPolicy;
import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster;
import org.apache.druid.server.http.ClusterResource;
import org.apache.druid.server.http.CompactionResource;