mirror of https://github.com/apache/druid.git
optionally enable coordinator auto kill tasks on all dataSources via dynamic config (#3250)
This commit is contained in:
parent
7995818220
commit
3f82108d15
|
@ -28,7 +28,7 @@ The coordinator node uses several of the global configs in [Configuration](../co
|
|||
|`druid.coordinator.merge.on`|Boolean flag for whether or not the coordinator should try and merge small segments into a more optimal segment size.|false|
|
||||
|`druid.coordinator.conversion.on`|Boolean flag for converting old segment indexing versions to the latest segment indexing version.|false|
|
||||
|`druid.coordinator.load.timeout`|The timeout duration for when the coordinator assigns a segment to a historical node.|PT15M|
|
||||
|`druid.coordinator.kill.on`|Boolean flag for whether or not the coordinator should submit kill task for unused segments, that is, hard delete them from metadata store and deep storage. If set to true, then for all the whitelisted dataSources, coordinator will submit tasks periodically based on `period` specified. These kill tasks will delete all segments except for the last `durationToRetain` period. Whitelist can be set via dynamic configuration `killDataSourceWhitelist` described later.|false|
|
||||
|`druid.coordinator.kill.on`|Boolean flag for whether or not the coordinator should submit kill task for unused segments, that is, hard delete them from metadata store and deep storage. If set to true, then for all whitelisted dataSources (or optionally all), coordinator will submit tasks periodically based on `period` specified. These kill tasks will delete all segments except for the last `durationToRetain` period. Whitelist or All can be set via dynamic configuration `killAllDataSources` and `killDataSourceWhitelist` described later.|false|
|
||||
|`druid.coordinator.kill.period`|How often to send kill tasks to the indexing service. Value must be greater than `druid.coordinator.period.indexingPeriod`. Only applies if kill is turned on.|P1D (1 Day)|
|
||||
|`druid.coordinator.kill.durationToRetain`| Do not kill segments in last `durationToRetain`, must be greater or equal to 0. Only applies and MUST be specified if kill is turned on. Note that default value is invalid.|PT-1S (-1 seconds)|
|
||||
|`druid.coordinator.kill.maxSegments`|Kill at most n segments per kill task submission, must be greater than 0. Only applies and MUST be specified if kill is turned on. Note that default value is invalid.|0|
|
||||
|
@ -88,6 +88,7 @@ Issuing a GET request at the same URL will return the spec that is currently in
|
|||
|`replicationThrottleLimit`|The maximum number of segments that can be replicated at one time.|10|
|
||||
|`emitBalancingStats`|Boolean flag for whether or not we should emit balancing stats. This is an expensive operation.|false|
|
||||
|`killDataSourceWhitelist`|List of dataSources for which kill tasks are sent if property `druid.coordinator.kill.on` is true.|none|
|
||||
|`killAllDataSources`|Send kill tasks for ALL dataSources if property `druid.coordinator.kill.on` is true. If this is set to true then `killDataSourceWhitelist` must not be specified or be empty list.|false|
|
||||
|
||||
To view the audit history of coordinator dynamic config issue a GET request to the URL -
|
||||
|
||||
|
|
|
@ -21,6 +21,7 @@ package io.druid.server.coordinator;
|
|||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.metamx.common.IAE;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
|
@ -38,6 +39,7 @@ public class CoordinatorDynamicConfig
|
|||
private final int replicationThrottleLimit;
|
||||
private final int balancerComputeThreads;
|
||||
private final boolean emitBalancingStats;
|
||||
private final boolean killAllDataSources;
|
||||
private final Set<String> killDataSourceWhitelist;
|
||||
|
||||
@JsonCreator
|
||||
|
@ -54,7 +56,8 @@ public class CoordinatorDynamicConfig
|
|||
// Type is Object here so that we can support both string and list as
|
||||
// coordinator console can not send array of strings in the update request.
|
||||
// See https://github.com/druid-io/druid/issues/3055
|
||||
@JsonProperty("killDataSourceWhitelist") Object killDataSourceWhitelist
|
||||
@JsonProperty("killDataSourceWhitelist") Object killDataSourceWhitelist,
|
||||
@JsonProperty("killAllDataSources") boolean killAllDataSources
|
||||
)
|
||||
{
|
||||
this.maxSegmentsToMove = maxSegmentsToMove;
|
||||
|
@ -66,6 +69,8 @@ public class CoordinatorDynamicConfig
|
|||
this.emitBalancingStats = emitBalancingStats;
|
||||
this.balancerComputeThreads = Math.max(balancerComputeThreads, 1);
|
||||
|
||||
this.killAllDataSources = killAllDataSources;
|
||||
|
||||
if (killDataSourceWhitelist instanceof String) {
|
||||
String[] tmp = ((String) killDataSourceWhitelist).split(",");
|
||||
this.killDataSourceWhitelist = new HashSet<>();
|
||||
|
@ -80,6 +85,10 @@ public class CoordinatorDynamicConfig
|
|||
} else {
|
||||
this.killDataSourceWhitelist = ImmutableSet.of();
|
||||
}
|
||||
|
||||
if (this.killAllDataSources && !this.killDataSourceWhitelist.isEmpty()) {
|
||||
throw new IAE("can't have killAllDataSources and non-empty killDataSourceWhitelist");
|
||||
}
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
|
@ -136,6 +145,12 @@ public class CoordinatorDynamicConfig
|
|||
return killDataSourceWhitelist;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public boolean isKillAllDataSources()
|
||||
{
|
||||
return killAllDataSources;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
|
@ -149,6 +164,7 @@ public class CoordinatorDynamicConfig
|
|||
", balancerComputeThreads=" + balancerComputeThreads +
|
||||
", emitBalancingStats=" + emitBalancingStats +
|
||||
", killDataSourceWhitelist=" + killDataSourceWhitelist +
|
||||
", killAllDataSources=" + killAllDataSources +
|
||||
'}';
|
||||
}
|
||||
|
||||
|
@ -188,6 +204,9 @@ public class CoordinatorDynamicConfig
|
|||
if (emitBalancingStats != that.emitBalancingStats) {
|
||||
return false;
|
||||
}
|
||||
if (killAllDataSources != that.killAllDataSources) {
|
||||
return false;
|
||||
}
|
||||
return !(killDataSourceWhitelist != null
|
||||
? !killDataSourceWhitelist.equals(that.killDataSourceWhitelist)
|
||||
: that.killDataSourceWhitelist != null);
|
||||
|
@ -205,6 +224,7 @@ public class CoordinatorDynamicConfig
|
|||
result = 31 * result + replicationThrottleLimit;
|
||||
result = 31 * result + balancerComputeThreads;
|
||||
result = 31 * result + (emitBalancingStats ? 1 : 0);
|
||||
result = 31 * result + (killAllDataSources ? 1 : 0);
|
||||
result = 31 * result + (killDataSourceWhitelist != null ? killDataSourceWhitelist.hashCode() : 0);
|
||||
return result;
|
||||
}
|
||||
|
@ -220,10 +240,11 @@ public class CoordinatorDynamicConfig
|
|||
private boolean emitBalancingStats;
|
||||
private int balancerComputeThreads;
|
||||
private Set<String> killDataSourceWhitelist;
|
||||
private boolean killAllDataSources;
|
||||
|
||||
public Builder()
|
||||
{
|
||||
this(15 * 60 * 1000L, 524288000L, 100, 5, 15, 10, 1, false, null);
|
||||
this(15 * 60 * 1000L, 524288000L, 100, 5, 15, 10, 1, false, null, false);
|
||||
}
|
||||
|
||||
private Builder(
|
||||
|
@ -235,7 +256,8 @@ public class CoordinatorDynamicConfig
|
|||
int replicationThrottleLimit,
|
||||
int balancerComputeThreads,
|
||||
boolean emitBalancingStats,
|
||||
Set<String> killDataSourceWhitelist
|
||||
Set<String> killDataSourceWhitelist,
|
||||
boolean killAllDataSources
|
||||
)
|
||||
{
|
||||
this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting;
|
||||
|
@ -247,6 +269,7 @@ public class CoordinatorDynamicConfig
|
|||
this.emitBalancingStats = emitBalancingStats;
|
||||
this.balancerComputeThreads = balancerComputeThreads;
|
||||
this.killDataSourceWhitelist = killDataSourceWhitelist;
|
||||
this.killAllDataSources = killAllDataSources;
|
||||
}
|
||||
|
||||
public Builder withMillisToWaitBeforeDeleting(long millisToWaitBeforeDeleting)
|
||||
|
@ -308,7 +331,8 @@ public class CoordinatorDynamicConfig
|
|||
replicationThrottleLimit,
|
||||
balancerComputeThreads,
|
||||
emitBalancingStats,
|
||||
killDataSourceWhitelist
|
||||
killDataSourceWhitelist,
|
||||
killAllDataSources
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,8 +30,8 @@ import io.druid.server.coordinator.DruidCoordinatorConfig;
|
|||
import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -81,7 +81,17 @@ public class DruidCoordinatorSegmentKiller implements DruidCoordinatorHelper
|
|||
@Override
|
||||
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
|
||||
{
|
||||
Set<String> whitelist = params.getCoordinatorDynamicConfig().getKillDataSourceWhitelist();
|
||||
boolean killAllDataSources = params.getCoordinatorDynamicConfig().isKillAllDataSources();
|
||||
Collection<String> whitelist = params.getCoordinatorDynamicConfig().getKillDataSourceWhitelist();
|
||||
|
||||
if (killAllDataSources && whitelist != null && !whitelist.isEmpty()) {
|
||||
log.error("killAllDataSources can't be true when killDataSourceWhitelist is non-empty, No kill tasks are scheduled.");
|
||||
return params;
|
||||
}
|
||||
|
||||
if (killAllDataSources) {
|
||||
whitelist = segmentManager.getAllDatasourceNames();
|
||||
}
|
||||
|
||||
if (whitelist != null && whitelist.size() > 0 && (lastKillTime + period) < System.currentTimeMillis()) {
|
||||
lastKillTime = System.currentTimeMillis();
|
||||
|
|
|
@ -508,7 +508,7 @@ public class DruidCoordinatorRuleRunnerTest
|
|||
|
||||
EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(
|
||||
new CoordinatorDynamicConfig(
|
||||
0, 0, 0, 0, 1, 24, 0, false, null
|
||||
0, 0, 0, 0, 1, 24, 0, false, null, false
|
||||
)
|
||||
).anyTimes();
|
||||
coordinator.removeSegment(EasyMock.<DataSegment>anyObject());
|
||||
|
@ -1031,7 +1031,7 @@ public class DruidCoordinatorRuleRunnerTest
|
|||
{
|
||||
EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(
|
||||
new CoordinatorDynamicConfig(
|
||||
0, 0, 0, 0, 1, 7, 0, false, null
|
||||
0, 0, 0, 0, 1, 7, 0, false, null, false
|
||||
)
|
||||
).atLeastOnce();
|
||||
coordinator.removeSegment(EasyMock.<DataSegment>anyObject());
|
||||
|
@ -1212,7 +1212,7 @@ public class DruidCoordinatorRuleRunnerTest
|
|||
{
|
||||
EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(
|
||||
new CoordinatorDynamicConfig(
|
||||
0, 0, 0, 0, 1, 24, 0, false, null
|
||||
0, 0, 0, 0, 1, 24, 0, false, null, false
|
||||
)
|
||||
).anyTimes();
|
||||
coordinator.removeSegment(EasyMock.<DataSegment>anyObject());
|
||||
|
|
|
@ -19,8 +19,10 @@
|
|||
|
||||
package io.druid.server.http;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonMappingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.metamx.common.IAE;
|
||||
import io.druid.segment.TestHelper;
|
||||
import io.druid.server.coordinator.CoordinatorDynamicConfig;
|
||||
import org.junit.Assert;
|
||||
|
@ -57,7 +59,7 @@ public class CoordinatorDynamicConfigTest
|
|||
);
|
||||
|
||||
Assert.assertEquals(
|
||||
new CoordinatorDynamicConfig(1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of("test1", "test2")),
|
||||
new CoordinatorDynamicConfig(1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of("test1", "test2"), false),
|
||||
actual
|
||||
);
|
||||
}
|
||||
|
@ -89,16 +91,66 @@ public class CoordinatorDynamicConfigTest
|
|||
);
|
||||
|
||||
Assert.assertEquals(
|
||||
new CoordinatorDynamicConfig(1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of("test1", "test2")),
|
||||
new CoordinatorDynamicConfig(1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of("test1", "test2"), false),
|
||||
actual
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSerdeWithKillAllDataSources() throws Exception
|
||||
{
|
||||
String jsonStr = "{\n"
|
||||
+ " \"millisToWaitBeforeDeleting\": 1,\n"
|
||||
+ " \"mergeBytesLimit\": 1,\n"
|
||||
+ " \"mergeSegmentsLimit\" : 1,\n"
|
||||
+ " \"maxSegmentsToMove\": 1,\n"
|
||||
+ " \"replicantLifetime\": 1,\n"
|
||||
+ " \"replicationThrottleLimit\": 1,\n"
|
||||
+ " \"balancerComputeThreads\": 2, \n"
|
||||
+ " \"emitBalancingStats\": true,\n"
|
||||
+ " \"killAllDataSources\": true\n"
|
||||
+ "}\n";
|
||||
|
||||
ObjectMapper mapper = TestHelper.getObjectMapper();
|
||||
CoordinatorDynamicConfig actual = mapper.readValue(
|
||||
mapper.writeValueAsString(
|
||||
mapper.readValue(
|
||||
jsonStr,
|
||||
CoordinatorDynamicConfig.class
|
||||
)
|
||||
),
|
||||
CoordinatorDynamicConfig.class
|
||||
);
|
||||
|
||||
Assert.assertEquals(
|
||||
new CoordinatorDynamicConfig(1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true),
|
||||
actual
|
||||
);
|
||||
|
||||
|
||||
|
||||
//ensure whitelist is empty when killAllDataSources is true
|
||||
try {
|
||||
jsonStr = "{\n"
|
||||
+ " \"killDataSourceWhitelist\": [\"test1\",\"test2\"],\n"
|
||||
+ " \"killAllDataSources\": true\n"
|
||||
+ "}\n";
|
||||
mapper.readValue(
|
||||
jsonStr,
|
||||
CoordinatorDynamicConfig.class
|
||||
);
|
||||
|
||||
Assert.fail("deserialization should fail.");
|
||||
} catch (JsonMappingException e) {
|
||||
Assert.assertTrue(e.getCause() instanceof IAE);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBuilderDefaults()
|
||||
{
|
||||
Assert.assertEquals(
|
||||
new CoordinatorDynamicConfig(900000, 524288000, 100, 5, 15, 10, 1, false, null),
|
||||
new CoordinatorDynamicConfig(900000, 524288000, 100, 5, 15, 10, 1, false, null, false),
|
||||
new CoordinatorDynamicConfig.Builder().build()
|
||||
);
|
||||
}
|
||||
|
@ -106,8 +158,8 @@ public class CoordinatorDynamicConfigTest
|
|||
@Test
|
||||
public void testEqualsAndHashCodeSanity()
|
||||
{
|
||||
CoordinatorDynamicConfig config1 = new CoordinatorDynamicConfig(900000, 524288000, 100, 5, 15, 10, 1, false, null);
|
||||
CoordinatorDynamicConfig config2 = new CoordinatorDynamicConfig(900000, 524288000, 100, 5, 15, 10, 1, false, null);
|
||||
CoordinatorDynamicConfig config1 = new CoordinatorDynamicConfig(900000, 524288000, 100, 5, 15, 10, 1, false, null, false);
|
||||
CoordinatorDynamicConfig config2 = new CoordinatorDynamicConfig(900000, 524288000, 100, 5, 15, 10, 1, false, null, false);
|
||||
|
||||
Assert.assertEquals(config1, config2);
|
||||
Assert.assertEquals(config1.hashCode(), config2.hashCode());
|
||||
|
|
Loading…
Reference in New Issue