Refactor and add tests and metric to KillUnusedSegments duty (auto-kill) (#15941)

* Kill duty and test improvements.

Initial commit with:
- Bug fixes: auto-kill could throw an NPE when no datasources were present, and some defaults mismatched.
- Add new stat for candidate segment intervals killed.
- Move a couple of debug logs to info logs for improved visibility (should only log once per kill period).
- Remove redundant checks for code readability.
- Update tests to stop using mocks (the mocks also weren't exercising the last-updated timestamp) and
  add more test coverage for different config parameters.
- Add a couple of ignored unit tests for the eternity case to demonstrate that
  the kill duty doesn't clean up segments with ALL granularity or segments that end in DateTimes.MAX.
- Migrate Druid exception from user to operator persona.

* Address review comments.

* Remove unused methods.

* fix up format specifier and validate bad config tests.

* Consolidate the helpers a bit more and add another test.

* Update test names. Add javadoc placeholders for slightly involved tests.

* Add docs for metric kill/candidateUnusedSegments/count.

Also, rename to disambiguate.

* Comments.

* Apply logging suggestions from code review

Co-authored-by: Kashif Faraz <kashif.faraz@gmail.com>

* Review comments

- Clarify docs on eligibility.
- Add test for multiple segments in the same interval. Clarify comment.
- Remove log line from test.
- Remove lastUpdatedDate = now.plus(10) from test.

* minor cleanup.

* Clarify javadocs for getUnusedSegmentIntervals().

---------

Co-authored-by: Kashif Faraz <kashif.faraz@gmail.com>
Abhishek Radhakrishnan 2024-02-27 12:14:41 +05:30, committed by GitHub
parent 17e4f3ac60
commit 38ecf980d0
8 changed files with 960 additions and 489 deletions


@@ -342,6 +342,7 @@ These metrics are for the Druid Coordinator and are reset each time the Coordina
|`killTask/availableSlot/count`| Number of available task slots that can be used for auto kill tasks in the auto kill run. This is the max number of task slots minus any currently running auto kill tasks. | |Varies|
|`killTask/maxSlot/count`| Maximum number of task slots available for auto kill tasks in the auto kill run. | |Varies|
|`kill/task/count`| Number of tasks issued in the auto kill run. | |Varies|
|`kill/candidateUnusedSegments/count`|Number of candidate unused segments eligible for deletion from the metadata store in an auto kill run, reported per datasource.|`dataSource`|Varies|
|`kill/pendingSegments/count`|Number of stale pending segments deleted from the metadata store.|`dataSource`|Varies|
|`segment/waitCompact/bytes`|Total bytes of this datasource waiting to be compacted by the auto compaction (only consider intervals/segments that are eligible for auto compaction).|`dataSource`|Varies|
|`segment/waitCompact/count`|Total number of segments of this datasource waiting to be compacted by the auto compaction (only consider intervals/segments that are eligible for auto compaction).|`dataSource`|Varies|
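
For context on how the new metric is wired up: it is recorded per datasource through the Coordinator's run-stats mechanism, as the `Stats` and `KillUnusedSegments` hunks below show. Here is a minimal sketch of that reporting path; the datasource name and count are hypothetical example values:

```java
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;

class CandidateStatSketch
{
  static void recordCandidates(final CoordinatorRunStats stats)
  {
    // Key the stat by datasource so the metric is emitted with the dataSource dimension.
    final RowKey datasourceKey = RowKey.of(Dimension.DATASOURCE, "wikipedia");

    // Surfaces as kill/candidateUnusedSegments/count; 42 is an illustrative count.
    stats.add(Stats.Kill.CANDIDATE_UNUSED_SEGMENTS, datasourceKey, 42);
  }
}
```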


@@ -164,7 +164,8 @@ public interface SegmentsMetadataManager
Set<String> retrieveAllDataSourceNames();
/**
* Returns a list of up to {@code limit} unused segment intervals for the specified datasource. Segments are filtered based on the following criteria:
* Returns a list of up to {@code limit} unused segment intervals for the specified datasource. Segments are filtered
* based on the following criteria:
*
* <li> The start time of the segment must be no earlier than the specified {@code minStartTime} (if not null). </li>
* <li> The end time of the segment must be no later than the specified {@code maxEndTime}. </li>
@@ -172,9 +173,9 @@ public interface SegmentsMetadataManager
* Segments that have no {@code used_status_last_updated} time (due to an upgrade from legacy Druid) will
* have {@code maxUsedStatusLastUpdatedTime} ignored. </li>
*
* <p>
* The list of intervals is ordered by segment start time and then by end time.
* </p>
* @return list of intervals ordered by segment start time and then by end time. Note that the list may contain
* duplicate intervals.
*
*/
List<Interval> getUnusedSegmentIntervals(
String dataSource,

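To make the contract concrete, here is a hedged usage sketch of `getUnusedSegmentIntervals()` that mirrors how the kill duty calls it later in this diff. The datasource name, limit, and retention values are illustrative stand-ins, not values from the patch:

```java
import java.util.List;

import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.metadata.SegmentsMetadataManager;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;

class UnusedIntervalsSketch
{
  static List<Interval> findCandidateIntervals(final SegmentsMetadataManager manager)
  {
    // Illustrative stand-ins for druid.coordinator.kill.bufferPeriod and
    // druid.coordinator.kill.durationToRetain.
    final Duration bufferPeriod = Duration.standardDays(30);
    final Duration durationToRetain = Duration.standardDays(90);

    // Segments whose unused status changed within the buffer period are protected.
    final DateTime maxUsedStatusLastUpdatedTime = DateTimes.nowUtc().minus(bufferPeriod);
    // Segments whose intervals end after this cutoff are retained.
    final DateTime maxEndTime = DateTimes.nowUtc().minus(durationToRetain);

    // minStartTime is null on the first kill run for a datasource; subsequent runs
    // pass the end of the previously killed interval to avoid rescanning.
    return manager.getUnusedSegmentIntervals(
        "wikipedia",
        null,
        maxEndTime,
        100,
        maxUsedStatusLastUpdatedTime
    );
  }
}
```

The returned list is ordered by segment start and then end time, and may contain duplicate intervals because each entry corresponds 1:1 with an unused segment.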

@@ -1140,7 +1140,7 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
@Nullable final DateTime minStartTime,
final DateTime maxEndTime,
final int limit,
DateTime maxUsedStatusLastUpdatedTime
final DateTime maxUsedStatusLastUpdatedTime
)
{
// Note that we handle the case where used_status_last_updated IS NULL here to allow a smooth transition to the Druid version that uses the used_status_last_updated column
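As a heavily hedged sketch of what that NULL handling amounts to (an illustrative filter fragment, not the actual query text in the method body), the interface javadoc above implies that rows lacking the column value stay eligible rather than being silently excluded:

```java
// Hedged sketch only: illustrative SQL fragment, not the production query.
final String usedStatusLastUpdatedFilter =
    "(used_status_last_updated IS NULL"
    + " OR used_status_last_updated <= :maxUsedStatusLastUpdatedTime)";
```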


@@ -19,11 +19,9 @@
package org.apache.druid.server.coordinator.duty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.JodaUtils;
@@ -31,9 +29,12 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.SegmentsMetadataManager;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.DateTime;
@@ -41,6 +42,7 @@ import org.joda.time.Duration;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -53,8 +55,8 @@ import java.util.concurrent.ConcurrentHashMap;
* negative meaning the interval end target will be in the future. Also, {@link #durationToRetain} can be ignored if
* {@link #ignoreDurationToRetain} is enabled, meaning that there is no upper bound to the end interval of segments that
* will be killed. The umbrella interval of the unused segments per datasource to be killed is determined by
* {@link #findIntervalForKill(String, DateTime)}, which takes into account the configured {@link #bufferPeriod}. However,
* the kill task needs to check again for max {@link #bufferPeriod} for the unused segments in the widened interval
* {@link #findIntervalForKill(String, DateTime, CoordinatorRunStats)}, which takes into account the configured {@link #bufferPeriod}.
* However, the kill task needs to check again for max {@link #bufferPeriod} for the unused segments in the widened interval
* as there can be multiple unused segments with different {@code used_status_last_updated} time.
* </p>
* <p>
@@ -65,7 +67,8 @@ public class KillUnusedSegments implements CoordinatorDuty
{
public static final String KILL_TASK_TYPE = "kill";
public static final String TASK_ID_PREFIX = "coordinator-issued";
public static final Predicate<TaskStatusPlus> IS_AUTO_KILL_TASK =
private static final Predicate<TaskStatusPlus> IS_AUTO_KILL_TASK =
status -> null != status
&& (KILL_TASK_TYPE.equals(status.getType()) && status.getId().startsWith(TASK_ID_PREFIX));
private static final Logger log = new Logger(KillUnusedSegments.class);
@@ -87,29 +90,37 @@ public class KillUnusedSegments implements CoordinatorDuty
private final OverlordClient overlordClient;
public KillUnusedSegments(
SegmentsMetadataManager segmentsMetadataManager,
OverlordClient overlordClient,
DruidCoordinatorConfig config
final SegmentsMetadataManager segmentsMetadataManager,
final OverlordClient overlordClient,
final DruidCoordinatorConfig config
)
{
if (config.getCoordinatorKillPeriod().getMillis() < config.getCoordinatorIndexingPeriod().getMillis()) {
throw InvalidInput.exception(
"druid.coordinator.kill.period[%s] must be >= druid.coordinator.period.indexingPeriod[%s]",
config.getCoordinatorKillPeriod(),
config.getCoordinatorIndexingPeriod()
);
throw DruidException.forPersona(DruidException.Persona.OPERATOR)
.ofCategory(DruidException.Category.INVALID_INPUT)
.build(
StringUtils.format(
"druid.coordinator.kill.period[%s] is invalid. It must be greater than or "
+ "equal to druid.coordinator.period.indexingPeriod[%s].",
config.getCoordinatorKillPeriod(),
config.getCoordinatorIndexingPeriod()
)
);
}
if (config.getCoordinatorKillMaxSegments() < 0) {
throw InvalidInput.exception(
"druid.coordinator.kill.maxSegments[%s] is invalid. It must be a positive integer.",
config.getCoordinatorKillMaxSegments()
);
throw DruidException.forPersona(DruidException.Persona.OPERATOR)
.ofCategory(DruidException.Category.INVALID_INPUT)
.build(StringUtils.format(
"druid.coordinator.kill.maxSegments[%d] is invalid. It must be a positive integer.",
config.getCoordinatorKillMaxSegments()
)
);
}
this.period = config.getCoordinatorKillPeriod();
this.ignoreDurationToRetain = config.getCoordinatorKillIgnoreDurationToRetain();
this.durationToRetain = config.getCoordinatorKillDurationToRetain();
if (this.ignoreDurationToRetain) {
log.debug(
log.info(
"druid.coordinator.kill.durationToRetain[%s] will be ignored when discovering segments to kill "
+ "because druid.coordinator.kill.ignoreDurationToRetain is set to true.",
this.durationToRetain
@@ -132,7 +143,7 @@ public class KillUnusedSegments implements CoordinatorDuty
}
@Override
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
public DruidCoordinatorRuntimeParams run(final DruidCoordinatorRuntimeParams params)
{
if (!canDutyRun()) {
log.debug(
@@ -146,130 +157,128 @@ public class KillUnusedSegments implements CoordinatorDuty
return runInternal(params);
}
@VisibleForTesting
DruidCoordinatorRuntimeParams runInternal(DruidCoordinatorRuntimeParams params)
private DruidCoordinatorRuntimeParams runInternal(final DruidCoordinatorRuntimeParams params)
{
TaskStats taskStats = new TaskStats();
Collection<String> dataSourcesToKill =
params.getCoordinatorDynamicConfig().getSpecificDataSourcesToKillUnusedSegmentsIn();
double killTaskSlotRatio = params.getCoordinatorDynamicConfig().getKillTaskSlotRatio();
int maxKillTaskSlots = params.getCoordinatorDynamicConfig().getMaxKillTaskSlots();
int killTaskCapacity = getKillTaskCapacity(
CoordinatorDutyUtils.getTotalWorkerCapacity(overlordClient),
killTaskSlotRatio,
maxKillTaskSlots
);
int availableKillTaskSlots = getAvailableKillTaskSlots(
killTaskCapacity,
CoordinatorDutyUtils.getNumActiveTaskSlots(overlordClient, IS_AUTO_KILL_TASK).size()
);
final CoordinatorDynamicConfig dynamicConfig = params.getCoordinatorDynamicConfig();
final CoordinatorRunStats stats = params.getCoordinatorStats();
taskStats.availableTaskSlots = availableKillTaskSlots;
taskStats.maxSlots = killTaskCapacity;
final int availableKillTaskSlots = getAvailableKillTaskSlots(dynamicConfig, stats);
Collection<String> dataSourcesToKill = dynamicConfig.getSpecificDataSourcesToKillUnusedSegmentsIn();
if (0 < availableKillTaskSlots) {
if (availableKillTaskSlots > 0) {
// If no datasource has been specified, all are eligible for killing unused segments
if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
dataSourcesToKill = segmentsMetadataManager.retrieveAllDataSourceNames();
}
log.debug("Killing unused segments for datasources[%s]", dataSourcesToKill);
lastKillTime = DateTimes.nowUtc();
taskStats.submittedTasks = killUnusedSegments(dataSourcesToKill, availableKillTaskSlots);
killUnusedSegments(dataSourcesToKill, availableKillTaskSlots, stats);
}
// Any datasource that is no longer being considered for kill should have its
// last kill interval removed from the map.
datasourceToLastKillIntervalEnd.keySet().retainAll(dataSourcesToKill);
addStats(taskStats, stats);
return params;
}
private void addStats(
TaskStats taskStats,
CoordinatorRunStats stats
/**
* Spawn kill tasks for each datasource in {@code dataSourcesToKill} up to {@code availableKillTaskSlots}.
*/
private void killUnusedSegments(
@Nullable final Collection<String> dataSourcesToKill,
final int availableKillTaskSlots,
final CoordinatorRunStats stats
)
{
stats.add(Stats.Kill.AVAILABLE_SLOTS, taskStats.availableTaskSlots);
stats.add(Stats.Kill.SUBMITTED_TASKS, taskStats.submittedTasks);
stats.add(Stats.Kill.MAX_SLOTS, taskStats.maxSlots);
}
if (CollectionUtils.isNullOrEmpty(dataSourcesToKill) || availableKillTaskSlots <= 0) {
stats.add(Stats.Kill.SUBMITTED_TASKS, 0);
return;
}
private int killUnusedSegments(
Collection<String> dataSourcesToKill,
int availableKillTaskSlots
)
{
final Collection<String> remainingDatasourcesToKill = new ArrayList<>(dataSourcesToKill);
int submittedTasks = 0;
if (0 < availableKillTaskSlots && !CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
for (String dataSource : dataSourcesToKill) {
if (submittedTasks >= availableKillTaskSlots) {
log.debug(StringUtils.format(
"Submitted [%d] kill tasks and reached kill task slot limit [%d]. Will resume "
+ "on the next coordinator cycle.", submittedTasks, availableKillTaskSlots));
break;
}
final DateTime maxUsedStatusLastUpdatedTime = DateTimes.nowUtc().minus(bufferPeriod);
final Interval intervalToKill = findIntervalForKill(dataSource, maxUsedStatusLastUpdatedTime);
if (intervalToKill == null) {
datasourceToLastKillIntervalEnd.remove(dataSource);
continue;
}
for (String dataSource : dataSourcesToKill) {
if (submittedTasks >= availableKillTaskSlots) {
log.info(
"Submitted [%d] kill tasks and reached kill task slot limit [%d].",
submittedTasks, availableKillTaskSlots
);
break;
}
final DateTime maxUsedStatusLastUpdatedTime = DateTimes.nowUtc().minus(bufferPeriod);
final Interval intervalToKill = findIntervalForKill(dataSource, maxUsedStatusLastUpdatedTime, stats);
if (intervalToKill == null) {
datasourceToLastKillIntervalEnd.remove(dataSource);
continue;
}
try {
FutureUtils.getUnchecked(
overlordClient.runKillTask(
TASK_ID_PREFIX,
dataSource,
intervalToKill,
maxSegmentsToKill,
maxUsedStatusLastUpdatedTime
),
true
);
++submittedTasks;
datasourceToLastKillIntervalEnd.put(dataSource, intervalToKill.getEnd());
}
catch (Exception ex) {
log.error(ex, "Failed to submit kill task for dataSource[%s] in interval[%s]", dataSource, intervalToKill);
if (Thread.currentThread().isInterrupted()) {
log.warn("Skipping kill task scheduling because thread is interrupted.");
break;
}
try {
FutureUtils.getUnchecked(
overlordClient.runKillTask(
TASK_ID_PREFIX,
dataSource,
intervalToKill,
maxSegmentsToKill,
maxUsedStatusLastUpdatedTime
),
true
);
++submittedTasks;
datasourceToLastKillIntervalEnd.put(dataSource, intervalToKill.getEnd());
remainingDatasourcesToKill.remove(dataSource);
}
catch (Exception ex) {
log.error(ex, "Failed to submit kill task for dataSource[%s] in interval[%s]", dataSource, intervalToKill);
if (Thread.currentThread().isInterrupted()) {
log.warn("Skipping kill task scheduling because thread is interrupted.");
break;
}
}
}
if (log.isDebugEnabled()) {
log.debug(
"Submitted [%d] kill tasks for [%d] datasources.%s",
submittedTasks,
dataSourcesToKill.size(),
availableKillTaskSlots < dataSourcesToKill.size()
? StringUtils.format(
" Datasources skipped: %s",
ImmutableList.copyOf(dataSourcesToKill).subList(submittedTasks, dataSourcesToKill.size())
)
: ""
);
}
log.info(
"Submitted [%d] kill tasks for [%d] datasources. Remaining datasources to kill: %s",
submittedTasks, dataSourcesToKill.size(), remainingDatasourcesToKill
);
// report stats
return submittedTasks;
stats.add(Stats.Kill.SUBMITTED_TASKS, submittedTasks);
}
/**
* <p>
* Calculates the interval for which segments are to be killed in a datasource.
* Since this method compares datetime as strings, it cannot find unused segments that are outside
* the range [{@link DateTimes#COMPARE_DATE_AS_STRING_MIN}, {@link DateTimes#COMPARE_DATE_AS_STRING_MAX}),
* such as {@link org.apache.druid.java.util.common.granularity.Granularities#ALL} partitioned segments
* and segments that end in {@link DateTimes#MAX}.
*</p><p>
* For more information, see <a href="https://github.com/apache/druid/issues/15951"> Issue#15951</a>.
* </p>
*/
@Nullable
private Interval findIntervalForKill(String dataSource, DateTime maxUsedStatusLastUpdatedTime)
private Interval findIntervalForKill(
final String dataSource,
final DateTime maxUsedStatusLastUpdatedTime,
final CoordinatorRunStats stats
)
{
final DateTime maxEndTime = ignoreDurationToRetain
? DateTimes.COMPARE_DATE_AS_STRING_MAX
: DateTimes.nowUtc().minus(durationToRetain);
List<Interval> unusedSegmentIntervals = segmentsMetadataManager
.getUnusedSegmentIntervals(dataSource, datasourceToLastKillIntervalEnd.get(dataSource), maxEndTime, maxSegmentsToKill, maxUsedStatusLastUpdatedTime);
final List<Interval> unusedSegmentIntervals = segmentsMetadataManager.getUnusedSegmentIntervals(
dataSource,
datasourceToLastKillIntervalEnd.get(dataSource),
maxEndTime,
maxSegmentsToKill,
maxUsedStatusLastUpdatedTime
);
// Each unused segment interval returned above has a 1:1 correspondence with an unused segment. So we can assume
// these are candidate segments eligible for deletion by the kill task. After the umbrella interval is computed
// below, we cannot say the same as there can be multiple unused segments with different usedStatusLastUpdatedTime.
final RowKey datasourceKey = RowKey.of(Dimension.DATASOURCE, dataSource);
stats.add(Stats.Kill.CANDIDATE_UNUSED_SEGMENTS, datasourceKey, unusedSegmentIntervals.size());
if (CollectionUtils.isNullOrEmpty(unusedSegmentIntervals)) {
return null;
@@ -280,43 +289,25 @@ public class KillUnusedSegments implements CoordinatorDuty
}
}
private int getAvailableKillTaskSlots(int killTaskCapacity, int numActiveKillTasks)
{
return Math.max(
0,
killTaskCapacity - numActiveKillTasks
);
}
private boolean canDutyRun()
{
return lastKillTime == null || !DateTimes.nowUtc().isBefore(lastKillTime.plus(period));
}
@VisibleForTesting
static int getKillTaskCapacity(int totalWorkerCapacity, double killTaskSlotRatio, int maxKillTaskSlots)
private int getAvailableKillTaskSlots(final CoordinatorDynamicConfig config, final CoordinatorRunStats stats)
{
return Math.min((int) (totalWorkerCapacity * Math.min(killTaskSlotRatio, 1.0)), maxKillTaskSlots);
}
final int killTaskCapacity = Math.min(
(int) (CoordinatorDutyUtils.getTotalWorkerCapacity(overlordClient) * Math.min(config.getKillTaskSlotRatio(), 1.0)),
config.getMaxKillTaskSlots()
);
@VisibleForTesting
Map<String, DateTime> getDatasourceToLastKillIntervalEnd()
{
return datasourceToLastKillIntervalEnd;
}
final int availableKillTaskSlots = Math.max(
0,
killTaskCapacity - CoordinatorDutyUtils.getNumActiveTaskSlots(overlordClient, IS_AUTO_KILL_TASK).size()
);
static class TaskStats
{
int availableTaskSlots;
int maxSlots;
int submittedTasks;
TaskStats()
{
availableTaskSlots = 0;
maxSlots = 0;
submittedTasks = 0;
}
stats.add(Stats.Kill.AVAILABLE_SLOTS, availableKillTaskSlots);
stats.add(Stats.Kill.MAX_SLOTS, killTaskCapacity);
return availableKillTaskSlots;
}
}
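
The consolidated getAvailableKillTaskSlots() above folds in the old getKillTaskCapacity() helper. A worked example with illustrative numbers may help; none of these values come from the patch:

```java
class KillSlotMathSketch
{
  public static void main(String[] args)
  {
    final int totalWorkerCapacity = 10;    // total task slots reported by the Overlord
    final double killTaskSlotRatio = 0.15; // dynamic config killTaskSlotRatio
    final int maxKillTaskSlots = 5;        // dynamic config maxKillTaskSlots
    final int numActiveKillTasks = 1;      // auto kill tasks already running

    // capacity = min((int) (10 * min(0.15, 1.0)), 5) = min(1, 5) = 1
    final int killTaskCapacity = Math.min(
        (int) (totalWorkerCapacity * Math.min(killTaskSlotRatio, 1.0)),
        maxKillTaskSlots
    );

    // available = max(0, 1 - 1) = 0, so no new kill task is submitted this run.
    final int availableKillTaskSlots = Math.max(0, killTaskCapacity - numActiveKillTasks);

    System.out.println(killTaskCapacity + " max slots, " + availableKillTaskSlots + " available");
  }
}
```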


@@ -149,6 +149,8 @@ public class Stats
= CoordinatorStat.toDebugAndEmit("killMaxSlots", "killTask/maxSlot/count");
public static final CoordinatorStat SUBMITTED_TASKS
= CoordinatorStat.toDebugAndEmit("killTasks", "kill/task/count");
public static final CoordinatorStat CANDIDATE_UNUSED_SEGMENTS
= CoordinatorStat.toDebugAndEmit("killCandidateUnusedSegs", "kill/candidateUnusedSegments/count");
public static final CoordinatorStat PENDING_SEGMENTS
= CoordinatorStat.toDebugAndEmit("killPendingSegs", "kill/pendingSegments/count");
}


@@ -20,6 +20,7 @@
package org.apache.druid.server.coordinator;
import org.joda.time.Duration;
import org.joda.time.Period;
public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
{
@@ -289,30 +290,30 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
public static class Builder
{
private static final Duration DEFAULT_COORDINATOR_START_DELAY = new Duration("PT300s");
private static final Duration DEFAULT_COORDINATOR_PERIOD = new Duration("PT60s");
private static final Duration DEFAULT_COORDINATOR_INDEXING_PERIOD = new Duration("PT1800s");
private static final Duration DEFAULT_METADATA_STORE_MANAGEMENT_PERIOD = new Duration("PT3600s");
private static final Duration DEFAULT_COORDINATOR_KILL_PERIOD = new Duration("PT86400s");
private static final Duration DEFAULT_COORDINATOR_KILL_DURATION_TO_RETAION = new Duration("PT7776000s");
private static final Duration DEFAULT_COORDINATOR_START_DELAY = Period.parse("PT300s").toStandardDuration();
private static final Duration DEFAULT_COORDINATOR_PERIOD = Period.parse("PT60s").toStandardDuration();
private static final Duration DEFAULT_COORDINATOR_INDEXING_PERIOD = Period.parse("PT1800s").toStandardDuration();
private static final Duration DEFAULT_METADATA_STORE_MANAGEMENT_PERIOD = Period.parse("PT1H").toStandardDuration();
private static final Duration DEFAULT_COORDINATOR_KILL_PERIOD = Period.parse("P1D").toStandardDuration();
private static final Duration DEFAULT_COORDINATOR_KILL_DURATION_TO_RETAIN = Period.parse("P90D").toStandardDuration();
private static final Duration DEFAULT_COORDINATOR_KILL_BUFFER_PERIOD = Period.parse("P30D").toStandardDuration();
private static final boolean DEFAULT_COORDINATOR_KILL_IGNORE_DURATION_TO_RETAIN = false;
private static final int DEFAULT_COORDINATOR_KILL_MAX_SEGMENTS = 100;
private static final Duration DEFAULT_COORDINATOR_SUPERVISOR_KILL_PERIOD = new Duration("PT86400s");
private static final Duration DEFAULT_COORDINATOR_SUPERVISOR_KILL_DURATION_TO_RETAIN = new Duration("PT7776000s");
private static final Duration DEFAULT_COORDINATOR_COMPACTION_KILL_PERIOD = new Duration("PT86400s");
private static final Duration DEFAULT_COORDINATOR_RULE_KILL_PERIOD = new Duration("PT86400s");
private static final Duration DEFAULT_COORDINATOR_RULE_KILL_DURATION_TO_RETAIN = new Duration("PT7776000s");
private static final Duration DEFAULT_COORDINATOR_DATASOURCE_KILL_PERIOD = new Duration("PT86400s");
private static final Duration DEFAULT_COORDINATOR_DATASOURCE_KILL_DURATION_TO_RETAIN = new Duration("PT7776000s");
private static final Duration DEFAULT_COORDINATOR_SUPERVISOR_KILL_PERIOD = Period.parse("P1D").toStandardDuration();
private static final Duration DEFAULT_COORDINATOR_SUPERVISOR_KILL_DURATION_TO_RETAIN = Period.parse("P90D").toStandardDuration();
private static final Duration DEFAULT_COORDINATOR_COMPACTION_KILL_PERIOD = Period.parse("P1D").toStandardDuration();
private static final Duration DEFAULT_COORDINATOR_RULE_KILL_PERIOD = Period.parse("P1D").toStandardDuration();
private static final Duration DEFAULT_COORDINATOR_RULE_KILL_DURATION_TO_RETAIN = Period.parse("P90D").toStandardDuration();
private static final Duration DEFAULT_COORDINATOR_DATASOURCE_KILL_PERIOD = Period.parse("P1D").toStandardDuration();
private static final Duration DEFAULT_COORDINATOR_DATASOURCE_KILL_DURATION_TO_RETAIN = Period.parse("P90D").toStandardDuration();
private static final Duration DEFAULT_LOAD_TIMEOUT_DELAY = new Duration(15 * 60 * 1000);
private static final String DEFAULT_LOAD_QUEUE_PEON_TYPE = "curator";
private static final int DEFAULT_CURATOR_LOAD_QUEUE_PEON_NUM_CALLBACK_THREADS = 2;
private static final Duration DEFAULT_HTTP_LOAD_QUEUE_PEON_REPEAT_DELAY = Duration.millis(60000);
private static final Duration DEFAULT_HTTP_LOAD_QUEUE_PEON_HOST_TIMEOUT = Duration.millis(300000);
private static final int DEFAULT_HTTP_LOAD_QUEUE_PEON_BATCH_SIZE = 1;
private static final Duration DEFAULT_COORDINATOR_AUDIT_KILL_PERIOD = new Duration("PT86400s");
private static final Duration DEFAULT_COORDINATOR_AUTIT_KILL_DURATION_TO_RETAIN = new Duration("PT7776000s");
private static final Duration DEFAULT_COORDINATOR_KILL_BUFFER_PERIOD = new Duration("PT86400s");
private static final Duration DEFAULT_COORDINATOR_AUDIT_KILL_PERIOD = Period.parse("P1D").toStandardDuration();
private static final Duration DEFAULT_COORDINATOR_AUTIT_KILL_DURATION_TO_RETAIN = Period.parse("P90D").toStandardDuration();
private Duration coordinatorStartDelay;
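
The defaults above were rewritten from second-denominated Duration strings to more readable Period expressions. A quick sketch of the equivalence; note that toStandardDuration() assumes standard lengths (for example, a 24-hour day), which holds for these fixed defaults:

```java
import org.joda.time.Duration;
import org.joda.time.Period;

class PeriodEquivalenceSketch
{
  public static void main(String[] args)
  {
    // "PT86400s" (86400 seconds) and P1D denote the same standard duration.
    final Duration legacy = new Duration("PT86400s");
    final Duration readable = Period.parse("P1D").toStandardDuration();
    System.out.println(legacy.equals(readable)); // prints true
  }
}
```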
@@ -497,7 +498,8 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
metadataStoreManagementPeriod == null ? DEFAULT_METADATA_STORE_MANAGEMENT_PERIOD : metadataStoreManagementPeriod,
loadTimeoutDelay == null ? DEFAULT_LOAD_TIMEOUT_DELAY : loadTimeoutDelay,
coordinatorKillPeriod == null ? DEFAULT_COORDINATOR_KILL_PERIOD : coordinatorKillPeriod,
coordinatorKillDurationToRetain == null ? DEFAULT_COORDINATOR_KILL_DURATION_TO_RETAION : coordinatorKillDurationToRetain,
coordinatorKillDurationToRetain == null ? DEFAULT_COORDINATOR_KILL_DURATION_TO_RETAIN
: coordinatorKillDurationToRetain,
coordinatorSupervisorKillPeriod == null ? DEFAULT_COORDINATOR_SUPERVISOR_KILL_PERIOD : coordinatorSupervisorKillPeriod,
coordinatorSupervisorKillDurationToRetain == null ? DEFAULT_COORDINATOR_SUPERVISOR_KILL_DURATION_TO_RETAIN : coordinatorSupervisorKillDurationToRetain,
coordinatorAuditKillPeriod == null ? DEFAULT_COORDINATOR_AUDIT_KILL_PERIOD : coordinatorAuditKillPeriod,


@@ -23,6 +23,7 @@ import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.metadata.SegmentsMetadataManager;
import org.apache.druid.metadata.SortOrder;
import org.apache.druid.server.http.DataSegmentPlus;
@@ -34,33 +35,44 @@ import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
public class TestSegmentsMetadataManager implements SegmentsMetadataManager
{
private final ConcurrentMap<String, DataSegment> segments = new ConcurrentHashMap<>();
private final ConcurrentMap<String, DataSegment> allSegments = new ConcurrentHashMap<>();
private final ConcurrentMap<String, DataSegment> usedSegments = new ConcurrentHashMap<>();
private final ConcurrentMap<String, DataSegmentPlus> unusedSegments = new ConcurrentHashMap<>();
private volatile DataSourcesSnapshot snapshot;
public void addSegment(DataSegment segment)
{
segments.put(segment.getId().toString(), segment);
allSegments.put(segment.getId().toString(), segment);
usedSegments.put(segment.getId().toString(), segment);
snapshot = null;
}
public void removeSegment(DataSegment segment)
{
segments.remove(segment.getId().toString());
allSegments.remove(segment.getId().toString());
usedSegments.remove(segment.getId().toString());
snapshot = null;
}
public void addUnusedSegment(DataSegmentPlus segment)
{
unusedSegments.put(segment.getDataSegment().getId().toString(), segment);
allSegments.put(segment.getDataSegment().getId().toString(), segment.getDataSegment());
snapshot = null;
}
@Override
public void startPollingDatabasePeriodically()
{
@@ -100,11 +112,11 @@ public class TestSegmentsMetadataManager implements SegmentsMetadataManager
@Override
public boolean markSegmentAsUsed(String segmentId)
{
if (!segments.containsKey(segmentId)) {
if (!allSegments.containsKey(segmentId)) {
return false;
}
usedSegments.put(segmentId, segments.get(segmentId));
usedSegments.put(segmentId, allSegments.get(segmentId));
return true;
}
@@ -124,8 +136,13 @@ public class TestSegmentsMetadataManager implements SegmentsMetadataManager
public int markSegmentsAsUnused(Set<SegmentId> segmentIds)
{
int numModifiedSegments = 0;
final DateTime now = DateTimes.nowUtc();
for (SegmentId segmentId : segmentIds) {
if (usedSegments.remove(segmentId.toString()) != null) {
if (allSegments.containsKey(segmentId.toString())) {
DataSegment dataSegment = allSegments.get(segmentId.toString());
unusedSegments.put(segmentId.toString(), new DataSegmentPlus(dataSegment, now, now));
usedSegments.remove(segmentId.toString());
++numModifiedSegments;
}
}
@@ -209,7 +226,7 @@ public class TestSegmentsMetadataManager implements SegmentsMetadataManager
@Override
public Set<String> retrieveAllDataSourceNames()
{
return null;
return allSegments.values().stream().map(DataSegment::getDataSource).collect(Collectors.toSet());
}
@Override
@@ -221,7 +238,28 @@ public class TestSegmentsMetadataManager implements SegmentsMetadataManager
final DateTime maxUsedStatusLastUpdatedTime
)
{
return null;
final List<DataSegmentPlus> sortedUnusedSegmentPluses = new ArrayList<>(unusedSegments.values());
sortedUnusedSegmentPluses.sort(
Comparator.comparingLong(
dataSegmentPlus -> dataSegmentPlus.getDataSegment().getInterval().getStartMillis()
)
);
final List<Interval> unusedSegmentIntervals = new ArrayList<>();
for (final DataSegmentPlus unusedSegmentPlus : sortedUnusedSegmentPluses) {
final DataSegment unusedSegment = unusedSegmentPlus.getDataSegment();
if (dataSource.equals(unusedSegment.getDataSource())) {
final Interval interval = unusedSegment.getInterval();
if ((minStartTime == null || interval.getStart().isAfter(minStartTime)) &&
interval.getEnd().isBefore(maxEndTime) &&
unusedSegmentPlus.getUsedStatusLastUpdatedDate().isBefore(maxUsedStatusLastUpdatedTime)) {
unusedSegmentIntervals.add(interval);
}
}
}
return unusedSegmentIntervals.stream().limit(limit).collect(Collectors.toList());
}
@Override