Auto-compaction snapshot status API (#10371)

* Auto-compaction snapshot API

* Auto-compaction snapshot API

* Auto-compaction snapshot API

* Auto-compaction snapshot API

* Auto-compaction snapshot API

* Auto-compaction snapshot API

* Auto-compaction snapshot API

* fix when not all compacted segments are iterated

* add unit tests

* add unit tests

* add unit tests

* add unit tests

* add unit tests

* add unit tests

* add some tests to make code cov happy

* address comments

* address comments

* address comments

* address comments

* make code coverage happy

* address comments

* address comments

* address comments

* address comments
This commit is contained in:
Maytas Monsereenusorn 2020-09-18 16:37:58 -07:00 committed by GitHub
parent d0ee2e3a48
commit e78d7862a8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 1367 additions and 61 deletions

View File

@ -0,0 +1,296 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.ISE;
import javax.validation.constraints.NotNull;
import java.util.Objects;
public class AutoCompactionSnapshot
{
public enum AutoCompactionScheduleStatus
{
NOT_ENABLED,
RUNNING
}
@JsonProperty
private String dataSource;
@JsonProperty
private AutoCompactionScheduleStatus scheduleStatus;
@JsonProperty
private long bytesAwaitingCompaction;
@JsonProperty
private long bytesCompacted;
@JsonProperty
private long bytesSkipped;
@JsonProperty
private long segmentCountAwaitingCompaction;
@JsonProperty
private long segmentCountCompacted;
@JsonProperty
private long segmentCountSkipped;
@JsonProperty
private long intervalCountAwaitingCompaction;
@JsonProperty
private long intervalCountCompacted;
@JsonProperty
private long intervalCountSkipped;
@JsonCreator
public AutoCompactionSnapshot(
@JsonProperty @NotNull String dataSource,
@JsonProperty @NotNull AutoCompactionScheduleStatus scheduleStatus,
@JsonProperty long bytesAwaitingCompaction,
@JsonProperty long bytesCompacted,
@JsonProperty long bytesSkipped,
@JsonProperty long segmentCountAwaitingCompaction,
@JsonProperty long segmentCountCompacted,
@JsonProperty long segmentCountSkipped,
@JsonProperty long intervalCountAwaitingCompaction,
@JsonProperty long intervalCountCompacted,
@JsonProperty long intervalCountSkipped
)
{
this.dataSource = dataSource;
this.scheduleStatus = scheduleStatus;
this.bytesAwaitingCompaction = bytesAwaitingCompaction;
this.bytesCompacted = bytesCompacted;
this.bytesSkipped = bytesSkipped;
this.segmentCountAwaitingCompaction = segmentCountAwaitingCompaction;
this.segmentCountCompacted = segmentCountCompacted;
this.segmentCountSkipped = segmentCountSkipped;
this.intervalCountAwaitingCompaction = intervalCountAwaitingCompaction;
this.intervalCountCompacted = intervalCountCompacted;
this.intervalCountSkipped = intervalCountSkipped;
}
@NotNull
public String getDataSource()
{
return dataSource;
}
@NotNull
public AutoCompactionScheduleStatus getScheduleStatus()
{
return scheduleStatus;
}
public long getBytesAwaitingCompaction()
{
return bytesAwaitingCompaction;
}
public long getBytesCompacted()
{
return bytesCompacted;
}
public long getBytesSkipped()
{
return bytesSkipped;
}
public long getSegmentCountAwaitingCompaction()
{
return segmentCountAwaitingCompaction;
}
public long getSegmentCountCompacted()
{
return segmentCountCompacted;
}
public long getSegmentCountSkipped()
{
return segmentCountSkipped;
}
public long getIntervalCountAwaitingCompaction()
{
return intervalCountAwaitingCompaction;
}
public long getIntervalCountCompacted()
{
return intervalCountCompacted;
}
public long getIntervalCountSkipped()
{
return intervalCountSkipped;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
AutoCompactionSnapshot that = (AutoCompactionSnapshot) o;
return bytesAwaitingCompaction == that.bytesAwaitingCompaction &&
bytesCompacted == that.bytesCompacted &&
bytesSkipped == that.bytesSkipped &&
segmentCountAwaitingCompaction == that.segmentCountAwaitingCompaction &&
segmentCountCompacted == that.segmentCountCompacted &&
segmentCountSkipped == that.segmentCountSkipped &&
intervalCountAwaitingCompaction == that.intervalCountAwaitingCompaction &&
intervalCountCompacted == that.intervalCountCompacted &&
intervalCountSkipped == that.intervalCountSkipped &&
dataSource.equals(that.dataSource) &&
scheduleStatus == that.scheduleStatus;
}
@Override
public int hashCode()
{
return Objects.hash(
dataSource,
scheduleStatus,
bytesAwaitingCompaction,
bytesCompacted,
bytesSkipped,
segmentCountAwaitingCompaction,
segmentCountCompacted,
segmentCountSkipped,
intervalCountAwaitingCompaction,
intervalCountCompacted,
intervalCountSkipped
);
}
public static class Builder
{
private String dataSource;
private AutoCompactionScheduleStatus scheduleStatus;
private long bytesAwaitingCompaction;
private long bytesCompacted;
private long bytesSkipped;
private long segmentCountAwaitingCompaction;
private long segmentCountCompacted;
private long segmentCountSkipped;
private long intervalCountAwaitingCompaction;
private long intervalCountCompacted;
private long intervalCountSkipped;
public Builder(
@NotNull String dataSource,
@NotNull AutoCompactionScheduleStatus scheduleStatus
)
{
this.dataSource = dataSource;
this.scheduleStatus = scheduleStatus;
this.bytesAwaitingCompaction = 0;
this.bytesCompacted = 0;
this.bytesSkipped = 0;
this.segmentCountAwaitingCompaction = 0;
this.segmentCountCompacted = 0;
this.segmentCountSkipped = 0;
this.intervalCountAwaitingCompaction = 0;
this.intervalCountCompacted = 0;
this.intervalCountSkipped = 0;
}
public Builder incrementBytesAwaitingCompaction(long incrementValue)
{
this.bytesAwaitingCompaction = this.bytesAwaitingCompaction + incrementValue;
return this;
}
public Builder incrementBytesCompacted(long incrementValue)
{
this.bytesCompacted = this.bytesCompacted + incrementValue;
return this;
}
public Builder incrementSegmentCountAwaitingCompaction(long incrementValue)
{
this.segmentCountAwaitingCompaction = this.segmentCountAwaitingCompaction + incrementValue;
return this;
}
public Builder incrementSegmentCountCompacted(long incrementValue)
{
this.segmentCountCompacted = this.segmentCountCompacted + incrementValue;
return this;
}
public Builder incrementIntervalCountAwaitingCompaction(long incrementValue)
{
this.intervalCountAwaitingCompaction = this.intervalCountAwaitingCompaction + incrementValue;
return this;
}
public Builder incrementIntervalCountCompacted(long incrementValue)
{
this.intervalCountCompacted = this.intervalCountCompacted + incrementValue;
return this;
}
public Builder incrementBytesSkipped(long incrementValue)
{
this.bytesSkipped = this.bytesSkipped + incrementValue;
return this;
}
public Builder incrementSegmentCountSkipped(long incrementValue)
{
this.segmentCountSkipped = this.segmentCountSkipped + incrementValue;
return this;
}
public Builder incrementIntervalCountSkipped(long incrementValue)
{
this.intervalCountSkipped = this.intervalCountSkipped + incrementValue;
return this;
}
public AutoCompactionSnapshot build()
{
if (dataSource == null || dataSource.isEmpty()) {
throw new ISE("Invalid dataSource name");
}
if (scheduleStatus == null) {
throw new ISE("scheduleStatus cannot be null");
}
return new AutoCompactionSnapshot(
dataSource,
scheduleStatus,
bytesAwaitingCompaction,
bytesCompacted,
bytesSkipped,
segmentCountAwaitingCompaction,
segmentCountCompacted,
segmentCountSkipped,
intervalCountAwaitingCompaction,
intervalCountCompacted,
intervalCountSkipped
);
}
}
}

View File

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator;
public class CompactionStatistics
{
private long byteSum;
private long segmentNumberCountSum;
private long segmentIntervalCountSum;
public CompactionStatistics(
long byteSum,
long segmentNumberCountSum,
long segmentIntervalCountSum
)
{
this.byteSum = byteSum;
this.segmentNumberCountSum = segmentNumberCountSum;
this.segmentIntervalCountSum = segmentIntervalCountSum;
}
public static CompactionStatistics initializeCompactionStatistics()
{
return new CompactionStatistics(0, 0, 0);
}
public long getByteSum()
{
return byteSum;
}
public long getSegmentNumberCountSum()
{
return segmentNumberCountSum;
}
public long getSegmentIntervalCountSum()
{
return segmentIntervalCountSum;
}
public void incrementCompactedByte(long incrementValue)
{
byteSum = byteSum + incrementValue;
}
public void incrementCompactedSegments(long incrementValue)
{
segmentNumberCountSum = segmentNumberCountSum + incrementValue;
}
public void incrementCompactedIntervals(long incrementValue)
{
segmentIntervalCountSum = segmentIntervalCountSum + incrementValue;
}
}

View File

@ -367,6 +367,17 @@ public class DruidCoordinator
return compactSegments.getTotalSizeOfSegmentsAwaitingCompaction(dataSource); return compactSegments.getTotalSizeOfSegmentsAwaitingCompaction(dataSource);
} }
@Nullable
public AutoCompactionSnapshot getAutoCompactionSnapshotForDataSource(String dataSource)
{
return compactSegments.getAutoCompactionSnapshot(dataSource);
}
public Map<String, AutoCompactionSnapshot> getAutoCompactionSnapshot()
{
return compactSegments.getAutoCompactionSnapshot();
}
public CoordinatorDynamicConfig getDynamicConfigs() public CoordinatorDynamicConfig getDynamicConfigs()
{ {
return CoordinatorDynamicConfig.current(configManager); return CoordinatorDynamicConfig.current(configManager);
@ -589,7 +600,11 @@ public class DruidCoordinator
} }
for (final Pair<? extends DutiesRunnable, Duration> dutiesRunnable : dutiesRunnables) { for (final Pair<? extends DutiesRunnable, Duration> dutiesRunnable : dutiesRunnables) {
ScheduledExecutors.scheduleWithFixedDelay( // CompactSegmentsDuty can takes a non trival amount of time to complete.
// Hence, we schedule at fixed rate to make sure the other tasks still run at approximately every
// config.getCoordinatorIndexingPeriod() period. Note that cautious should be taken
// if setting config.getCoordinatorIndexingPeriod() lower than the default value.
ScheduledExecutors.scheduleAtFixedRate(
exec, exec,
config.getCoordinatorStartDelay(), config.getCoordinatorStartDelay(),
dutiesRunnable.rhs, dutiesRunnable.rhs,
@ -657,8 +672,9 @@ public class DruidCoordinator
{ {
List<CoordinatorDuty> duties = new ArrayList<>(); List<CoordinatorDuty> duties = new ArrayList<>();
duties.add(new LogUsedSegments()); duties.add(new LogUsedSegments());
duties.addAll(makeCompactSegmentsDuty());
duties.addAll(indexingServiceDuties); duties.addAll(indexingServiceDuties);
// CompactSegmentsDuty should be the last duty as it can take a long time to complete
duties.addAll(makeCompactSegmentsDuty());
log.debug( log.debug(
"Done making indexing service duties %s", "Done making indexing service duties %s",

View File

@ -22,7 +22,6 @@ package org.apache.druid.server.coordinator.duty;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.inject.Inject; import com.google.inject.Inject;
import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
import org.apache.druid.client.indexing.ClientCompactionTaskQuery; import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig; import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.client.indexing.IndexingServiceClient; import org.apache.druid.client.indexing.IndexingServiceClient;
@ -30,6 +29,8 @@ import org.apache.druid.client.indexing.TaskPayloadResponse;
import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.coordinator.CompactionStatistics;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig; import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.CoordinatorStats; import org.apache.druid.server.coordinator.CoordinatorStats;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
@ -43,13 +44,28 @@ import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
public class CompactSegments implements CoordinatorDuty public class CompactSegments implements CoordinatorDuty
{ {
static final String COMPACTION_TASK_COUNT = "compactTaskCount"; static final String COMPACTION_TASK_COUNT = "compactTaskCount";
static final String TOTAL_SIZE_OF_SEGMENTS_AWAITING_COMPACTION = "segmentSizeWaitCompact"; static final String AVAILABLE_COMPACTION_TASK_SLOT = "availableCompactionTaskSlot";
static final String MAX_COMPACTION_TASK_SLOT = "maxCompactionTaskSlot";
static final String TOTAL_SIZE_OF_SEGMENTS_SKIPPED = "segmentSizeSkippedCompact";
static final String TOTAL_COUNT_OF_SEGMENTS_SKIPPED = "segmentCountSkippedCompact";
static final String TOTAL_INTERVAL_OF_SEGMENTS_SKIPPED = "segmentIntervalSkippedCompact";
static final String TOTAL_SIZE_OF_SEGMENTS_AWAITING = "segmentSizeWaitCompact";
static final String TOTAL_COUNT_OF_SEGMENTS_AWAITING = "segmentCountWaitCompact";
static final String TOTAL_INTERVAL_OF_SEGMENTS_AWAITING = "segmentIntervalWaitCompact";
static final String TOTAL_SIZE_OF_SEGMENTS_COMPACTED = "segmentSizeCompacted";
static final String TOTAL_COUNT_OF_SEGMENTS_COMPACTED = "segmentCountCompacted";
static final String TOTAL_INTERVAL_OF_SEGMENTS_COMPACTED = "segmentIntervalCompacted";
/** Must be synced with org.apache.druid.indexing.common.task.CompactionTask.TYPE. */ /** Must be synced with org.apache.druid.indexing.common.task.CompactionTask.TYPE. */
public static final String COMPACTION_TASK_TYPE = "compact"; public static final String COMPACTION_TASK_TYPE = "compact";
@ -61,7 +77,9 @@ public class CompactSegments implements CoordinatorDuty
private final CompactionSegmentSearchPolicy policy; private final CompactionSegmentSearchPolicy policy;
private final IndexingServiceClient indexingServiceClient; private final IndexingServiceClient indexingServiceClient;
private Object2LongOpenHashMap<String> totalSizesOfSegmentsAwaitingCompactionPerDataSource; // This variable is updated by the Coordinator thread executing duties and
// read by HTTP threads processing Coordinator API calls.
private final AtomicReference<Map<String, AutoCompactionSnapshot>> autoCompactionSnapshotPerDataSource = new AtomicReference<>();
@Inject @Inject
public CompactSegments( public CompactSegments(
@ -71,6 +89,7 @@ public class CompactSegments implements CoordinatorDuty
{ {
this.policy = new NewestSegmentFirstPolicy(objectMapper); this.policy = new NewestSegmentFirstPolicy(objectMapper);
this.indexingServiceClient = indexingServiceClient; this.indexingServiceClient = indexingServiceClient;
autoCompactionSnapshotPerDataSource.set(new HashMap<>());
} }
@Override @Override
@ -80,12 +99,12 @@ public class CompactSegments implements CoordinatorDuty
final CoordinatorCompactionConfig dynamicConfig = params.getCoordinatorCompactionConfig(); final CoordinatorCompactionConfig dynamicConfig = params.getCoordinatorCompactionConfig();
final CoordinatorStats stats = new CoordinatorStats(); final CoordinatorStats stats = new CoordinatorStats();
final Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders = new HashMap<>();
List<DataSourceCompactionConfig> compactionConfigList = dynamicConfig.getCompactionConfigs();
updateAutoCompactionSnapshot(compactionConfigList, currentRunAutoCompactionSnapshotBuilders);
if (dynamicConfig.getMaxCompactionTaskSlots() > 0) { if (dynamicConfig.getMaxCompactionTaskSlots() > 0) {
Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources = Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources =
params.getUsedSegmentsTimelinesPerDataSource(); params.getUsedSegmentsTimelinesPerDataSource();
List<DataSourceCompactionConfig> compactionConfigList = dynamicConfig.getCompactionConfigs();
if (compactionConfigList != null && !compactionConfigList.isEmpty()) { if (compactionConfigList != null && !compactionConfigList.isEmpty()) {
Map<String, DataSourceCompactionConfig> compactionConfigs = compactionConfigList Map<String, DataSourceCompactionConfig> compactionConfigs = compactionConfigList
.stream() .stream()
@ -136,16 +155,23 @@ public class CompactSegments implements CoordinatorDuty
numAvailableCompactionTaskSlots, numAvailableCompactionTaskSlots,
compactionTaskCapacity compactionTaskCapacity
); );
stats.addToGlobalStat(AVAILABLE_COMPACTION_TASK_SLOT, numAvailableCompactionTaskSlots);
stats.addToGlobalStat(MAX_COMPACTION_TASK_SLOT, compactionTaskCapacity);
if (numAvailableCompactionTaskSlots > 0) { if (numAvailableCompactionTaskSlots > 0) {
stats.accumulate(doRun(compactionConfigs, numAvailableCompactionTaskSlots, iterator)); stats.accumulate(
doRun(compactionConfigs, currentRunAutoCompactionSnapshotBuilders, numAvailableCompactionTaskSlots, iterator)
);
} else { } else {
stats.accumulate(makeStats(0, iterator)); stats.accumulate(makeStats(currentRunAutoCompactionSnapshotBuilders, 0, iterator));
} }
} else { } else {
LOG.info("compactionConfig is empty. Skip."); LOG.info("compactionConfig is empty. Skip.");
updateAutoCompactionSnapshotWhenNoCompactTaskScheduled(currentRunAutoCompactionSnapshotBuilders);
} }
} else { } else {
LOG.info("maxCompactionTaskSlots was set to 0. Skip compaction"); LOG.info("maxCompactionTaskSlots was set to 0. Skip compaction");
updateAutoCompactionSnapshotWhenNoCompactTaskScheduled(currentRunAutoCompactionSnapshotBuilders);
} }
return params.buildFromExisting() return params.buildFromExisting()
@ -172,6 +198,33 @@ public class CompactSegments implements CoordinatorDuty
} }
} }
private void updateAutoCompactionSnapshot(
List<DataSourceCompactionConfig> compactionConfigList,
Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders)
{
Set<String> enabledDatasources = compactionConfigList.stream()
.map(dataSourceCompactionConfig -> dataSourceCompactionConfig.getDataSource())
.collect(Collectors.toSet());
// Update AutoCompactionScheduleStatus for dataSource that now has auto compaction disabled
for (Map.Entry<String, AutoCompactionSnapshot> snapshot : autoCompactionSnapshotPerDataSource.get().entrySet()) {
if (!enabledDatasources.contains(snapshot.getKey())) {
currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
snapshot.getKey(),
k -> new AutoCompactionSnapshot.Builder(k, AutoCompactionSnapshot.AutoCompactionScheduleStatus.NOT_ENABLED)
);
}
}
// Create and Update snapshot for dataSource that has auto compaction enabled
for (String compactionConfigDataSource : enabledDatasources) {
currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
compactionConfigDataSource,
k -> new AutoCompactionSnapshot.Builder(k, AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING)
);
}
}
private static List<TaskStatusPlus> filterNonCompactionTasks(List<TaskStatusPlus> taskStatuses) private static List<TaskStatusPlus> filterNonCompactionTasks(List<TaskStatusPlus> taskStatuses)
{ {
return taskStatuses return taskStatuses
@ -189,6 +242,7 @@ public class CompactSegments implements CoordinatorDuty
private CoordinatorStats doRun( private CoordinatorStats doRun(
Map<String, DataSourceCompactionConfig> compactionConfigs, Map<String, DataSourceCompactionConfig> compactionConfigs,
Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders,
int numAvailableCompactionTaskSlots, int numAvailableCompactionTaskSlots,
CompactionSegmentIterator iterator CompactionSegmentIterator iterator
) )
@ -200,6 +254,12 @@ public class CompactSegments implements CoordinatorDuty
if (!segmentsToCompact.isEmpty()) { if (!segmentsToCompact.isEmpty()) {
final String dataSourceName = segmentsToCompact.get(0).getDataSource(); final String dataSourceName = segmentsToCompact.get(0).getDataSource();
// As these segments will be compacted, we will aggregates the statistic to the Compacted statistics
AutoCompactionSnapshot.Builder snapshotBuilder = currentRunAutoCompactionSnapshotBuilders.get(dataSourceName);
snapshotBuilder.incrementBytesCompacted(segmentsToCompact.stream().mapToLong(DataSegment::getSize).sum());
snapshotBuilder.incrementIntervalCountCompacted(segmentsToCompact.stream().map(DataSegment::getInterval).distinct().count());
snapshotBuilder.incrementSegmentCountCompacted(segmentsToCompact.size());
final DataSourceCompactionConfig config = compactionConfigs.get(dataSourceName); final DataSourceCompactionConfig config = compactionConfigs.get(dataSourceName);
// make tuningConfig // make tuningConfig
final String taskId = indexingServiceClient.compactSegments( final String taskId = indexingServiceClient.compactSegments(
@ -209,6 +269,7 @@ public class CompactSegments implements CoordinatorDuty
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()), ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()),
newAutoCompactionContext(config.getTaskContext()) newAutoCompactionContext(config.getTaskContext())
); );
LOG.info( LOG.info(
"Submitted a compactionTask[%s] for %s segments", "Submitted a compactionTask[%s] for %s segments",
taskId, taskId,
@ -222,7 +283,7 @@ public class CompactSegments implements CoordinatorDuty
} }
} }
return makeStats(numSubmittedTasks, iterator); return makeStats(currentRunAutoCompactionSnapshotBuilders, numSubmittedTasks, iterator);
} }
private Map<String, Object> newAutoCompactionContext(@Nullable Map<String, Object> configuredContext) private Map<String, Object> newAutoCompactionContext(@Nullable Map<String, Object> configuredContext)
@ -234,29 +295,174 @@ public class CompactSegments implements CoordinatorDuty
return newContext; return newContext;
} }
private CoordinatorStats makeStats(int numCompactionTasks, CompactionSegmentIterator iterator) /**
* This method can be use to atomically update the snapshots in {@code autoCompactionSnapshotPerDataSource} when
* no compaction task is schedule in this run. Currently, this method does not update compaction statistics
* (bytes, interval count, segment count, etc) since we skip iterating through the segments and cannot get an update
* on those statistics. Thus, this method only updates the schedule status and task list (compaction statistics
* remains the same as the previous snapshot).
*/
private void updateAutoCompactionSnapshotWhenNoCompactTaskScheduled(
Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders
)
{ {
Map<String, AutoCompactionSnapshot> previousSnapshots = autoCompactionSnapshotPerDataSource.get();
for (Map.Entry<String, AutoCompactionSnapshot.Builder> autoCompactionSnapshotBuilderEntry : currentRunAutoCompactionSnapshotBuilders.entrySet()) {
final String dataSource = autoCompactionSnapshotBuilderEntry.getKey();
AutoCompactionSnapshot previousSnapshot = previousSnapshots.get(dataSource);
if (previousSnapshot != null) {
autoCompactionSnapshotBuilderEntry.getValue().incrementBytesAwaitingCompaction(previousSnapshot.getBytesAwaitingCompaction());
autoCompactionSnapshotBuilderEntry.getValue().incrementBytesCompacted(previousSnapshot.getBytesCompacted());
autoCompactionSnapshotBuilderEntry.getValue().incrementBytesSkipped(previousSnapshot.getBytesSkipped());
autoCompactionSnapshotBuilderEntry.getValue().incrementSegmentCountAwaitingCompaction(previousSnapshot.getSegmentCountAwaitingCompaction());
autoCompactionSnapshotBuilderEntry.getValue().incrementSegmentCountCompacted(previousSnapshot.getSegmentCountCompacted());
autoCompactionSnapshotBuilderEntry.getValue().incrementSegmentCountSkipped(previousSnapshot.getSegmentCountSkipped());
autoCompactionSnapshotBuilderEntry.getValue().incrementIntervalCountAwaitingCompaction(previousSnapshot.getIntervalCountAwaitingCompaction());
autoCompactionSnapshotBuilderEntry.getValue().incrementIntervalCountCompacted(previousSnapshot.getIntervalCountCompacted());
autoCompactionSnapshotBuilderEntry.getValue().incrementIntervalCountSkipped(previousSnapshot.getIntervalCountSkipped());
}
}
Map<String, AutoCompactionSnapshot> currentAutoCompactionSnapshotPerDataSource = Maps.transformValues(
currentRunAutoCompactionSnapshotBuilders,
AutoCompactionSnapshot.Builder::build
);
// Atomic update of autoCompactionSnapshotPerDataSource with the latest from this coordinator run
autoCompactionSnapshotPerDataSource.set(currentAutoCompactionSnapshotPerDataSource);
}
private CoordinatorStats makeStats(
Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders,
int numCompactionTasks,
CompactionSegmentIterator iterator
)
{
final Map<String, AutoCompactionSnapshot> currentAutoCompactionSnapshotPerDataSource = new HashMap<>();
final CoordinatorStats stats = new CoordinatorStats(); final CoordinatorStats stats = new CoordinatorStats();
stats.addToGlobalStat(COMPACTION_TASK_COUNT, numCompactionTasks); stats.addToGlobalStat(COMPACTION_TASK_COUNT, numCompactionTasks);
totalSizesOfSegmentsAwaitingCompactionPerDataSource = iterator.totalRemainingSegmentsSizeBytes();
totalSizesOfSegmentsAwaitingCompactionPerDataSource.object2LongEntrySet().fastForEach( // Iterate through all the remaining segments in the iterator.
entry -> { // As these segments could be compacted but were not compacted due to lack of task slot, we will aggregates
final String dataSource = entry.getKey(); // the statistic to the AwaitingCompaction statistics
final long totalSizeOfSegmentsAwaitingCompaction = entry.getLongValue(); while (iterator.hasNext()) {
stats.addToDataSourceStat( final List<DataSegment> segmentsToCompact = iterator.next();
TOTAL_SIZE_OF_SEGMENTS_AWAITING_COMPACTION, if (!segmentsToCompact.isEmpty()) {
dataSource, final String dataSourceName = segmentsToCompact.get(0).getDataSource();
totalSizeOfSegmentsAwaitingCompaction AutoCompactionSnapshot.Builder snapshotBuilder = currentRunAutoCompactionSnapshotBuilders.get(dataSourceName);
); snapshotBuilder.incrementBytesAwaitingCompaction(
} segmentsToCompact.stream()
); .mapToLong(DataSegment::getSize)
.sum()
);
snapshotBuilder.incrementIntervalCountAwaitingCompaction(
segmentsToCompact.stream()
.map(DataSegment::getInterval)
.distinct()
.count()
);
snapshotBuilder.incrementSegmentCountAwaitingCompaction(segmentsToCompact.size());
}
}
// Statistics of all segments considered compacted after this run
Map<String, CompactionStatistics> allCompactedStatistics = iterator.totalCompactedStatistics();
// Statistics of all segments considered skipped after this run
Map<String, CompactionStatistics> allSkippedStatistics = iterator.totalSkippedStatistics();
for (Map.Entry<String, AutoCompactionSnapshot.Builder> autoCompactionSnapshotBuilderEntry : currentRunAutoCompactionSnapshotBuilders.entrySet()) {
final String dataSource = autoCompactionSnapshotBuilderEntry.getKey();
final AutoCompactionSnapshot.Builder builder = autoCompactionSnapshotBuilderEntry.getValue();
CompactionStatistics dataSourceCompactedStatistics = allCompactedStatistics.get(dataSource);
CompactionStatistics dataSourceSkippedStatistics = allSkippedStatistics.get(dataSource);
if (dataSourceCompactedStatistics != null) {
builder.incrementBytesCompacted(dataSourceCompactedStatistics.getByteSum());
builder.incrementSegmentCountCompacted(dataSourceCompactedStatistics.getSegmentNumberCountSum());
builder.incrementIntervalCountCompacted(dataSourceCompactedStatistics.getSegmentIntervalCountSum());
}
if (dataSourceSkippedStatistics != null) {
builder.incrementBytesSkipped(dataSourceSkippedStatistics.getByteSum());
builder.incrementSegmentCountSkipped(dataSourceSkippedStatistics.getSegmentNumberCountSum());
builder.incrementIntervalCountSkipped(dataSourceSkippedStatistics.getSegmentIntervalCountSum());
}
// Build the complete snapshot for the datasource
AutoCompactionSnapshot autoCompactionSnapshot = builder.build();
currentAutoCompactionSnapshotPerDataSource.put(dataSource, autoCompactionSnapshot);
// Use the complete snapshot to emits metrics
stats.addToDataSourceStat(
TOTAL_SIZE_OF_SEGMENTS_AWAITING,
dataSource,
autoCompactionSnapshot.getBytesAwaitingCompaction()
);
stats.addToDataSourceStat(
TOTAL_COUNT_OF_SEGMENTS_AWAITING,
dataSource,
autoCompactionSnapshot.getSegmentCountAwaitingCompaction()
);
stats.addToDataSourceStat(
TOTAL_INTERVAL_OF_SEGMENTS_AWAITING,
dataSource,
autoCompactionSnapshot.getIntervalCountAwaitingCompaction()
);
stats.addToDataSourceStat(
TOTAL_SIZE_OF_SEGMENTS_COMPACTED,
dataSource,
autoCompactionSnapshot.getBytesCompacted()
);
stats.addToDataSourceStat(
TOTAL_COUNT_OF_SEGMENTS_COMPACTED,
dataSource,
autoCompactionSnapshot.getSegmentCountCompacted()
);
stats.addToDataSourceStat(
TOTAL_INTERVAL_OF_SEGMENTS_COMPACTED,
dataSource,
autoCompactionSnapshot.getIntervalCountCompacted()
);
stats.addToDataSourceStat(
TOTAL_SIZE_OF_SEGMENTS_SKIPPED,
dataSource,
autoCompactionSnapshot.getBytesSkipped()
);
stats.addToDataSourceStat(
TOTAL_COUNT_OF_SEGMENTS_SKIPPED,
dataSource,
autoCompactionSnapshot.getSegmentCountSkipped()
);
stats.addToDataSourceStat(
TOTAL_INTERVAL_OF_SEGMENTS_SKIPPED,
dataSource,
autoCompactionSnapshot.getIntervalCountSkipped()
);
}
// Atomic update of autoCompactionSnapshotPerDataSource with the latest from this coordinator run
autoCompactionSnapshotPerDataSource.set(currentAutoCompactionSnapshotPerDataSource);
return stats; return stats;
} }
@SuppressWarnings("deprecation") // Intentionally using boxing get() to return null if dataSource is unknown
@Nullable @Nullable
public Long getTotalSizeOfSegmentsAwaitingCompaction(String dataSource) public Long getTotalSizeOfSegmentsAwaitingCompaction(String dataSource)
{ {
return totalSizesOfSegmentsAwaitingCompactionPerDataSource.get(dataSource); AutoCompactionSnapshot autoCompactionSnapshot = autoCompactionSnapshotPerDataSource.get().get(dataSource);
if (autoCompactionSnapshot == null) {
return null;
}
return autoCompactionSnapshot.getBytesAwaitingCompaction();
}
@Nullable
public AutoCompactionSnapshot getAutoCompactionSnapshot(String dataSource)
{
return autoCompactionSnapshotPerDataSource.get().get(dataSource);
}
public Map<String, AutoCompactionSnapshot> getAutoCompactionSnapshot()
{
return autoCompactionSnapshotPerDataSource.get();
} }
} }

