mirror of https://github.com/apache/druid.git
# Enable Continuous auto kill (#14831)
### Description
This change enables the `KillUnusedSegments` coordinator duty to be scheduled continuously. The following issues previously prevented this, or made it difficult:
1. If scheduled at a fast enough rate, the duty would find the same intervals to kill for the same datasources while kill tasks submitted for those same datasources and intervals were already underway, thus wasting task slots on duplicated work.
2. The task resources used by auto kill were previously unbounded: on each duty run, if unused segments were found for any datasource, a kill task would be submitted to kill them.
This PR solves both of these issues:
1. The duty now keeps track of the end time of the last interval found when killing unused segments for each datasource, in an in-memory map (see the sketch after this list). The end time for a datasource, if present, is used as the lower bound on start time when searching for unused intervals in that same datasource. On each duty run, we remove from this map any datasource keys that no longer match datasources in the system (or in the whitelist), and we also remove a datasource's entry if no unused segments are found for it, which happens when we fail to find an interval containing unused segments. Removing the datasource entry from the map allows the search for unused segments in that datasource to start from the beginning of time once again.
2. The unbounded task resource usage can be mitigated with the coordinator dynamic config added as part of ba957a9b97.
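
A minimal sketch of the bookkeeping described in point 1 (illustrative names and a simplified `long` timestamp, not the exact Druid code; the actual duty stores Joda `DateTime` interval ends keyed by datasource):

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper mirroring the duty's in-memory map.
class KillCursorTracker
{
  // datasource name -> end instant (epoch millis) of the last interval submitted for kill
  private final Map<String, Long> lastKillIntervalEnd = new ConcurrentHashMap<>();

  // After submitting a kill task covering [start, end), remember the end.
  void recordKill(String dataSource, long intervalEndMillis)
  {
    lastKillIntervalEnd.put(dataSource, intervalEndMillis);
  }

  // Lower bound for the next unused-segment search; null means "from the beginning of time".
  Long searchLowerBound(String dataSource)
  {
    return lastKillIntervalEnd.get(dataSource);
  }

  // No unused segments found: drop the entry so the next search starts over.
  void reset(String dataSource)
  {
    lastKillIntervalEnd.remove(dataSource);
  }

  // Each duty run: drop datasources that are no longer eligible for kill.
  void retainOnly(Set<String> eligibleDataSources)
  {
    lastKillIntervalEnd.keySet().retainAll(eligibleDataSources);
  }
}
```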
Operators can configure continuous auto kill by providing coordinator runtime properties similar to the following:
```
druid.coordinator.period.indexingPeriod=PT60S
druid.coordinator.kill.period=PT60S
```
and by providing sensible limits on kill task usage via coordinator dynamic configuration.
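
For example, the kill task limits enforced in `getAvailableKillTaskSlots()` (see the `KillUnusedSegments` diff below) can be capped with a coordinator dynamic configuration payload along these lines (a sketch; `killTaskSlotRatio` and `maxKillTaskSlots` are the fields added in ba957a9b97, and other dynamic config fields are omitted):

```json
{
  "killTaskSlotRatio": 0.1,
  "maxKillTaskSlots": 5
}
```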
Parent: `dfb5a98888`
Commit: `0c76df1c7d`
**Coordinator configuration docs**:

```diff
@@ -855,7 +855,7 @@ These Coordinator static configurations can be defined in the `coordinator/runti
 |`druid.coordinator.load.timeout`|The timeout duration for when the Coordinator assigns a segment to a Historical process.|PT15M|
 |`druid.coordinator.kill.pendingSegments.on`|Boolean flag for whether or not the Coordinator clean up old entries in the `pendingSegments` table of metadata store. If set to true, Coordinator will check the created time of most recently complete task. If it doesn't exist, it finds the created time of the earliest running/pending/waiting tasks. Once the created time is found, then for all dataSources not in the `killPendingSegmentsSkipList` (see [Dynamic configuration](#dynamic-configuration)), Coordinator will ask the Overlord to clean up the entries 1 day or more older than the found created time in the `pendingSegments` table. This will be done periodically based on `druid.coordinator.period.indexingPeriod` specified.|true|
 |`druid.coordinator.kill.on`|Boolean flag for whether or not the Coordinator should submit kill task for unused segments, that is, permanently delete them from metadata store and deep storage. If set to true, then for all whitelisted dataSources (or optionally all), Coordinator will submit tasks periodically based on `period` specified. A whitelist can be set via dynamic configuration `killDataSourceWhitelist` described later.<br /><br />When `druid.coordinator.kill.on` is true, segments are eligible for permanent deletion once their data intervals are older than `druid.coordinator.kill.durationToRetain` relative to the current time. If a segment's data interval is older than this threshold at the time it is marked unused, it is eligible for permanent deletion immediately after being marked unused.|false|
-|`druid.coordinator.kill.period`|How often to send kill tasks to the indexing service. Value must be greater than `druid.coordinator.period.indexingPeriod`. Only applies if kill is turned on.|P1D (1 Day)|
+|`druid.coordinator.kill.period`|The frequency of sending kill tasks to the indexing service. The value must be greater than or equal to `druid.coordinator.period.indexingPeriod`. Only applies if kill is turned on.|P1D (1 day)|
 |`druid.coordinator.kill.durationToRetain`|Only applies if you set `druid.coordinator.kill.on` to `true`. This value is ignored if `druid.coordinator.kill.ignoreDurationToRetain` is `true`. Valid configurations must be an ISO8601 period. Druid will not kill unused segments whose interval end date is beyond `now - durationToRetain`. `durationToRetain` can be a negative ISO8601 period, which would result in `now - durationToRetain` being in the future.<br /><br />Note that the `durationToRetain` parameter applies to the segment interval, not the time that the segment was last marked unused. For example, if `durationToRetain` is set to `P90D`, then a segment for a time chunk 90 days in the past is eligible for permanent deletion immediately after being marked unused.|`P90D`|
 |`druid.coordinator.kill.ignoreDurationToRetain`|A way to override `druid.coordinator.kill.durationToRetain` and tell the coordinator that you do not care about the end date of unused segment intervals when it comes to killing them. If true, the coordinator considers all unused segments as eligible to be killed.|false|
 |`druid.coordinator.kill.bufferPeriod`|The amount of time that a segment must be unused before it is able to be permanently removed from metadata and deep storage. This can serve as a buffer period to prevent data loss if data ends up being needed after being marked unused.|`P30D`|
```
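
Putting the documented properties together, a continuous-kill setup could look like the following (values are illustrative, not recommendations):

```
druid.coordinator.period.indexingPeriod=PT60S
druid.coordinator.kill.on=true
druid.coordinator.kill.period=PT60S
druid.coordinator.kill.durationToRetain=P90D
druid.coordinator.kill.bufferPeriod=P30D
```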
**SegmentsMetadataManager.java**:

```diff
@@ -139,16 +139,17 @@ public interface SegmentsMetadataManager
   Set<String> retrieveAllDataSourceNames();
 
   /**
-   * Returns top N unused segment intervals with the end time no later than the specified maxEndTime and
-   * used_status_last_updated time no later than maxLastUsedTime when ordered by segment start time, end time. Any segment having no
-   * used_status_last_updated time due to upgrade from legacy Druid means maxUsedFlagLastUpdatedTime is ignored for that segment.
+   * Returns top N unused segment intervals with the start time no earlier than the specified start time (if not null)
+   * and with the end time no later than the specified maxEndTime and with used_status_last_updated time no later than
+   * maxLastUsedTime when ordered by segment start time, end time. Any segment having no used_status_last_updated time
+   * due to upgrade from legacy Druid means maxUsedFlagLastUpdatedTime is ignored for that segment.
    */
   List<Interval> getUnusedSegmentIntervals(
       String dataSource,
+      DateTime minStartTime,
       DateTime maxEndTime,
       int limit,
-      DateTime maxUsedFlagLastUpdatedTime
-  );
+      DateTime maxUsedFlagLastUpdatedTime);
 
   @VisibleForTesting
   void poll();
```
**SqlSegmentsMetadataManager.java**:

```diff
@@ -58,6 +58,7 @@ import org.skife.jdbi.v2.BaseResultSetMapper;
 import org.skife.jdbi.v2.Batch;
 import org.skife.jdbi.v2.FoldController;
 import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.Query;
 import org.skife.jdbi.v2.StatementContext;
 import org.skife.jdbi.v2.TransactionCallback;
 import org.skife.jdbi.v2.TransactionStatus;
```
```diff
@@ -1088,6 +1089,7 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
   @Override
   public List<Interval> getUnusedSegmentIntervals(
       final String dataSource,
+      @Nullable final DateTime minStartTime,
       final DateTime maxEndTime,
       final int limit,
       DateTime maxUsedFlagLastUpdatedTime
```
```diff
@@ -1100,13 +1102,14 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
       @Override
       public List<Interval> inTransaction(Handle handle, TransactionStatus status)
       {
-        Iterator<Interval> iter = handle
+        final Query<Interval> sql = handle
             .createQuery(
                 StringUtils.format(
                     "SELECT start, %2$send%2$s FROM %1$s WHERE dataSource = :dataSource AND "
-                    + "%2$send%2$s <= :end AND used = false AND used_status_last_updated IS NOT NULL AND used_status_last_updated <= :used_status_last_updated ORDER BY start, %2$send%2$s",
+                    + "%2$send%2$s <= :end AND used = false AND used_status_last_updated IS NOT NULL AND used_status_last_updated <= :used_status_last_updated %3$s ORDER BY start, %2$send%2$s",
                     getSegmentsTable(),
-                    connector.getQuoteString()
+                    connector.getQuoteString(),
+                    null != minStartTime ? "AND start >= :start" : ""
                 )
             )
             .setFetchSize(connector.getStreamingFetchSize())
```
```diff
@@ -1126,8 +1129,12 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
                 );
               }
             }
-        )
-        .iterator();
+        );
+        if (null != minStartTime) {
+          sql.bind("start", minStartTime.toString());
+        }
+
+        Iterator<Interval> iter = sql.iterator();
 
         List<Interval> result = Lists.newArrayListWithCapacity(limit);
```
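
To make the effect of the new `%3$s` placeholder concrete, here is a sketch (not Druid code) that renders the two shapes the query above can take. Simplified: the real code derives the table name and quote string from the connector, and binds `:dataSource`, `:end`, `:used_status_last_updated`, and (optionally) `:start` through JDBI rather than printing the SQL:

```java
class UnusedIntervalsQueryShapes
{
  public static void main(String[] args)
  {
    String template = "SELECT start, \"end\" FROM druid_segments WHERE dataSource = :dataSource"
        + " AND \"end\" <= :end AND used = false AND used_status_last_updated IS NOT NULL"
        + " AND used_status_last_updated <= :used_status_last_updated %s ORDER BY start, \"end\"";
    // First scan of a datasource: no tracked lower bound, so %s collapses to "".
    System.out.println(String.format(template, ""));
    // Subsequent scans: lower bound carried over from the previous duty run.
    System.out.println(String.format(template, "AND start >= :start"));
  }
}
```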
**KillUnusedSegments.java**:

```diff
@@ -44,6 +44,8 @@ import javax.annotation.Nullable;
 
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * Completely removes information about unused segments that have an interval end that comes before
```
```diff
@@ -67,6 +69,12 @@ public class KillUnusedSegments implements CoordinatorDuty
   private final long retainDuration;
   private final boolean ignoreRetainDuration;
   private final int maxSegmentsToKill;
+
+  /**
+   * Used to keep track of the last interval end time that was killed for each
+   * datasource.
+   */
+  private final Map<String, DateTime> datasourceToLastKillIntervalEnd;
   private long lastKillTime = 0;
   private final long bufferPeriod;
```
```diff
@@ -82,8 +90,8 @@ public class KillUnusedSegments implements CoordinatorDuty
   {
     this.period = config.getCoordinatorKillPeriod().getMillis();
     Preconditions.checkArgument(
-        this.period > config.getCoordinatorIndexingPeriod().getMillis(),
-        "coordinator kill period must be greater than druid.coordinator.period.indexingPeriod"
+        this.period >= config.getCoordinatorIndexingPeriod().getMillis(),
+        "coordinator kill period must be greater than or equal to druid.coordinator.period.indexingPeriod"
     );
 
     this.ignoreRetainDuration = config.getCoordinatorKillIgnoreDurationToRetain();
```
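
A worked illustration of the relaxed precondition above (a sketch, not Druid code; `Preconditions` is Guava's, and the values match the PT60S example in the description):

```java
import com.google.common.base.Preconditions;

class KillPeriodCheck
{
  public static void main(String[] args)
  {
    long killPeriodMillis = 60_000L;     // druid.coordinator.kill.period=PT60S
    long indexingPeriodMillis = 60_000L; // druid.coordinator.period.indexingPeriod=PT60S
    // Previously a strict > was required, so equal periods (continuous kill) failed.
    Preconditions.checkArgument(
        killPeriodMillis >= indexingPeriodMillis,
        "coordinator kill period must be greater than or equal to druid.coordinator.period.indexingPeriod"
    );
    System.out.println("config accepted");
  }
}
```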
```diff
@@ -100,6 +108,8 @@ public class KillUnusedSegments implements CoordinatorDuty
     this.maxSegmentsToKill = config.getCoordinatorKillMaxSegments();
     Preconditions.checkArgument(this.maxSegmentsToKill > 0, "coordinator kill maxSegments must be > 0");
 
+    datasourceToLastKillIntervalEnd = new ConcurrentHashMap<>();
+
     log.info(
         "Kill Task scheduling enabled with period [%s], retainDuration [%s], bufferPeriod [%s], maxSegmentsToKill [%s]",
         this.period,
```
```diff
@@ -115,12 +125,18 @@ public class KillUnusedSegments implements CoordinatorDuty
   @Override
   public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
   {
-
     final long currentTimeMillis = System.currentTimeMillis();
     if (lastKillTime + period > currentTimeMillis) {
       log.debug("Skipping kill of unused segments as kill period has not elapsed yet.");
       return params;
     }
+
+    return runInternal(params);
+  }
+
+  @VisibleForTesting
+  DruidCoordinatorRuntimeParams runInternal(DruidCoordinatorRuntimeParams params)
+  {
     TaskStats taskStats = new TaskStats();
     Collection<String> dataSourcesToKill =
         params.getCoordinatorDynamicConfig().getSpecificDataSourcesToKillUnusedSegmentsIn();
```
```diff
@@ -147,11 +163,14 @@ public class KillUnusedSegments implements CoordinatorDuty
     }
 
       log.debug("Killing unused segments in datasources: %s", dataSourcesToKill);
-      lastKillTime = currentTimeMillis;
+      lastKillTime = System.currentTimeMillis();
       taskStats.submittedTasks = killUnusedSegments(dataSourcesToKill, availableKillTaskSlots);
 
     }
 
+    // any datasources that are no longer being considered for kill should have their
+    // last kill interval removed from map.
+    datasourceToLastKillIntervalEnd.keySet().retainAll(dataSourcesToKill);
     addStats(taskStats, stats);
     return params;
   }
```
```diff
@@ -175,13 +194,14 @@ public class KillUnusedSegments implements CoordinatorDuty
     if (0 < availableKillTaskSlots && !CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
       for (String dataSource : dataSourcesToKill) {
         if (submittedTasks >= availableKillTaskSlots) {
-          log.info(StringUtils.format(
+          log.debug(StringUtils.format(
               "Submitted [%d] kill tasks and reached kill task slot limit [%d]. Will resume "
               + "on the next coordinator cycle.", submittedTasks, availableKillTaskSlots));
           break;
         }
         final Interval intervalToKill = findIntervalForKill(dataSource);
         if (intervalToKill == null) {
+          datasourceToLastKillIntervalEnd.remove(dataSource);
           continue;
         }
```
```diff
@@ -193,6 +213,7 @@ public class KillUnusedSegments implements CoordinatorDuty
               maxSegmentsToKill
           ), true);
           ++submittedTasks;
+          datasourceToLastKillIntervalEnd.put(dataSource, intervalToKill.getEnd());
         }
         catch (Exception ex) {
           log.error(ex, "Failed to submit kill task for dataSource [%s]", dataSource);
```
```diff
@@ -233,7 +254,7 @@ public class KillUnusedSegments implements CoordinatorDuty
         : DateTimes.nowUtc().minus(retainDuration);
 
     List<Interval> unusedSegmentIntervals = segmentsMetadataManager
-        .getUnusedSegmentIntervals(dataSource, maxEndTime, maxSegmentsToKill, DateTimes.nowUtc().minus(bufferPeriod));
+        .getUnusedSegmentIntervals(dataSource, datasourceToLastKillIntervalEnd.get(dataSource), maxEndTime, maxSegmentsToKill, DateTimes.nowUtc().minus(bufferPeriod));
 
     if (CollectionUtils.isNullOrEmpty(unusedSegmentIntervals)) {
       return null;
```
```diff
@@ -258,6 +279,13 @@ public class KillUnusedSegments implements CoordinatorDuty
     return Math.min((int) (totalWorkerCapacity * Math.min(killTaskSlotRatio, 1.0)), maxKillTaskSlots);
   }
 
+  @VisibleForTesting
+  Map<String, DateTime> getDatasourceToLastKillIntervalEnd()
+  {
+    return datasourceToLastKillIntervalEnd;
+  }
+
   static class TaskStats
   {
     int availableTaskSlots;
```
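
A worked example (with assumed values) of the slot computation in `getAvailableKillTaskSlots()` shown above:

```java
class KillTaskSlotMath
{
  public static void main(String[] args)
  {
    int totalWorkerCapacity = 10;    // assumed cluster-wide task capacity
    double killTaskSlotRatio = 0.10; // dynamic config (assumed)
    int maxKillTaskSlots = 5;        // dynamic config (assumed)
    int killTaskSlots = Math.min(
        (int) (totalWorkerCapacity * Math.min(killTaskSlotRatio, 1.0)),
        maxKillTaskSlots
    );
    System.out.println(killTaskSlots); // 1: the ratio cap binds before maxKillTaskSlots
  }
}
```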
**SqlSegmentsMetadataManagerTest.java**:

```diff
@@ -425,18 +425,26 @@ public class SqlSegmentsMetadataManagerTest
 
     Assert.assertEquals(
         ImmutableList.of(segment2.getInterval()),
-        sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", DateTimes.of("3000"), 1, DateTimes.COMPARE_DATE_AS_STRING_MAX)
+        sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", null, DateTimes.of("3000"), 1, DateTimes.COMPARE_DATE_AS_STRING_MAX)
     );
 
     // Test the DateTime maxEndTime argument of getUnusedSegmentIntervals
     Assert.assertEquals(
         ImmutableList.of(segment2.getInterval()),
-        sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", DateTimes.of(2012, 1, 7, 0, 0), 1, DateTimes.COMPARE_DATE_AS_STRING_MAX)
+        sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", null, DateTimes.of(2012, 1, 7, 0, 0), 1, DateTimes.COMPARE_DATE_AS_STRING_MAX)
     );
+    Assert.assertEquals(
+        ImmutableList.of(segment1.getInterval()),
+        sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", DateTimes.of(2012, 1, 7, 0, 0), DateTimes.of(2012, 4, 7, 0, 0), 1, DateTimes.COMPARE_DATE_AS_STRING_MAX)
+    );
+    Assert.assertEquals(
+        ImmutableList.of(),
+        sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", DateTimes.of(2012, 1, 7, 0, 0), DateTimes.of(2012, 1, 7, 0, 0), 1, DateTimes.COMPARE_DATE_AS_STRING_MAX)
+    );
 
     Assert.assertEquals(
         ImmutableList.of(segment2.getInterval(), segment1.getInterval()),
-        sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", DateTimes.of("3000"), 5, DateTimes.COMPARE_DATE_AS_STRING_MAX)
+        sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", null, DateTimes.of("3000"), 5, DateTimes.COMPARE_DATE_AS_STRING_MAX)
     );
 
     // Test a buffer period that should exclude some segments
```
```diff
@@ -444,7 +452,7 @@ public class SqlSegmentsMetadataManagerTest
     // The wikipedia datasource has segments generated with last used time equal to roughly the time of test run. None of these segments should be selected with a buffer period of 1 day
     Assert.assertEquals(
         ImmutableList.of(),
-        sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", DateTimes.of("3000"), 5, DateTimes.nowUtc().minus(Duration.parse("PT86400S")))
+        sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", DateTimes.COMPARE_DATE_AS_STRING_MIN, DateTimes.of("3000"), 5, DateTimes.nowUtc().minus(Duration.parse("PT86400S")))
     );
 
     // One of the 3 segments in newDs has a null used_status_last_updated which should mean getUnusedSegmentIntervals never returns it
```
```diff
@@ -452,7 +460,7 @@ public class SqlSegmentsMetadataManagerTest
     // The last of the 3 segments in newDs has a used_status_last_updated date less than one day and should not be returned
     Assert.assertEquals(
         ImmutableList.of(newSegment2.getInterval()),
-        sqlSegmentsMetadataManager.getUnusedSegmentIntervals(newDs, DateTimes.of("3000"), 5, DateTimes.nowUtc().minus(Duration.parse("PT86400S")))
+        sqlSegmentsMetadataManager.getUnusedSegmentIntervals(newDs, DateTimes.COMPARE_DATE_AS_STRING_MIN, DateTimes.of("3000"), 5, DateTimes.nowUtc().minus(Duration.parse("PT86400S")))
     );
   }
```
**KillUnusedSegmentsTest.java**:

```diff
@@ -20,6 +20,7 @@
 package org.apache.druid.server.coordinator.duty;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Futures;
 import org.apache.druid.client.indexing.IndexingTotalWorkerCapacityInfo;
 import org.apache.druid.indexer.RunnerTaskState;
```
```diff
@@ -55,6 +56,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 import static org.mockito.ArgumentMatchers.any;
```
```diff
@@ -70,6 +72,7 @@ public class KillUnusedSegmentsTest
   private static final Duration COORDINATOR_KILL_PERIOD = Duration.standardMinutes(2);
   private static final Duration DURATION_TO_RETAIN = Duration.standardDays(1);
   private static final Duration INDEXING_PERIOD = Duration.standardMinutes(1);
+  private static final String DATASOURCE = "DS1";
 
   @Mock
   private SegmentsMetadataManager segmentsMetadataManager;
```
```diff
@@ -105,7 +108,7 @@ public class KillUnusedSegmentsTest
     Mockito.doReturn(MAX_SEGMENTS_TO_KILL).when(config).getCoordinatorKillMaxSegments();
     Mockito.doReturn(Duration.parse("PT3154000000S")).when(config).getCoordinatorKillBufferPeriod();
 
-    Mockito.doReturn(Collections.singleton("DS1"))
+    Mockito.doReturn(Collections.singleton(DATASOURCE))
         .when(coordinatorDynamicConfig).getSpecificDataSourcesToKillUnusedSegmentsIn();
 
     final DateTime now = DateTimes.nowUtc();
```
```diff
@@ -130,23 +133,31 @@ public class KillUnusedSegmentsTest
         segmentsMetadataManager.getUnusedSegmentIntervals(
             ArgumentMatchers.anyString(),
             ArgumentMatchers.any(),
+            ArgumentMatchers.any(),
             ArgumentMatchers.anyInt(),
             ArgumentMatchers.any()
         )
     ).thenAnswer(invocation -> {
-      DateTime maxEndTime = invocation.getArgument(1);
+      DateTime minStartTime = invocation.getArgument(1);
+      DateTime maxEndTime = invocation.getArgument(2);
       long maxEndMillis = maxEndTime.getMillis();
+      Long minStartMillis = minStartTime != null ? minStartTime.getMillis() : null;
       List<Interval> unusedIntervals =
           unusedSegments.stream()
                         .map(DataSegment::getInterval)
-                        .filter(i -> i.getEnd().getMillis() <= maxEndMillis)
+                        .filter(i -> i.getEnd().getMillis() <= maxEndMillis
+                                     && (null == minStartMillis || i.getStart().getMillis() >= minStartMillis))
                         .collect(Collectors.toList());
 
-      int limit = invocation.getArgument(2);
+      int limit = invocation.getArgument(3);
       return unusedIntervals.size() <= limit ? unusedIntervals : unusedIntervals.subList(0, limit);
     });
 
-    target = new KillUnusedSegments(segmentsMetadataManager, overlordClient, config);
+    target = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        config
+    );
   }
 
   @Test
```
```diff
@@ -155,6 +166,7 @@ public class KillUnusedSegmentsTest
     Mockito.doReturn(null).when(segmentsMetadataManager).getUnusedSegmentIntervals(
         ArgumentMatchers.anyString(),
         ArgumentMatchers.any(),
+        ArgumentMatchers.any(),
         ArgumentMatchers.anyInt(),
         ArgumentMatchers.any()
     );
```
```diff
@@ -170,7 +182,11 @@ public class KillUnusedSegmentsTest
   {
     Mockito.doReturn(Duration.standardDays(400))
         .when(config).getCoordinatorKillDurationToRetain();
-    target = new KillUnusedSegments(segmentsMetadataManager, overlordClient, config);
+    target = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        config
+    );
 
     // No unused segment is older than the retention period
     mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
```
```diff
@@ -189,6 +205,7 @@ public class KillUnusedSegmentsTest
     );
     mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
     runAndVerifyKillInterval(expectedKillInterval);
+    verifyState(ImmutableMap.of(DATASOURCE, dayOldSegment.getInterval().getEnd()));
     verifyStats(9, 1, 10);
   }
```
```diff
@@ -198,7 +215,11 @@ public class KillUnusedSegmentsTest
     // Duration to retain = -1 day, reinit target for config to take effect
     Mockito.doReturn(DURATION_TO_RETAIN.negated())
         .when(config).getCoordinatorKillDurationToRetain();
-    target = new KillUnusedSegments(segmentsMetadataManager, overlordClient, config);
+    target = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        config
+    );
 
     // Segments up to 1 day in the future are killed
     Interval expectedKillInterval = new Interval(
```
```diff
@@ -207,6 +228,7 @@ public class KillUnusedSegmentsTest
     );
     mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
     runAndVerifyKillInterval(expectedKillInterval);
+    verifyState(ImmutableMap.of(DATASOURCE, nextDaySegment.getInterval().getEnd()));
     verifyStats(9, 1, 10);
   }
```
```diff
@@ -215,7 +237,11 @@ public class KillUnusedSegmentsTest
   {
     Mockito.doReturn(true)
         .when(config).getCoordinatorKillIgnoreDurationToRetain();
-    target = new KillUnusedSegments(segmentsMetadataManager, overlordClient, config);
+    target = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        config
+    );
 
     // All future and past unused segments are killed
     Interval expectedKillInterval = new Interval(
```
```diff
@@ -224,6 +250,7 @@ public class KillUnusedSegmentsTest
     );
     mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
     runAndVerifyKillInterval(expectedKillInterval);
+    verifyState(ImmutableMap.of(DATASOURCE, nextMonthSegment.getInterval().getEnd()));
     verifyStats(9, 1, 10);
   }
```
```diff
@@ -232,19 +259,61 @@ public class KillUnusedSegmentsTest
   {
     Mockito.doReturn(1)
         .when(config).getCoordinatorKillMaxSegments();
-    target = new KillUnusedSegments(segmentsMetadataManager, overlordClient, config);
+    target = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        config
+    );
 
     mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
     // Only 1 unused segment is killed
     runAndVerifyKillInterval(yearOldSegment.getInterval());
+    verifyState(ImmutableMap.of(DATASOURCE, yearOldSegment.getInterval().getEnd()));
     verifyStats(9, 1, 10);
   }
 
+  @Test
+  public void testMultipleRuns()
+  {
+    Mockito.doReturn(true)
+        .when(config).getCoordinatorKillIgnoreDurationToRetain();
+    Mockito.doReturn(2)
+        .when(config).getCoordinatorKillMaxSegments();
+    target = new KillUnusedSegments(
+        segmentsMetadataManager,
+        overlordClient,
+        config
+    );
+
+    mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
+    runAndVerifyKillInterval(new Interval(
+        yearOldSegment.getInterval().getStart(),
+        monthOldSegment.getInterval().getEnd()
+    ));
+    verifyState(ImmutableMap.of(DATASOURCE, monthOldSegment.getInterval().getEnd()));
+
+    mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
+    runAndVerifyKillInterval(new Interval(
+        dayOldSegment.getInterval().getStart(),
+        hourOldSegment.getInterval().getEnd()
+    ));
+    verifyState(ImmutableMap.of(DATASOURCE, hourOldSegment.getInterval().getEnd()));
+
+    mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
+    runAndVerifyKillInterval(new Interval(
+        nextDaySegment.getInterval().getStart(),
+        nextMonthSegment.getInterval().getEnd()
+    ));
+    verifyState(ImmutableMap.of(DATASOURCE, nextMonthSegment.getInterval().getEnd()));
+    verifyStats(9, 1, 10, 3);
+  }
+
   @Test
   public void testKillTaskSlotRatioNoAvailableTaskCapacityForKill()
   {
     mockTaskSlotUsage(0.10, 10, 1, 5);
     runAndVerifyNoKill();
+    verifyState(ImmutableMap.of());
     verifyStats(0, 0, 0);
   }
```
```diff
@@ -253,6 +322,8 @@ public class KillUnusedSegmentsTest
   {
     mockTaskSlotUsage(1.0, 3, 3, 10);
     runAndVerifyNoKill();
+    verifyState(ImmutableMap.of());
+
     verifyStats(0, 0, 3);
   }
 
   @Test
```
```diff
@@ -299,21 +370,57 @@ public class KillUnusedSegmentsTest
             ArgumentMatchers.anyString(),
             ArgumentMatchers.any(Interval.class),
             ArgumentMatchers.anyInt());
-    target.run(params);
+    target.runInternal(params);
 
     Mockito.verify(overlordClient, Mockito.times(1)).runKillTask(
         ArgumentMatchers.anyString(),
-        ArgumentMatchers.eq("DS1"),
+        ArgumentMatchers.eq(DATASOURCE),
         ArgumentMatchers.eq(expectedKillInterval),
         ArgumentMatchers.eq(limit)
     );
   }
 
+  private void runAndVerifyKillIntervals(List<Interval> expectedKillIntervals)
+  {
+    int limit = config.getCoordinatorKillMaxSegments();
+    Mockito.doReturn(Futures.immediateFuture("ok"))
+        .when(overlordClient)
+        .runKillTask(
+            ArgumentMatchers.anyString(),
+            ArgumentMatchers.anyString(),
+            ArgumentMatchers.any(Interval.class),
+            ArgumentMatchers.anyInt());
+    for (int i = 0; i < expectedKillIntervals.size(); i++) {
+      target.run(params);
+      verifyState(ImmutableMap.of(DATASOURCE, yearOldSegment.getInterval().getEnd()));
+      verifyStats(9, 1, 10);
+    }
+
+    for (Interval expectedKillInterval : expectedKillIntervals) {
+      Mockito.verify(overlordClient, Mockito.times(1)).runKillTask(
+          ArgumentMatchers.anyString(),
+          ArgumentMatchers.eq(DATASOURCE),
+          ArgumentMatchers.eq(expectedKillInterval),
+          ArgumentMatchers.eq(limit)
+      );
+    }
+  }
+
   private void verifyStats(int availableSlots, int submittedTasks, int maxSlots)
   {
-    Mockito.verify(stats).add(Stats.Kill.AVAILABLE_SLOTS, availableSlots);
-    Mockito.verify(stats).add(Stats.Kill.SUBMITTED_TASKS, submittedTasks);
-    Mockito.verify(stats).add(Stats.Kill.MAX_SLOTS, maxSlots);
+    verifyStats(availableSlots, submittedTasks, maxSlots, 1);
+  }
+
+  private void verifyStats(int availableSlots, int submittedTasks, int maxSlots, int times)
+  {
+    Mockito.verify(stats, Mockito.times(times)).add(Stats.Kill.AVAILABLE_SLOTS, availableSlots);
+    Mockito.verify(stats, Mockito.times(times)).add(Stats.Kill.SUBMITTED_TASKS, submittedTasks);
+    Mockito.verify(stats, Mockito.times(times)).add(Stats.Kill.MAX_SLOTS, maxSlots);
+  }
+
+  private void verifyState(Map<String, DateTime> expectedDatasourceToLastKillIntervalEnd)
+  {
+    Assert.assertEquals(expectedDatasourceToLastKillIntervalEnd, target.getDatasourceToLastKillIntervalEnd());
+  }
+
   private void runAndVerifyNoKill()
```
```diff
@@ -366,7 +473,7 @@ public class KillUnusedSegmentsTest
   private DataSegment createSegmentWithEnd(DateTime endTime)
   {
     return new DataSegment(
-        "DS1",
+        DATASOURCE,
         new Interval(Period.days(1), endTime),
         DateTimes.nowUtc().toString(),
         new HashMap<>(),
```
**TestSegmentsMetadataManager.java**:

```diff
@@ -199,7 +199,13 @@ public class TestSegmentsMetadataManager implements SegmentsMetadataManager
   }
 
   @Override
-  public List<Interval> getUnusedSegmentIntervals(String dataSource, DateTime maxEndTime, int limit, DateTime maxUsedFlagLastUpdatedTime)
+  public List<Interval> getUnusedSegmentIntervals(
+      final String dataSource,
+      @Nullable final DateTime minStartTime,
+      final DateTime maxEndTime,
+      final int limit,
+      final DateTime maxUsedFlagLastUpdatedTime
+  )
   {
     return null;
   }
```