Enable auto kill segments by default (#12187)

* Enable auto-kill by default

* tests

* wip

* test

* fix IT

* fix it

* remove from docs

* make coverage bot happy
Suneet Saldanha 2022-02-07 06:57:54 -08:00 committed by GitHub
parent 2b8e7fc0b4
commit ced1389d4c
11 changed files with 269 additions and 161 deletions


@ -796,10 +796,10 @@ These Coordinator static configurations can be defined in the `coordinator/runti
|`druid.coordinator.startDelay`|The operation of the Coordinator works on the assumption that it has an up-to-date view of the state of the world when it runs. The current ZK interaction code, however, is written in a way that doesn't allow the Coordinator to know for a fact that it's done loading the current state of the world. This delay is a hack to give it enough time to believe that it has all the data.|PT300S|
|`druid.coordinator.load.timeout`|The timeout duration for when the Coordinator assigns a segment to a Historical process.|PT15M|
|`druid.coordinator.kill.pendingSegments.on`|Boolean flag for whether or not the Coordinator clean up old entries in the `pendingSegments` table of metadata store. If set to true, Coordinator will check the created time of most recently complete task. If it doesn't exist, it finds the created time of the earliest running/pending/waiting tasks. Once the created time is found, then for all dataSources not in the `killPendingSegmentsSkipList` (see [Dynamic configuration](#dynamic-configuration)), Coordinator will ask the Overlord to clean up the entries 1 day or more older than the found created time in the `pendingSegments` table. This will be done periodically based on `druid.coordinator.period.indexingPeriod` specified.|true|
|`druid.coordinator.kill.on`|Boolean flag for whether or not the Coordinator should submit kill task for unused segments, that is, hard delete them from metadata store and deep storage. If set to true, then for all whitelisted dataSources (or optionally all), Coordinator will submit tasks periodically based on `period` specified. These kill tasks will delete all unused segments except for the last `durationToRetain` period. Whitelist or All can be set via dynamic configuration `killAllDataSources` and `killDataSourceWhitelist` described later.|false|
|`druid.coordinator.kill.on`|Boolean flag for whether or not the Coordinator should submit kill task for unused segments, that is, hard delete them from metadata store and deep storage. If set to true, then for all whitelisted dataSources (or optionally all), Coordinator will submit tasks periodically based on `period` specified. These kill tasks will delete all unused segments except for the last `durationToRetain` period. A whitelist can be set via dynamic configuration `killDataSourceWhitelist` described later.|true|
|`druid.coordinator.kill.period`|How often to send kill tasks to the indexing service. Value must be greater than `druid.coordinator.period.indexingPeriod`. Only applies if kill is turned on.|P1D (1 Day)|
|`druid.coordinator.kill.durationToRetain`| Do not kill unused segments in last `durationToRetain`, must be greater or equal to 0. Only applies and MUST be specified if kill is turned on. Note that default value is invalid.|PT-1S (-1 seconds)|
|`druid.coordinator.kill.maxSegments`|Kill at most n unused segments per kill task submission, must be greater than 0. Only applies and MUST be specified if kill is turned on. Note that default value is invalid.|0|
|`druid.coordinator.kill.durationToRetain`| Do not kill unused segments in last `durationToRetain`, must be greater or equal to 0. Only applies and MUST be specified if kill is turned on.|`P90D`|
|`druid.coordinator.kill.maxSegments`|Kill at most n unused segments per kill task submission, must be greater than 0. Only applies and MUST be specified if kill is turned on.|100|
|`druid.coordinator.balancer.strategy`|Specify the type of balancing strategy for the coordinator to use to distribute segments among the historicals. `cachingCost` is logically equivalent to `cost` but is more CPU-efficient on large clusters. `diskNormalized` weights the costs according to the servers' disk usage ratios - there are known issues with this strategy distributing segments unevenly across the cluster. `random` distributes segments among services randomly.|`cost`|
|`druid.coordinator.balancer.cachingCost.awaitInitialization`|Whether to wait for segment view initialization before creating the `cachingCost` balancing strategy. This property is enabled only when `druid.coordinator.balancer.strategy` is `cachingCost`. If set to 'true', the Coordinator will not start to assign segments until the segment view is initialized. If set to 'false', the Coordinator will fall back to the `cost` balancing strategy only if the segment view is not initialized yet. Note that waiting for the initialization may take a long time, since the `cachingCost` balancing strategy involves much computing to build itself.|false|
|`druid.coordinator.loadqueuepeon.repeatDelay`|The start and repeat delay for the loadqueuepeon, which manages the load and drop of segments.|PT0.050S (50 ms)|
@ -902,7 +902,6 @@ Issuing a GET request at the same URL will return the spec that is currently in
|`balancerComputeThreads`|Thread pool size for computing moving cost of segments in segment balancing. Consider increasing this if you have a lot of segments and moving segments starts to get stuck.|1|
|`emitBalancingStats`|Boolean flag for whether or not we should emit balancing stats. This is an expensive operation.|false|
|`killDataSourceWhitelist`|List of specific data sources for which kill tasks are sent if property `druid.coordinator.kill.on` is true. This can be a list of comma-separated data source names or a JSON array.|none|
|`killAllDataSources`|Send kill tasks for ALL dataSources if property `druid.coordinator.kill.on` is true. If this is set to true then `killDataSourceWhitelist` must not be specified or be empty list.|false|
|`killPendingSegmentsSkipList`|List of data sources for which pendingSegments are _NOT_ cleaned up if property `druid.coordinator.kill.pendingSegments.on` is true. This can be a list of comma-separated data sources or a JSON array.|none|
|`maxSegmentsInNodeLoadingQueue`|The maximum number of segments that could be queued for loading to any given server. This parameter could be used to speed up the segment loading process, especially if there are "slow" nodes in the cluster (with low loading speed) or if too many segments are scheduled to be replicated to some particular node (faster loading could be preferred to better segment distribution). The desired value depends on segment loading speed, acceptable replication time, and number of nodes. A value of 1000 could be a starting point for a rather big cluster. Default value is 100. |100|
|`decommissioningNodes`| List of historical servers to 'decommission'. Coordinator will not assign new segments to 'decommissioning' servers, and segments will be moved away from them to be placed on non-decommissioning servers at the maximum rate specified by `decommissioningMaxPercentOfMaxSegmentsToMove`.|none|


@ -75,13 +75,13 @@ If you want to skip the details, check out the [example](#example) for configuri
Segment records and segments in deep storage become eligible for deletion when both of the following conditions hold:
- When they meet the eligibility requirement of kill task datasource configuration according to `killDataSourceWhitelist` and `killAllDataSources` set in the Coordinator dynamic configuration. See [Dynamic configuration](../configuration/index.md#dynamic-configuration).
- When they meet the eligibility requirement of kill task datasource configuration according to `killDataSourceWhitelist` set in the Coordinator dynamic configuration. See [Dynamic configuration](../configuration/index.md#dynamic-configuration).
- When the `durationToRetain` time has passed since their creation.
Kill tasks use the following configuration:
- `druid.coordinator.kill.on`: When `true`, enables the Coordinator to submit a kill task for unused segments, which deletes them completely from metadata store and from deep storage.
Only applies to the specified datasources in the dynamic configuration parameter `killDataSourceWhitelist`.
If `killDataSourceWhitelist` is not set or empty, `killAllDataSources` defaults to true so that kill tasks can be submitted for all datasources.
If `killDataSourceWhitelist` is not set or empty, then kill tasks can be submitted for all datasources.
- `druid.coordinator.kill.period`: Defines the frequency in [ISO 8601 format](https://en.wikipedia.org/wiki/ISO_8601#Durations) for the cleanup job to check for and delete eligible segments. Defaults to `P1D`. Must be greater than `druid.coordinator.period.indexingPeriod`.
- `druid.coordinator.kill.durationToRetain`: Defines the retention period in [ISO 8601 format](https://en.wikipedia.org/wiki/ISO_8601#Durations) after creation that segments become eligible for deletion.
- `druid.coordinator.kill.maxSegments`: Defines the maximum number of segments to delete per kill task.
@ -189,8 +189,8 @@ druid.coordinator.period.metadataStoreManagementPeriod=P1H
# Set a kill task to poll every day to delete Segment records and segments
# in deep storage > 4 days old. When druid.coordinator.kill.on is set to true,
# you must set either killAllDataSources or killDataSourceWhitelist in the dynamic
# configuration. For this example, assume killAllDataSources is set to true.
# you can set killDataSourceWhitelist in the dynamic configuration to limit
# the datasources that can be killed.
# Required also for automated cleanup of rules and compaction configuration.
druid.coordinator.kill.on=true


@ -40,4 +40,6 @@ druid_auth_basic_common_cacheDirectory=/tmp/authCache/coordinator
druid_auth_unsecuredPaths=["/druid/coordinator/v1/loadqueue"]
druid_server_https_crlPath=/tls/revocations.crl
druid_coordinator_period_indexingPeriod=PT180000S
# 2x indexing period so that kill period is valid
druid_coordinator_kill_period=PT360000S
druid_coordinator_period=PT1S
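
The comment above relies on the documented rule that `druid.coordinator.kill.period` must be greater than `druid.coordinator.period.indexingPeriod`. A minimal sketch of that check, together with the documented bounds on `durationToRetain` and `maxSegments`, might look like the following. It assumes `java.time.Duration` in place of Druid's Joda-Time types, and `KillConfigCheck` and `validate` are hypothetical names, not Druid APIs.

```java
import java.time.Duration;

// Hypothetical helper that mirrors the constraints stated in the docs table;
// it is not the validation code Druid itself runs.
final class KillConfigCheck
{
  static void validate(Duration killPeriod, Duration indexingPeriod, Duration durationToRetain, int maxSegments)
  {
    // kill.period must be strictly greater than period.indexingPeriod
    if (killPeriod.compareTo(indexingPeriod) <= 0) {
      throw new IllegalArgumentException("kill period must be greater than indexing period");
    }
    // kill.durationToRetain must be greater than or equal to 0
    if (durationToRetain.isNegative()) {
      throw new IllegalArgumentException("durationToRetain must be >= 0");
    }
    // kill.maxSegments must be greater than 0
    if (maxSegments <= 0) {
      throw new IllegalArgumentException("maxSegments must be > 0");
    }
  }

  public static void main(String[] args)
  {
    // Values from the integration-test environment above plus the new defaults.
    validate(Duration.parse("PT360000S"), Duration.parse("PT180000S"), Duration.parse("P90D"), 100);
    System.out.println("kill configuration is valid");
  }
}
```

With the old defaults (`PT-1S` and `0`) the same check would reject the configuration, which is why those values had to be overridden before kill could be turned on.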


@ -20,11 +20,11 @@
package org.apache.druid.server.coordinator;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.duty.KillUnusedSegments;
@ -62,11 +62,6 @@ public class CoordinatorDynamicConfig
private final int balancerComputeThreads;
private final boolean emitBalancingStats;
/**
* If true {@link KillUnusedSegments} sends kill tasks for unused segments in all data sources.
*/
private final boolean killUnusedSegmentsInAllDataSources;
/**
* List of specific data sources for which kill tasks are sent in {@link KillUnusedSegments}.
*/
@ -129,9 +124,6 @@ public class CoordinatorDynamicConfig
// Keeping the legacy 'killDataSourceWhitelist' property name for backward compatibility. When the project is
// updated to Jackson 2.9 it could be changed, see https://github.com/apache/druid/issues/7152
@JsonProperty("killDataSourceWhitelist") Object specificDataSourcesToKillUnusedSegmentsIn,
// Keeping the legacy 'killAllDataSources' property name for backward compatibility. When the project is
// updated to Jackson 2.9 it could be changed, see https://github.com/apache/druid/issues/7152
@JsonProperty("killAllDataSources") boolean killUnusedSegmentsInAllDataSources,
// Type is Object here so that we can support both string and list as Coordinator console can not send array of
// strings in the update request, as well as for specificDataSourcesToKillUnusedSegmentsIn.
// Keeping the legacy 'killPendingSegmentsSkipList' property name for backward compatibility. When the project is
@ -172,7 +164,6 @@ public class CoordinatorDynamicConfig
this.replicationThrottleLimit = replicationThrottleLimit;
this.balancerComputeThreads = Math.max(balancerComputeThreads, 1);
this.emitBalancingStats = emitBalancingStats;
this.killUnusedSegmentsInAllDataSources = killUnusedSegmentsInAllDataSources;
this.specificDataSourcesToKillUnusedSegmentsIn = parseJsonStringOrArray(specificDataSourcesToKillUnusedSegmentsIn);
this.dataSourcesToNotKillStalePendingSegmentsIn =
parseJsonStringOrArray(dataSourcesToNotKillStalePendingSegmentsIn);
@ -186,11 +177,6 @@ public class CoordinatorDynamicConfig
);
this.decommissioningMaxPercentOfMaxSegmentsToMove = decommissioningMaxPercentOfMaxSegmentsToMove;
if (this.killUnusedSegmentsInAllDataSources && !this.specificDataSourcesToKillUnusedSegmentsIn.isEmpty()) {
throw new IAE(
"can't have killUnusedSegmentsInAllDataSources and non-empty specificDataSourcesToKillUnusedSegmentsIn"
);
}
this.pauseCoordination = pauseCoordination;
this.replicateAfterLoadTimeout = replicateAfterLoadTimeout;
@ -312,10 +298,10 @@ public class CoordinatorDynamicConfig
return specificDataSourcesToKillUnusedSegmentsIn;
}
@JsonProperty("killAllDataSources")
@JsonIgnore
public boolean isKillUnusedSegmentsInAllDataSources()
{
return killUnusedSegmentsInAllDataSources;
return specificDataSourcesToKillUnusedSegmentsIn.isEmpty();
}
@JsonProperty("killPendingSegmentsSkipList")
@ -398,7 +384,6 @@ public class CoordinatorDynamicConfig
", replicationThrottleLimit=" + replicationThrottleLimit +
", balancerComputeThreads=" + balancerComputeThreads +
", emitBalancingStats=" + emitBalancingStats +
", killUnusedSegmentsInAllDataSources=" + killUnusedSegmentsInAllDataSources +
", specificDataSourcesToKillUnusedSegmentsIn=" + specificDataSourcesToKillUnusedSegmentsIn +
", dataSourcesToNotKillStalePendingSegmentsIn=" + dataSourcesToNotKillStalePendingSegmentsIn +
", maxSegmentsInNodeLoadingQueue=" + maxSegmentsInNodeLoadingQueue +
@ -453,9 +438,6 @@ public class CoordinatorDynamicConfig
if (emitBalancingStats != that.emitBalancingStats) {
return false;
}
if (killUnusedSegmentsInAllDataSources != that.killUnusedSegmentsInAllDataSources) {
return false;
}
if (maxSegmentsInNodeLoadingQueue != that.maxSegmentsInNodeLoadingQueue) {
return false;
}
@ -494,7 +476,6 @@ public class CoordinatorDynamicConfig
replicationThrottleLimit,
balancerComputeThreads,
emitBalancingStats,
killUnusedSegmentsInAllDataSources,
maxSegmentsInNodeLoadingQueue,
specificDataSourcesToKillUnusedSegmentsIn,
dataSourcesToNotKillStalePendingSegmentsIn,
@ -523,7 +504,6 @@ public class CoordinatorDynamicConfig
private static final int DEFAULT_BALANCER_COMPUTE_THREADS = 1;
private static final boolean DEFAULT_EMIT_BALANCING_STATS = false;
private static final boolean DEFAULT_USE_BATCHED_SEGMENT_SAMPLER = false;
private static final boolean DEFAULT_KILL_UNUSED_SEGMENTS_IN_ALL_DATA_SOURCES = false;
private static final int DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE = 100;
private static final int DEFAULT_DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT = 70;
private static final boolean DEFAULT_PAUSE_COORDINATION = false;
@ -541,7 +521,6 @@ public class CoordinatorDynamicConfig
private Boolean emitBalancingStats;
private Integer balancerComputeThreads;
private Object specificDataSourcesToKillUnusedSegmentsIn;
private Boolean killUnusedSegmentsInAllDataSources;
private Object dataSourcesToNotKillStalePendingSegmentsIn;
private Integer maxSegmentsInNodeLoadingQueue;
private Object decommissioningNodes;
@ -568,7 +547,6 @@ public class CoordinatorDynamicConfig
@JsonProperty("balancerComputeThreads") @Nullable Integer balancerComputeThreads,
@JsonProperty("emitBalancingStats") @Nullable Boolean emitBalancingStats,
@JsonProperty("killDataSourceWhitelist") @Nullable Object specificDataSourcesToKillUnusedSegmentsIn,
@JsonProperty("killAllDataSources") @Nullable Boolean killUnusedSegmentsInAllDataSources,
@JsonProperty("killPendingSegmentsSkipList") @Nullable Object dataSourcesToNotKillStalePendingSegmentsIn,
@JsonProperty("maxSegmentsInNodeLoadingQueue") @Nullable Integer maxSegmentsInNodeLoadingQueue,
@JsonProperty("decommissioningNodes") @Nullable Object decommissioningNodes,
@ -591,7 +569,6 @@ public class CoordinatorDynamicConfig
this.balancerComputeThreads = balancerComputeThreads;
this.emitBalancingStats = emitBalancingStats;
this.specificDataSourcesToKillUnusedSegmentsIn = specificDataSourcesToKillUnusedSegmentsIn;
this.killUnusedSegmentsInAllDataSources = killUnusedSegmentsInAllDataSources;
this.dataSourcesToNotKillStalePendingSegmentsIn = dataSourcesToNotKillStalePendingSegmentsIn;
this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue;
this.decommissioningNodes = decommissioningNodes;
@ -668,12 +645,6 @@ public class CoordinatorDynamicConfig
return this;
}
public Builder withKillUnusedSegmentsInAllDataSources(boolean killUnusedSegmentsInAllDataSources)
{
this.killUnusedSegmentsInAllDataSources = killUnusedSegmentsInAllDataSources;
return this;
}
public Builder withMaxSegmentsInNodeLoadingQueue(int maxSegmentsInNodeLoadingQueue)
{
this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue;
@ -727,9 +698,6 @@ public class CoordinatorDynamicConfig
balancerComputeThreads == null ? DEFAULT_BALANCER_COMPUTE_THREADS : balancerComputeThreads,
emitBalancingStats == null ? DEFAULT_EMIT_BALANCING_STATS : emitBalancingStats,
specificDataSourcesToKillUnusedSegmentsIn,
killUnusedSegmentsInAllDataSources == null
? DEFAULT_KILL_UNUSED_SEGMENTS_IN_ALL_DATA_SOURCES
: killUnusedSegmentsInAllDataSources,
dataSourcesToNotKillStalePendingSegmentsIn,
maxSegmentsInNodeLoadingQueue == null
? DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE
@ -765,9 +733,6 @@ public class CoordinatorDynamicConfig
specificDataSourcesToKillUnusedSegmentsIn == null
? defaults.getSpecificDataSourcesToKillUnusedSegmentsIn()
: specificDataSourcesToKillUnusedSegmentsIn,
killUnusedSegmentsInAllDataSources == null
? defaults.isKillUnusedSegmentsInAllDataSources()
: killUnusedSegmentsInAllDataSources,
dataSourcesToNotKillStalePendingSegmentsIn == null
? defaults.getDataSourcesToNotKillStalePendingSegmentsIn()
: dataSourcesToNotKillStalePendingSegmentsIn,


@ -48,11 +48,11 @@ public abstract class DruidCoordinatorConfig
public abstract Duration getCoordinatorKillPeriod();
@Config("druid.coordinator.kill.durationToRetain")
@Default("PT-1s")
@Default("P90D")
public abstract Duration getCoordinatorKillDurationToRetain();
@Config("druid.coordinator.kill.maxSegments")
@Default("0")
@Default("100")
public abstract int getCoordinatorKillMaxSegments();
@Config("druid.coordinator.kill.supervisor.period")
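
For reference, the new `P90D` default corresponds to the `7776000000L` milliseconds asserted in `DruidCoordinatorConfigTest`. The sketch below reproduces that arithmetic, assuming `java.time.Duration` as a stand-in for the Joda-Time `Duration` that Druid uses; `KillDefaults` is a hypothetical name.

```java
import java.time.Duration;

// Hypothetical helper for checking the ISO 8601 defaults by hand.
final class KillDefaults
{
  // Parses an ISO 8601 duration and returns its length in milliseconds.
  // java.time treats a day as exactly 24 hours.
  static long toMillis(String isoDuration)
  {
    return Duration.parse(isoDuration).toMillis();
  }

  public static void main(String[] args)
  {
    System.out.println(toMillis("P90D")); // druid.coordinator.kill.durationToRetain default
    System.out.println(toMillis("P1D"));  // druid.coordinator.kill.period default
  }
}
```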


@ -29,6 +29,7 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.SegmentsMetadataManager;
import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.Interval;
import javax.annotation.Nullable;
@ -86,19 +87,10 @@ public class KillUnusedSegments implements CoordinatorDuty
@Override
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
{
boolean killAllDataSources = params.getCoordinatorDynamicConfig().isKillUnusedSegmentsInAllDataSources();
Collection<String> specificDataSourcesToKill =
Collection<String> dataSourcesToKill =
params.getCoordinatorDynamicConfig().getSpecificDataSourcesToKillUnusedSegmentsIn();
if (killAllDataSources && specificDataSourcesToKill != null && !specificDataSourcesToKill.isEmpty()) {
log.error(
"killAllDataSources can't be true when specificDataSourcesToKill is non-empty. No kill tasks are scheduled."
);
return params;
}
Collection<String> dataSourcesToKill = specificDataSourcesToKill;
if (killAllDataSources) {
if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
dataSourcesToKill = segmentsMetadataManager.retrieveAllDataSourceNames();
}
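
The selection logic in this hunk can be sketched in isolation: an unset or empty `killDataSourceWhitelist` now means "all datasources", and a non-empty one limits the kill tasks to the listed names. `KillDataSourceSelection` and `resolve` below are hypothetical names used only for illustration, with a plain `Set` standing in for `segmentsMetadataManager.retrieveAllDataSourceNames()`.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical stand-in for the selection step in KillUnusedSegments.run();
// not the actual Druid class.
final class KillDataSourceSelection
{
  // Returns the whitelist when it names at least one datasource,
  // otherwise falls back to every known datasource.
  static Collection<String> resolve(Collection<String> whitelist, Set<String> allDataSources)
  {
    if (whitelist == null || whitelist.isEmpty()) {
      return allDataSources;
    }
    return whitelist;
  }

  public static void main(String[] args)
  {
    Set<String> all = new LinkedHashSet<>(Arrays.asList("DS1", "DS2", "DS3"));
    System.out.println(resolve(null, all));
    System.out.println(resolve(Collections.emptySet(), all));
    System.out.println(resolve(Collections.singleton("DS1"), all));
  }
}
```

Deriving "kill all" from the empty whitelist removes the conflicting state that the deleted `IAE` check used to guard against.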


@ -43,8 +43,8 @@ public class DruidCoordinatorConfigTest
Assert.assertEquals(new Duration("PT60s"), config.getCoordinatorPeriod());
Assert.assertEquals(new Duration("PT1800s"), config.getCoordinatorIndexingPeriod());
Assert.assertEquals(86400000, config.getCoordinatorKillPeriod().getMillis());
Assert.assertEquals(-1000, config.getCoordinatorKillDurationToRetain().getMillis());
Assert.assertEquals(0, config.getCoordinatorKillMaxSegments());
Assert.assertEquals(7776000000L, config.getCoordinatorKillDurationToRetain().getMillis());
Assert.assertEquals(100, config.getCoordinatorKillMaxSegments());
Assert.assertEquals(new Duration(15 * 60 * 1000), config.getLoadTimeoutDelay());
Assert.assertEquals(Duration.millis(50), config.getLoadQueuePeonRepeatDelay());
Assert.assertTrue(config.getCompactionSkipLockedIntervals());


@ -1487,7 +1487,6 @@ public class RunRulesTest
.withBalancerComputeThreads(0)
.withEmitBalancingStats(false)
.withSpecificDataSourcesToKillUnusedSegmentsIn(null)
.withKillUnusedSegmentsInAllDataSources(false)
.withMaxSegmentsInNodeLoadingQueue(1000)
.withPauseCoordination(false)
.build();


@ -20,112 +20,187 @@
package org.apache.druid.server.coordinator.duty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.metadata.SegmentsMetadataManager;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
/**
*/
@RunWith(Enclosed.class)
public class KillUnusedSegmentsTest
{
@Test
public void testFindIntervalForKill()
/**
* Standing up new tests with mocks was easier than trying to move the existing tests to use mocks for consistency.
* In the future, if all tests are moved to use the same structure, this inner static class can be gotten rid of.
*/
@RunWith(MockitoJUnitRunner.class)
public static class MockedTest
{
testFindIntervalForKill(null, null);
testFindIntervalForKill(ImmutableList.of(), null);
private static final Set<String> ALL_DATASOURCES = ImmutableSet.of("DS1", "DS2", "DS3");
private static final int MAX_SEGMENTS_TO_KILL = 10;
private static final Duration COORDINATOR_KILL_PERIOD = Duration.standardMinutes(2);
private static final Duration DURATION_TO_RETAIN = Duration.standardDays(1);
private static final Duration INDEXING_PERIOD = Duration.standardMinutes(1);
testFindIntervalForKill(ImmutableList.of(Intervals.of("2014/2015")), Intervals.of("2014/2015"));
@Mock
private SegmentsMetadataManager segmentsMetadataManager;
@Mock
private IndexingServiceClient indexingServiceClient;
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private DruidCoordinatorConfig config;
testFindIntervalForKill(
ImmutableList.of(Intervals.of("2014/2015"), Intervals.of("2016/2017")),
Intervals.of("2014/2017")
);
@Mock
private DruidCoordinatorRuntimeParams params;
@Mock
private CoordinatorDynamicConfig coordinatorDynamicConfig;
private KillUnusedSegments target;
testFindIntervalForKill(
ImmutableList.of(Intervals.of("2014/2015"), Intervals.of("2015/2016")),
Intervals.of("2014/2016")
);
@Before
public void setup()
{
Mockito.doReturn(coordinatorDynamicConfig).when(params).getCoordinatorDynamicConfig();
Mockito.doReturn(ALL_DATASOURCES).when(segmentsMetadataManager).retrieveAllDataSourceNames();
Mockito.doReturn(COORDINATOR_KILL_PERIOD).when(config).getCoordinatorKillPeriod();
Mockito.doReturn(DURATION_TO_RETAIN).when(config).getCoordinatorKillDurationToRetain();
Mockito.doReturn(INDEXING_PERIOD).when(config).getCoordinatorIndexingPeriod();
Mockito.doReturn(MAX_SEGMENTS_TO_KILL).when(config).getCoordinatorKillMaxSegments();
target = new KillUnusedSegments(segmentsMetadataManager, indexingServiceClient, config);
}
@Test
public void testRunWihNoIntervalShouldNotKillAnySegments()
{
target.run(params);
Mockito.verify(indexingServiceClient, Mockito.never())
.killUnusedSegments(anyString(), anyString(), any(Interval.class));
}
testFindIntervalForKill(
ImmutableList.of(Intervals.of("2015/2016"), Intervals.of("2014/2015")),
Intervals.of("2014/2016")
);
testFindIntervalForKill(
ImmutableList.of(Intervals.of("2015/2017"), Intervals.of("2014/2016")),
Intervals.of("2014/2017")
);
testFindIntervalForKill(
ImmutableList.of(
Intervals.of("2015/2019"),
Intervals.of("2014/2016"),
Intervals.of("2018/2020")
),
Intervals.of("2014/2020")
);
testFindIntervalForKill(
ImmutableList.of(
Intervals.of("2015/2019"),
Intervals.of("2014/2016"),
Intervals.of("2018/2020"),
Intervals.of("2021/2022")
),
Intervals.of("2014/2022")
);
@Test
public void testRunWihSpecificDatasourceAndNoIntervalShouldNotKillAnySegments()
{
Mockito.when(coordinatorDynamicConfig.getSpecificDataSourcesToKillUnusedSegmentsIn()).thenReturn(Collections.singleton("DS1"));
target.run(params);
Mockito.verify(indexingServiceClient, Mockito.never())
.killUnusedSegments(anyString(), anyString(), any(Interval.class));
}
}
private void testFindIntervalForKill(List<Interval> segmentIntervals, Interval expected)
public static class FindIntervalsTest
{
SegmentsMetadataManager segmentsMetadataManager = EasyMock.createMock(SegmentsMetadataManager.class);
EasyMock.expect(
segmentsMetadataManager.getUnusedSegmentIntervals(
EasyMock.anyString(),
EasyMock.anyObject(DateTime.class),
EasyMock.anyInt()
)
).andReturn(segmentIntervals);
EasyMock.replay(segmentsMetadataManager);
IndexingServiceClient indexingServiceClient = EasyMock.createMock(IndexingServiceClient.class);
@Test
public void testFindIntervalForKill()
{
testFindIntervalForKill(null, null);
testFindIntervalForKill(ImmutableList.of(), null);
KillUnusedSegments unusedSegmentsKiller = new KillUnusedSegments(
segmentsMetadataManager,
indexingServiceClient,
new TestDruidCoordinatorConfig(
null,
null,
Duration.parse("PT76400S"),
null,
new Duration(1),
Duration.parse("PT86400S"),
Duration.parse("PT86400S"),
null,
null,
null,
null,
null,
null,
null,
null,
null,
1000,
Duration.ZERO
)
);
testFindIntervalForKill(ImmutableList.of(Intervals.of("2014/2015")), Intervals.of("2014/2015"));
Assert.assertEquals(
expected,
unusedSegmentsKiller.findIntervalForKill("test", 10000)
);
testFindIntervalForKill(
ImmutableList.of(Intervals.of("2014/2015"), Intervals.of("2016/2017")),
Intervals.of("2014/2017")
);
testFindIntervalForKill(
ImmutableList.of(Intervals.of("2014/2015"), Intervals.of("2015/2016")),
Intervals.of("2014/2016")
);
testFindIntervalForKill(
ImmutableList.of(Intervals.of("2015/2016"), Intervals.of("2014/2015")),
Intervals.of("2014/2016")
);
testFindIntervalForKill(
ImmutableList.of(Intervals.of("2015/2017"), Intervals.of("2014/2016")),
Intervals.of("2014/2017")
);
testFindIntervalForKill(
ImmutableList.of(
Intervals.of("2015/2019"),
Intervals.of("2014/2016"),
Intervals.of("2018/2020")
),
Intervals.of("2014/2020")
);
testFindIntervalForKill(
ImmutableList.of(
Intervals.of("2015/2019"),
Intervals.of("2014/2016"),
Intervals.of("2018/2020"),
Intervals.of("2021/2022")
),
Intervals.of("2014/2022")
);
}
private void testFindIntervalForKill(List<Interval> segmentIntervals, Interval expected)
{
SegmentsMetadataManager segmentsMetadataManager = EasyMock.createMock(SegmentsMetadataManager.class);
EasyMock.expect(
segmentsMetadataManager.getUnusedSegmentIntervals(
EasyMock.anyString(),
EasyMock.anyObject(DateTime.class),
EasyMock.anyInt()
)
).andReturn(segmentIntervals);
EasyMock.replay(segmentsMetadataManager);
IndexingServiceClient indexingServiceClient = EasyMock.createMock(IndexingServiceClient.class);
KillUnusedSegments unusedSegmentsKiller = new KillUnusedSegments(
segmentsMetadataManager,
indexingServiceClient,
new TestDruidCoordinatorConfig(
null,
null,
Duration.parse("PT76400S"),
null,
new Duration(1),
Duration.parse("PT86400S"),
Duration.parse("PT86400S"),
null,
null,
null,
null,
null,
null,
null,
null,
null,
1000,
Duration.ZERO
)
);
Assert.assertEquals(
expected,
unusedSegmentsKiller.findIntervalForKill("test", 10000)
);
}
}
}


@ -22,7 +22,6 @@ package org.apache.druid.server.http;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.junit.Assert;
@ -224,7 +223,56 @@ public class CoordinatorDynamicConfigTest
true,
10
);
}
@Test
public void testConstructorWithNullsShouldKillUnusedSegmentsInAllDataSources()
{
CoordinatorDynamicConfig config = new CoordinatorDynamicConfig(1,
1,
1,
1,
null,
false,
1,
2,
10,
true,
null,
null,
null,
ImmutableSet.of("host1"),
5,
true,
true,
10);
Assert.assertTrue(config.isKillUnusedSegmentsInAllDataSources());
Assert.assertTrue(config.getSpecificDataSourcesToKillUnusedSegmentsIn().isEmpty());
}
@Test
public void testConstructorWithSpecificDataSourcesToKillShouldNotKillUnusedSegmentsInAllDatasources()
{
CoordinatorDynamicConfig config = new CoordinatorDynamicConfig(1,
1,
1,
1,
null,
false,
1,
2,
10,
true,
ImmutableSet.of("test1"),
null,
null,
ImmutableSet.of("host1"),
5,
true,
true,
10);
Assert.assertFalse(config.isKillUnusedSegmentsInAllDataSources());
Assert.assertEquals(ImmutableSet.of("test1"), config.getSpecificDataSourcesToKillUnusedSegmentsIn());
}
@Test
@ -534,23 +582,21 @@ public class CoordinatorDynamicConfigTest
Integer.MAX_VALUE
);
//ensure whitelist is empty when killAllDataSources is true
try {
jsonStr = "{\n"
+ " \"killDataSourceWhitelist\": [\"test1\",\"test2\"],\n"
+ " \"killAllDataSources\": true,\n"
+ " \"percentOfSegmentsToConsiderPerMove\": 1\n"
+ "}\n";
mapper.readValue(
jsonStr,
CoordinatorDynamicConfig.class
);
// killAllDataSources is a config in versions 0.22.x and older and is no longer used.
// This used to be an invalid config, but as of 0.23.0 the killAllDataSources flag no longer exists,
// so this is a valid config.
jsonStr = "{\n"
+ " \"killDataSourceWhitelist\": [\"test1\",\"test2\"],\n"
+ " \"killAllDataSources\": true,\n"
+ " \"percentOfSegmentsToConsiderPerMove\": 1\n"
+ "}\n";
actual = mapper.readValue(
jsonStr,
CoordinatorDynamicConfig.class
);
Assert.fail("deserialization should fail.");
}
catch (JsonMappingException e) {
Assert.assertTrue(e.getCause() instanceof IAE);
}
Assert.assertFalse(actual.isKillUnusedSegmentsInAllDataSources());
Assert.assertEquals(2, actual.getSpecificDataSourcesToKillUnusedSegmentsIn().size());
}
@Test
@ -618,7 +664,7 @@ public class CoordinatorDynamicConfigTest
1,
false,
emptyList,
false,
true,
EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE,
emptyList,
70,
@ -628,6 +674,36 @@ public class CoordinatorDynamicConfigTest
);
}
@Test
public void testBuilderWithDefaultSpecificDataSourcesToKillUnusedSegmentsInSpecified()
{
CoordinatorDynamicConfig defaultConfig =
CoordinatorDynamicConfig.builder()
.withSpecificDataSourcesToKillUnusedSegmentsIn(ImmutableSet.of("DATASOURCE"))
.build();
CoordinatorDynamicConfig config = CoordinatorDynamicConfig.builder().build(defaultConfig);
assertConfig(
config,
900000,
524288000,
100,
5,
100,
15,
10,
1,
false,
ImmutableSet.of("DATASOURCE"),
false,
EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE,
ImmutableSet.of(),
70,
false,
false,
Integer.MAX_VALUE
);
}
@Test
public void testUpdate()
{
@ -656,7 +732,6 @@ public class CoordinatorDynamicConfigTest
null,
null,
null,
null,
null
).build(current)
);


@ -264,6 +264,7 @@ public class CliCoordinator extends ServerRunnable
}
conditionalIndexingServiceDutyMultibind.addConditionBinding(
"druid.coordinator.kill.on",
"true",
Predicates.equalTo("true"),
KillUnusedSegments.class
);
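
The added `"true"` argument appears to act as the default value for `druid.coordinator.kill.on`, so the `KillUnusedSegments` duty is bound unless the property is explicitly set to something else. A minimal sketch of that behavior, assuming plain `java.util.Properties` rather than Druid's Guice-based binding; `KillDutyToggle` and `isKillDutyEnabled` are hypothetical names.

```java
import java.util.Properties;
import java.util.function.Predicate;

// Hypothetical illustration of a property check with a default value;
// it does not use or reproduce Druid's conditional multibinding classes.
final class KillDutyToggle
{
  static final String KEY = "druid.coordinator.kill.on";

  static boolean isKillDutyEnabled(Properties props)
  {
    // Same condition as in the binding: the value must equal "true".
    Predicate<String> condition = "true"::equals;
    // When the property is absent, fall back to the new default "true".
    return condition.test(props.getProperty(KEY, "true"));
  }

  public static void main(String[] args)
  {
    Properties unset = new Properties();
    Properties disabled = new Properties();
    disabled.setProperty(KEY, "false");
    System.out.println(isKillDutyEnabled(unset));
    System.out.println(isKillDutyEnabled(disabled));
  }
}
```

Operators who do not want automatic hard deletes after upgrading would therefore need to set the property to `false` explicitly.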