View File

@ -19,11 +19,12 @@
package org.apache.druid.server.coordinator.duty; package org.apache.druid.server.coordinator.duty;
import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap; import org.apache.druid.server.coordinator.CompactionStatistics;
import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map;
/** /**
* Segments in the lists which are the elements of this iterator are sorted according to the natural segment order * Segments in the lists which are the elements of this iterator are sorted according to the natural segment order
@ -31,10 +32,20 @@ import java.util.List;
*/ */
public interface CompactionSegmentIterator extends Iterator<List<DataSegment>> public interface CompactionSegmentIterator extends Iterator<List<DataSegment>>
{ {
long UNKNOWN_TOTAL_REMAINING_SEGMENTS_SIZE = -1L;
/** /**
* Return a map of (dataSource, total size of remaining segments) for all dataSources. * Return a map of dataSourceName to CompactionStatistics.
* This method should consider all segments except the segments returned by {@link #next()}. * This method returns the aggregated statistics of segments that was already compacted and does not need to be compacted
* again. Hence, segment that were not returned by the {@link Iterator#next()} becuase it does not needs compaction.
* Note that the aggregations returned by this method is only up to the current point of the iterator being iterated.
*/ */
Object2LongOpenHashMap<String> totalRemainingSegmentsSizeBytes(); Map<String, CompactionStatistics> totalCompactedStatistics();
/**
* Return a map of dataSourceName to CompactionStatistics.
* This method returns the aggregated statistics of segments that was skipped as it cannot be compacted.
* Hence, segment that were not returned by the {@link Iterator#next()} becuase it cannot be compacted.
* Note that the aggregations returned by this method is only up to the current point of the iterator being iterated.
*/
Map<String, CompactionStatistics> totalSkippedStatistics();
} }

View File

@ -301,8 +301,33 @@ public class EmitClusterStatsAndMetrics implements CoordinatorDuty
) )
); );
emitter.emit(
new ServiceMetricEvent.Builder().build(
"compactTask/maxSlot/count",
stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT)
)
);
emitter.emit(
new ServiceMetricEvent.Builder().build(
"compactTask/availableSlot/count",
stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT)
)
);
stats.forEachDataSourceStat( stats.forEachDataSourceStat(
"segmentsWaitCompact", CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING,
(final String dataSource, final long count) -> {
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.build("segment/waitCompact/bytes", count)
);
}
);
stats.forEachDataSourceStat(
CompactSegments.TOTAL_COUNT_OF_SEGMENTS_AWAITING,
(final String dataSource, final long count) -> { (final String dataSource, final long count) -> {
emitter.emit( emitter.emit(
new ServiceMetricEvent.Builder() new ServiceMetricEvent.Builder()
@ -312,6 +337,83 @@ public class EmitClusterStatsAndMetrics implements CoordinatorDuty
} }
); );
stats.forEachDataSourceStat(
CompactSegments.TOTAL_INTERVAL_OF_SEGMENTS_AWAITING,
(final String dataSource, final long count) -> {
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.build("interval/waitCompact/count", count)
);
}
);
stats.forEachDataSourceStat(
CompactSegments.TOTAL_SIZE_OF_SEGMENTS_SKIPPED,
(final String dataSource, final long count) -> {
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.build("segment/skipCompact/bytes", count)
);
}
);
stats.forEachDataSourceStat(
CompactSegments.TOTAL_COUNT_OF_SEGMENTS_SKIPPED,
(final String dataSource, final long count) -> {
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.build("segment/skipCompact/count", count)
);
}
);
stats.forEachDataSourceStat(
CompactSegments.TOTAL_INTERVAL_OF_SEGMENTS_SKIPPED,
(final String dataSource, final long count) -> {
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.build("interval/skipCompact/count", count)
);
}
);
stats.forEachDataSourceStat(
CompactSegments.TOTAL_SIZE_OF_SEGMENTS_COMPACTED,
(final String dataSource, final long count) -> {
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.build("segment/compacted/bytes", count)
);
}
);
stats.forEachDataSourceStat(
CompactSegments.TOTAL_COUNT_OF_SEGMENTS_COMPACTED,
(final String dataSource, final long count) -> {
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.build("segment/compacted/count", count)
);
}
);
stats.forEachDataSourceStat(
CompactSegments.TOTAL_INTERVAL_OF_SEGMENTS_COMPACTED,
(final String dataSource, final long count) -> {
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.build("interval/compacted/count", count)
);
}
);
// Emit segment metrics // Emit segment metrics
params.getUsedSegmentsTimelinesPerDataSource().forEach( params.getUsedSegmentsTimelinesPerDataSource().forEach(
(String dataSource, VersionedIntervalTimeline<String, DataSegment> dataSourceWithUsedSegments) -> { (String dataSource, VersionedIntervalTimeline<String, DataSegment> dataSourceWithUsedSegments) -> {

View File

@ -22,9 +22,7 @@ package org.apache.druid.server.coordinator.duty;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig; import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.indexer.partitions.PartitionsSpec;
@ -35,6 +33,7 @@ import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.SegmentUtils; import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.server.coordinator.CompactionStatistics;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.timeline.CompactionState; import org.apache.druid.timeline.CompactionState;
import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment;
@ -51,6 +50,7 @@ import javax.annotation.Nullable;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -68,7 +68,8 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
private final ObjectMapper objectMapper; private final ObjectMapper objectMapper;
private final Map<String, DataSourceCompactionConfig> compactionConfigs; private final Map<String, DataSourceCompactionConfig> compactionConfigs;
private final Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources; private final Map<String, CompactionStatistics> compactedSegments = new HashMap<>();
private final Map<String, CompactionStatistics> skippedSegments = new HashMap<>();
// dataSource -> intervalToFind // dataSource -> intervalToFind
// searchIntervals keeps track of the current state of which interval should be considered to search segments to // searchIntervals keeps track of the current state of which interval should be considered to search segments to
@ -88,7 +89,6 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
{ {
this.objectMapper = objectMapper; this.objectMapper = objectMapper;
this.compactionConfigs = compactionConfigs; this.compactionConfigs = compactionConfigs;
this.dataSources = dataSources;
this.timelineIterators = Maps.newHashMapWithExpectedSize(dataSources.size()); this.timelineIterators = Maps.newHashMapWithExpectedSize(dataSources.size());
dataSources.forEach((String dataSource, VersionedIntervalTimeline<String, DataSegment> timeline) -> { dataSources.forEach((String dataSource, VersionedIntervalTimeline<String, DataSegment> timeline) -> {
@ -112,27 +112,15 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
} }
@Override @Override
public Object2LongOpenHashMap<String> totalRemainingSegmentsSizeBytes() public Map<String, CompactionStatistics> totalCompactedStatistics()
{ {
final Object2LongOpenHashMap<String> resultMap = new Object2LongOpenHashMap<>(); return compactedSegments;
resultMap.defaultReturnValue(UNKNOWN_TOTAL_REMAINING_SEGMENTS_SIZE); }
for (QueueEntry entry : queue) {
final VersionedIntervalTimeline<String, DataSegment> timeline = dataSources.get(entry.getDataSource());
final Interval interval = new Interval(timeline.first().getInterval().getStart(), entry.interval.getEnd());
final List<TimelineObjectHolder<String, DataSegment>> holders = timeline.lookup(interval); @Override
public Map<String, CompactionStatistics> totalSkippedStatistics()
long size = 0; {
for (DataSegment segment : FluentIterable return skippedSegments;
.from(holders)
.transformAndConcat(TimelineObjectHolder::getObject)
.transform(PartitionChunk::getObject)) {
size += segment.getSize();
}
resultMap.put(entry.getDataSource(), size);
}
return resultMap;
} }
@Override @Override
@ -159,6 +147,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
Preconditions.checkState(!resultSegments.isEmpty(), "Queue entry must not be empty"); Preconditions.checkState(!resultSegments.isEmpty(), "Queue entry must not be empty");
final String dataSource = resultSegments.get(0).getDataSource(); final String dataSource = resultSegments.get(0).getDataSource();
updateQueue(dataSource, compactionConfigs.get(dataSource)); updateQueue(dataSource, compactionConfigs.get(dataSource));
return resultSegments; return resultSegments;
@ -181,6 +170,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
} }
final SegmentsToCompact segmentsToCompact = findSegmentsToCompact( final SegmentsToCompact segmentsToCompact = findSegmentsToCompact(
dataSourceName,
compactibleTimelineObjectHolderCursor, compactibleTimelineObjectHolderCursor,
config config
); );
@ -336,6 +326,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
* @return segments to compact * @return segments to compact
*/ */
private SegmentsToCompact findSegmentsToCompact( private SegmentsToCompact findSegmentsToCompact(
final String dataSourceName,
final CompactibleTimelineObjectHolderCursor compactibleTimelineObjectHolderCursor, final CompactibleTimelineObjectHolderCursor compactibleTimelineObjectHolderCursor,
final DataSourceCompactionConfig config final DataSourceCompactionConfig config
) )
@ -355,7 +346,13 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
if (isCompactibleSize && needsCompaction) { if (isCompactibleSize && needsCompaction) {
return candidates; return candidates;
} else { } else {
if (!isCompactibleSize) { if (!needsCompaction) {
// Collect statistic for segments that is already compacted
collectSegmentStatistics(compactedSegments, dataSourceName, candidates);
} else {
// Collect statistic for segments that is skipped
// Note that if segments does not need compaction then we do not double count here
collectSegmentStatistics(skippedSegments, dataSourceName, candidates);
log.warn( log.warn(
"total segment size[%d] for datasource[%s] and interval[%s] is larger than inputSegmentSize[%d]." "total segment size[%d] for datasource[%s] and interval[%s] is larger than inputSegmentSize[%d]."
+ " Continue to the next interval.", + " Continue to the next interval.",
@ -374,6 +371,20 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
return new SegmentsToCompact(); return new SegmentsToCompact();
} }
private void collectSegmentStatistics(
Map<String, CompactionStatistics> statisticsMap,
String dataSourceName,
SegmentsToCompact segments)
{
CompactionStatistics statistics = statisticsMap.computeIfAbsent(
dataSourceName,
v -> CompactionStatistics.initializeCompactionStatistics()
);
statistics.incrementCompactedByte(segments.getTotalSize());
statistics.incrementCompactedIntervals(segments.getNumberOfIntervals());
statistics.incrementCompactedSegments(segments.getNumberOfSegments());
}
/** /**
* Returns the initial searchInterval which is {@code (timeline.first().start, timeline.last().end - skipOffset)}. * Returns the initial searchInterval which is {@code (timeline.first().start, timeline.last().end - skipOffset)}.
* *
@ -563,6 +574,16 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
return totalSize; return totalSize;
} }
private long getNumberOfSegments()
{
return segments.size();
}
private long getNumberOfIntervals()
{
return segments.stream().map(DataSegment::getInterval).distinct().count();
}
@Override @Override
public String toString() public String toString()
{ {

View File

@ -20,9 +20,11 @@
package org.apache.druid.server.http; package org.apache.druid.server.http;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.sun.jersey.spi.container.ResourceFilters; import com.sun.jersey.spi.container.ResourceFilters;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.coordinator.DruidCoordinator; import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.http.security.ConfigResourceFilter; import org.apache.druid.server.http.security.ConfigResourceFilter;
import org.apache.druid.server.http.security.StateResourceFilter; import org.apache.druid.server.http.security.StateResourceFilter;
@ -34,6 +36,7 @@ import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam; import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
import java.util.Collection;
@Path("/druid/coordinator/v1/compaction") @Path("/druid/coordinator/v1/compaction")
public class CompactionResource public class CompactionResource
@ -76,4 +79,25 @@ public class CompactionResource
return Response.ok(ImmutableMap.of("remainingSegmentSize", notCompactedSegmentSizeBytes)).build(); return Response.ok(ImmutableMap.of("remainingSegmentSize", notCompactedSegmentSizeBytes)).build();
} }
} }
@GET
@Path("/status")
@Produces(MediaType.APPLICATION_JSON)
@ResourceFilters(StateResourceFilter.class)
public Response getCompactionSnapshotForDataSource(
@QueryParam("dataSource") String dataSource
)
{
final Collection<AutoCompactionSnapshot> snapshots;
if (dataSource == null || dataSource.isEmpty()) {
snapshots = coordinator.getAutoCompactionSnapshot().values();
} else {
AutoCompactionSnapshot autoCompactionSnapshot = coordinator.getAutoCompactionSnapshotForDataSource(dataSource);
if (autoCompactionSnapshot == null) {
return Response.status(Response.Status.BAD_REQUEST).entity(ImmutableMap.of("error", "unknown dataSource")).build();
}
snapshots = ImmutableList.of(autoCompactionSnapshot);
}
return Response.ok(ImmutableMap.of("latestStatus", snapshots)).build();
}
} }

View File

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator;
import org.junit.Assert;
import org.junit.Test;
public class AutoCompactionSnapshotTest
{
@Test
public void testAutoCompactionSnapshotBuilder()
{
final String expectedDataSource = "data";
final AutoCompactionSnapshot.AutoCompactionScheduleStatus expectedStatus = AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING;
AutoCompactionSnapshot.Builder builder = new AutoCompactionSnapshot.Builder(expectedDataSource, expectedStatus);
// Increment every stats twice
for (int i = 0; i < 2; i++) {
builder.incrementIntervalCountSkipped(13);
builder.incrementBytesSkipped(13);
builder.incrementSegmentCountSkipped(13);
builder.incrementIntervalCountCompacted(13);
builder.incrementBytesCompacted(13);
builder.incrementSegmentCountCompacted(13);
builder.incrementIntervalCountAwaitingCompaction(13);
builder.incrementBytesAwaitingCompaction(13);
builder.incrementSegmentCountAwaitingCompaction(13);
}
AutoCompactionSnapshot actual = builder.build();
Assert.assertNotNull(actual);
Assert.assertEquals(26, actual.getSegmentCountSkipped());
Assert.assertEquals(26, actual.getIntervalCountSkipped());
Assert.assertEquals(26, actual.getBytesSkipped());
Assert.assertEquals(26, actual.getBytesCompacted());
Assert.assertEquals(26, actual.getIntervalCountCompacted());
Assert.assertEquals(26, actual.getSegmentCountCompacted());
Assert.assertEquals(26, actual.getBytesAwaitingCompaction());
Assert.assertEquals(26, actual.getIntervalCountAwaitingCompaction());
Assert.assertEquals(26, actual.getSegmentCountAwaitingCompaction());
Assert.assertEquals(AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING, actual.getScheduleStatus());
Assert.assertEquals(expectedDataSource, actual.getDataSource());
AutoCompactionSnapshot expected = new AutoCompactionSnapshot(
expectedDataSource,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
26,
26,
26,
26,
26,
26,
26,
26,
26
);
Assert.assertEquals(expected, actual);
}
}

View File

@ -49,6 +49,7 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.http.client.Request; import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.StringFullResponseHolder; import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
import org.apache.druid.server.DruidNode; import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig; import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.CoordinatorRuntimeParamsTestHelpers; import org.apache.druid.server.coordinator.CoordinatorRuntimeParamsTestHelpers;
import org.apache.druid.server.coordinator.CoordinatorStats; import org.apache.druid.server.coordinator.CoordinatorStats;
@ -98,6 +99,10 @@ public class CompactSegmentsTest
{ {
private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper(); private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
private static final String DATA_SOURCE_PREFIX = "dataSource_"; private static final String DATA_SOURCE_PREFIX = "dataSource_";
// Each dataSource starts with 440 byte, 44 segments, and 11 intervals needing compaction
private static final int TOTAL_BYTE_PER_DATASOURCE = 440;
private static final int TOTAL_SEGMENT_PER_DATASOURCE = 44;
private static final int TOTAL_INTERVAL_PER_DATASOURCE = 11;
@Parameterized.Parameters(name = "{0}") @Parameterized.Parameters(name = "{0}")
public static Collection<Object[]> constructorFeeder() public static Collection<Object[]> constructorFeeder()
@ -105,7 +110,7 @@ public class CompactSegmentsTest
final MutableInt nextRangePartitionBoundary = new MutableInt(0); final MutableInt nextRangePartitionBoundary = new MutableInt(0);
return ImmutableList.of( return ImmutableList.of(
new Object[]{ new Object[]{
new DynamicPartitionsSpec(300000, null), new DynamicPartitionsSpec(300000, Long.MAX_VALUE),
(BiFunction<Integer, Integer, ShardSpec>) NumberedShardSpec::new (BiFunction<Integer, Integer, ShardSpec>) NumberedShardSpec::new
}, },
new Object[]{ new Object[]{
@ -270,12 +275,363 @@ public class CompactSegmentsTest
assertLastSegmentNotCompacted(compactSegments); assertLastSegmentNotCompacted(compactSegments);
} }
@Test
public void testMakeStats()
{
final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
leaderClient.start();
final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, indexingServiceClient);
// Before any compaction, we do not have any snapshot of compactions
Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
Assert.assertEquals(0, autoCompactionSnapshots.size());
for (int compaction_run_count = 0; compaction_run_count < 11; compaction_run_count++) {
assertCompactSegmentStatistics(compactSegments, compaction_run_count);
}
// Test that stats does not change (and is still correct) when auto compaction runs with everything is fully compacted
final CoordinatorStats stats = doCompactSegments(compactSegments);
Assert.assertEquals(
0,
stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)
);
for (int i = 0; i < 3; i++) {
verifySnapshot(
compactSegments,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
DATA_SOURCE_PREFIX + i,
0,
TOTAL_BYTE_PER_DATASOURCE,
0,
0,
TOTAL_INTERVAL_PER_DATASOURCE,
0,
0,
TOTAL_SEGMENT_PER_DATASOURCE / 2,
0
);
}
// Run auto compaction without any dataSource in the compaction config
// Should still populate the result of everything fully compacted
doCompactSegments(compactSegments, new ArrayList<>());
Assert.assertEquals(
0,
stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)
);
for (int i = 0; i < 3; i++) {
verifySnapshot(
compactSegments,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.NOT_ENABLED,
DATA_SOURCE_PREFIX + i,
0,
TOTAL_BYTE_PER_DATASOURCE,
0,
0,
TOTAL_INTERVAL_PER_DATASOURCE,
0,
0,
TOTAL_SEGMENT_PER_DATASOURCE / 2,
0
);
}
assertLastSegmentNotCompacted(compactSegments);
}
@Test
public void testMakeStatsForDataSourceWithCompactedIntervalBetweenNonCompactedIntervals()
{
// Only test and validate for one datasource for simplicity.
// This dataSource has three intervals already compacted (3 intervals, 120 byte, 12 segments already compacted)
String dataSourceName = DATA_SOURCE_PREFIX + 1;
List<DataSegment> segments = new ArrayList<>();
for (int j : new int[]{0, 1, 2, 3, 7, 8}) {
for (int k = 0; k < 4; k++) {
DataSegment beforeNoon = createSegment(dataSourceName, j, true, k);
DataSegment afterNoon = createSegment(dataSourceName, j, false, k);
if (j == 3) {
// Make two intervals on this day compacted (two compacted intervals back-to-back)
beforeNoon = beforeNoon.withLastCompactionState(new CompactionState(partitionsSpec, ImmutableMap.of()));
afterNoon = afterNoon.withLastCompactionState(new CompactionState(partitionsSpec, ImmutableMap.of()));
}
if (j == 1) {
// Make one interval on this day compacted
afterNoon = afterNoon.withLastCompactionState(new CompactionState(partitionsSpec, ImmutableMap.of()));
}
segments.add(beforeNoon);
segments.add(afterNoon);
}
}
dataSources = DataSourcesSnapshot
.fromUsedSegments(segments, ImmutableMap.of())
.getUsedSegmentsTimelinesPerDataSource();
final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
leaderClient.start();
final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, indexingServiceClient);
// Before any compaction, we do not have any snapshot of compactions
Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
Assert.assertEquals(0, autoCompactionSnapshots.size());
// 3 intervals, 120 byte, 12 segments already compacted before the run
for (int compaction_run_count = 0; compaction_run_count < 8; compaction_run_count++) {
// Do a cycle of auto compaction which creates one compaction task
final CoordinatorStats stats = doCompactSegments(compactSegments);
Assert.assertEquals(
1,
stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)
);
verifySnapshot(
compactSegments,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
dataSourceName,
TOTAL_BYTE_PER_DATASOURCE - 120 - 40 * (compaction_run_count + 1),
120 + 40 * (compaction_run_count + 1),
0,
TOTAL_INTERVAL_PER_DATASOURCE - 3 - (compaction_run_count + 1),
3 + (compaction_run_count + 1),
0,
TOTAL_SEGMENT_PER_DATASOURCE - 12 - 4 * (compaction_run_count + 1),
// 12 segments was compressed before any auto compaction
// 4 segments was compressed in this run of auto compaction
// Each previous auto compaction run resulted in 2 compacted segments (4 segments compacted into 2 segments)
12 + 4 + 2 * (compaction_run_count),
0
);
}
// Test that stats does not change (and is still correct) when auto compaction runs with everything is fully compacted
final CoordinatorStats stats = doCompactSegments(compactSegments);
Assert.assertEquals(
0,
stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)
);
verifySnapshot(
compactSegments,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
dataSourceName,
0,
TOTAL_BYTE_PER_DATASOURCE,
0,
0,
TOTAL_INTERVAL_PER_DATASOURCE,
0,
0,
// 12 segments was compressed before any auto compaction
// 32 segments needs compaction which is now compacted into 16 segments (4 segments compacted into 2 segments each run)
12 + 16,
0
);
}
@Test
public void testMakeStatsForDataSourceWithSkipped()
{
// Only test and validate for one datasource for simplicity.
// This dataSource has three intervals skipped (3 intervals, 1200 byte, 12 segments skipped by auto compaction)
// Note that these segment used to be 10 bytes each in other tests, we are increasing it to 100 bytes each here
// so that they will be skipped by the auto compaction.
String dataSourceName = DATA_SOURCE_PREFIX + 1;
List<DataSegment> segments = new ArrayList<>();
for (int j : new int[]{0, 1, 2, 3, 7, 8}) {
for (int k = 0; k < 4; k++) {
DataSegment beforeNoon = createSegment(dataSourceName, j, true, k);
DataSegment afterNoon = createSegment(dataSourceName, j, false, k);
if (j == 3) {
// Make two intervals on this day skipped (two skipped intervals back-to-back)
beforeNoon = beforeNoon.withSize(100);
afterNoon = afterNoon.withSize(100);
}
if (j == 1) {
// Make one interval on this day skipped
afterNoon = afterNoon.withSize(100);
}
segments.add(beforeNoon);
segments.add(afterNoon);
}
}
dataSources = DataSourcesSnapshot
.fromUsedSegments(segments, ImmutableMap.of())
.getUsedSegmentsTimelinesPerDataSource();
final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
leaderClient.start();
final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, indexingServiceClient);
// Before any compaction, we do not have any snapshot of compactions
Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
Assert.assertEquals(0, autoCompactionSnapshots.size());
// 3 intervals, 1200 byte (each segment is 100 bytes), 12 segments will be skipped by auto compaction
for (int compaction_run_count = 0; compaction_run_count < 8; compaction_run_count++) {
// Do a cycle of auto compaction which creates one compaction task
final CoordinatorStats stats = doCompactSegments(compactSegments);
Assert.assertEquals(
1,
stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)
);
verifySnapshot(
compactSegments,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
dataSourceName,
// Minus 120 bytes accounting for the three skipped segments' original size
TOTAL_BYTE_PER_DATASOURCE - 120 - 40 * (compaction_run_count + 1),
40 * (compaction_run_count + 1),
1200,
TOTAL_INTERVAL_PER_DATASOURCE - 3 - (compaction_run_count + 1),
(compaction_run_count + 1),
3,
TOTAL_SEGMENT_PER_DATASOURCE - 12 - 4 * (compaction_run_count + 1),
4 + 2 * (compaction_run_count),
12
);
}
// Test that stats does not change (and is still correct) when auto compaction runs with everything is fully compacted
final CoordinatorStats stats = doCompactSegments(compactSegments);
Assert.assertEquals(
0,
stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)
);
verifySnapshot(
compactSegments,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
dataSourceName,
0,
// Minus 120 bytes accounting for the three skipped segments' original size
TOTAL_BYTE_PER_DATASOURCE - 120,
1200,
0,
TOTAL_INTERVAL_PER_DATASOURCE - 3,
3,
0,
16,
12
);
}
private void verifySnapshot(
CompactSegments compactSegments,
AutoCompactionSnapshot.AutoCompactionScheduleStatus scheduleStatus,
String dataSourceName,
long expectedByteCountAwaitingCompaction,
long expectedByteCountCompressed,
long expectedByteCountSkipped,
long expectedIntervalCountAwaitingCompaction,
long expectedIntervalCountCompressed,
long expectedIntervalCountSkipped,
long expectedSegmentCountAwaitingCompaction,
long expectedSegmentCountCompressed,
long expectedSegmentCountSkipped
)
{
Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
AutoCompactionSnapshot snapshot = autoCompactionSnapshots.get(dataSourceName);
Assert.assertEquals(dataSourceName, snapshot.getDataSource());
Assert.assertEquals(scheduleStatus, snapshot.getScheduleStatus());
Assert.assertEquals(expectedByteCountAwaitingCompaction, snapshot.getBytesAwaitingCompaction());
Assert.assertEquals(expectedByteCountCompressed, snapshot.getBytesCompacted());
Assert.assertEquals(expectedByteCountSkipped, snapshot.getBytesSkipped());
Assert.assertEquals(expectedIntervalCountAwaitingCompaction, snapshot.getIntervalCountAwaitingCompaction());
Assert.assertEquals(expectedIntervalCountCompressed, snapshot.getIntervalCountCompacted());
Assert.assertEquals(expectedIntervalCountSkipped, snapshot.getIntervalCountSkipped());
Assert.assertEquals(expectedSegmentCountAwaitingCompaction, snapshot.getSegmentCountAwaitingCompaction());
Assert.assertEquals(expectedSegmentCountCompressed, snapshot.getSegmentCountCompacted());
Assert.assertEquals(expectedSegmentCountSkipped, snapshot.getSegmentCountSkipped());
}
private void assertCompactSegmentStatistics(CompactSegments compactSegments, int compaction_run_count)
{
for (int dataSourceIndex = 0; dataSourceIndex < 3; dataSourceIndex++) {
// One compaction task triggered
final CoordinatorStats stats = doCompactSegments(compactSegments);
Assert.assertEquals(
1,
stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT)
);
Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
// Note: Subsequent compaction run after the dataSource was compacted will show different numbers than
// on the run it was compacted. For example, in a compaction run, if a dataSource had 4 segments compacted,
// on the same compaction run the segment compressed count will be 4 but on subsequent run it might be 2
// (assuming the 4 segments was compacted into 2 segments).
for (int i = 0; i <= dataSourceIndex; i++) {
// dataSource up to dataSourceIndex now compacted. Check that the stats match the expectedAfterCompaction values
// This verify that dataSource which got slot to compact has correct statistics
if (i != dataSourceIndex) {
verifySnapshot(
compactSegments,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
DATA_SOURCE_PREFIX + i,
TOTAL_BYTE_PER_DATASOURCE - 40 * (compaction_run_count + 1),
40 * (compaction_run_count + 1),
0,
TOTAL_INTERVAL_PER_DATASOURCE - (compaction_run_count + 1),
(compaction_run_count + 1),
0,
TOTAL_SEGMENT_PER_DATASOURCE - 4 * (compaction_run_count + 1),
2 * (compaction_run_count + 1),
0
);
} else {
verifySnapshot(
compactSegments,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
DATA_SOURCE_PREFIX + i,
TOTAL_BYTE_PER_DATASOURCE - 40 * (compaction_run_count + 1),
40 * (compaction_run_count + 1),
0,
TOTAL_INTERVAL_PER_DATASOURCE - (compaction_run_count + 1),
(compaction_run_count + 1),
0,
TOTAL_SEGMENT_PER_DATASOURCE - 4 * (compaction_run_count + 1),
2 * compaction_run_count + 4,
0
);
}
}
for (int i = dataSourceIndex + 1; i < 3; i++) {
// dataSource after dataSourceIndex is not yet compacted. Check that the stats match the expectedBeforeCompaction values
// This verify that dataSource that ran out of slot has correct statistics
verifySnapshot(
compactSegments,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
DATA_SOURCE_PREFIX + i,
TOTAL_BYTE_PER_DATASOURCE - 40 * compaction_run_count,
40 * compaction_run_count,
0,
TOTAL_INTERVAL_PER_DATASOURCE - compaction_run_count,
compaction_run_count,
0,
TOTAL_SEGMENT_PER_DATASOURCE - 4 * compaction_run_count,
2 * compaction_run_count,
0
);
}
}
}
private CoordinatorStats doCompactSegments(CompactSegments compactSegments) private CoordinatorStats doCompactSegments(CompactSegments compactSegments)
{
return doCompactSegments(compactSegments, createCompactionConfigs());
}
private CoordinatorStats doCompactSegments(CompactSegments compactSegments, List<DataSourceCompactionConfig> compactionConfigs)
{ {
DruidCoordinatorRuntimeParams params = CoordinatorRuntimeParamsTestHelpers DruidCoordinatorRuntimeParams params = CoordinatorRuntimeParamsTestHelpers
.newBuilder() .newBuilder()
.withUsedSegmentsTimelinesPerDataSourceInTest(dataSources) .withUsedSegmentsTimelinesPerDataSourceInTest(dataSources)
.withCompactionConfig(CoordinatorCompactionConfig.from(createCompactionConfigs())) .withCompactionConfig(CoordinatorCompactionConfig.from(compactionConfigs))
.build(); .build();
return compactSegments.run(params).getCoordinatorStats(); return compactSegments.run(params).getCoordinatorStats();
} }
@ -300,9 +656,9 @@ public class CompactSegmentsTest
// If expectedRemainingSegments is positive, we check how many dataSources have the segments waiting for // If expectedRemainingSegments is positive, we check how many dataSources have the segments waiting for
// compaction. // compaction.
long numDataSourceOfExpectedRemainingSegments = stats long numDataSourceOfExpectedRemainingSegments = stats
.getDataSources(CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING_COMPACTION) .getDataSources(CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING)
.stream() .stream()
.mapToLong(ds -> stats.getDataSourceStat(CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING_COMPACTION, ds)) .mapToLong(ds -> stats.getDataSourceStat(CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING, ds))
.filter(stat -> stat == expectedRemainingSegments) .filter(stat -> stat == expectedRemainingSegments)
.count(); .count();
Assert.assertEquals(i + 1, numDataSourceOfExpectedRemainingSegments); Assert.assertEquals(i + 1, numDataSourceOfExpectedRemainingSegments);
@ -310,7 +666,7 @@ public class CompactSegmentsTest
// Otherwise, we check how many dataSources are in the coordinator stats. // Otherwise, we check how many dataSources are in the coordinator stats.
Assert.assertEquals( Assert.assertEquals(
2 - i, 2 - i,
stats.getDataSources(CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING_COMPACTION).size() stats.getDataSources(CompactSegments.TOTAL_SIZE_OF_SEGMENTS_AWAITING).size()
); );
} }
} }

View File

@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.http;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.coordinator.DruidCoordinator;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.ws.rs.core.Response;
import java.util.Map;
public class CompactionResourceTest
{
private DruidCoordinator mock;
private String dataSourceName = "datasource_1";
private AutoCompactionSnapshot expectedSnapshot = new AutoCompactionSnapshot(
dataSourceName,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
1,
1,
1,
1,
1,
1,
1,
1,
1
);
@Before
public void setUp()
{
mock = EasyMock.createStrictMock(DruidCoordinator.class);
}
@After
public void tearDown()
{
EasyMock.verify(mock);
}
@Test
public void testGetCompactionSnapshotForDataSourceWithEmptyQueryParameter()
{
Map<String, AutoCompactionSnapshot> expected = ImmutableMap.of(
dataSourceName,
expectedSnapshot
);
EasyMock.expect(mock.getAutoCompactionSnapshot()).andReturn(expected).once();
EasyMock.replay(mock);
final Response response = new CompactionResource(mock).getCompactionSnapshotForDataSource("");
Assert.assertEquals(ImmutableMap.of("latestStatus", expected.values()), response.getEntity());
Assert.assertEquals(200, response.getStatus());
}
@Test
public void testGetCompactionSnapshotForDataSourceWithNullQueryParameter()
{
String dataSourceName = "datasource_1";
Map<String, AutoCompactionSnapshot> expected = ImmutableMap.of(
dataSourceName,
expectedSnapshot
);
EasyMock.expect(mock.getAutoCompactionSnapshot()).andReturn(expected).once();
EasyMock.replay(mock);
final Response response = new CompactionResource(mock).getCompactionSnapshotForDataSource(null);
Assert.assertEquals(ImmutableMap.of("latestStatus", expected.values()), response.getEntity());
Assert.assertEquals(200, response.getStatus());
}
@Test
public void testGetCompactionSnapshotForDataSourceWithValidQueryParameter()
{
String dataSourceName = "datasource_1";
EasyMock.expect(mock.getAutoCompactionSnapshotForDataSource(dataSourceName)).andReturn(expectedSnapshot).once();
EasyMock.replay(mock);
final Response response = new CompactionResource(mock).getCompactionSnapshotForDataSource(dataSourceName);
Assert.assertEquals(ImmutableMap.of("latestStatus", ImmutableList.of(expectedSnapshot)), response.getEntity());
Assert.assertEquals(200, response.getStatus());
}
@Test
public void testGetCompactionSnapshotForDataSourceWithInvalidQueryParameter()
{
String dataSourceName = "invalid_datasource";
EasyMock.expect(mock.getAutoCompactionSnapshotForDataSource(dataSourceName)).andReturn(null).once();
EasyMock.replay(mock);
final Response response = new CompactionResource(mock).getCompactionSnapshotForDataSource(dataSourceName);
Assert.assertEquals(400, response.getStatus());
}
}