Set default `KillUnusedSegments` duty to coordinator's indexing period & `killTaskSlotRatio` to 0.1 (#16247)

The default value for druid.coordinator.kill.period (if unspecified) has changed from P1D to the value of druid.coordinator.period.indexingPeriod. Operators can choose to override druid.coordinator.kill.period and that will take precedence over the default behavior.
The default value for the coordinator dynamic config killTaskSlotRatio is updated from 1.0 to 0.1. This ensures that that kill tasks take up only 1 task slot right out-of-the-box instead of taking up all the task slots.

* Remove stale comment and inline canDutyRun()

* druid.coordinator.kill.period defaults to druid.coordinator.period.indexingPeriod if not set.

- Remove the default P1D value for druid.coordinator.kill.period. Instead default
  druid.coordinator.kill.period to whatever value druid.coordinator.period.indexingPeriod is set
  to if the former config isn't specified.
- If druid.coordinator.kill.period is set, the value will take precedence over
  druid.coordinator.period.indexingPeriod

* Update server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java

* Fix checkstyle error

* Clarify comment

* Update server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java

* Put back canDutyRun()

* Default killTaskSlotsRatio to 0.1 instead of 1.0 (all slots)

* Fix typo DEFAULT_MAX_COMPACTION_TASK_SLOTS

* Remove unused test method.

* Update default value of killTaskSlotsRatio in docs and web-console default mock

* Move initDuty() after params and config setup.
This commit is contained in:
Abhishek Radhakrishnan 2024-04-14 18:56:17 -07:00 committed by GitHub
parent b0c5184f9d
commit 041d0bff5e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 79 additions and 89 deletions

View File

@ -868,7 +868,7 @@ These Coordinator static configurations can be defined in the `coordinator/runti
|`druid.coordinator.load.timeout`|The timeout duration for when the Coordinator assigns a segment to a Historical service.|`PT15M`| |`druid.coordinator.load.timeout`|The timeout duration for when the Coordinator assigns a segment to a Historical service.|`PT15M`|
|`druid.coordinator.kill.pendingSegments.on`|Boolean flag for whether or not the Coordinator clean up old entries in the `pendingSegments` table of metadata store. If set to true, Coordinator will check the created time of most recently complete task. If it doesn't exist, it finds the created time of the earliest running/pending/waiting tasks. Once the created time is found, then for all datasources not in the `killPendingSegmentsSkipList` (see [Dynamic configuration](#dynamic-configuration)), Coordinator will ask the Overlord to clean up the entries 1 day or more older than the found created time in the `pendingSegments` table. This will be done periodically based on `druid.coordinator.period.indexingPeriod` specified.|true| |`druid.coordinator.kill.pendingSegments.on`|Boolean flag for whether or not the Coordinator clean up old entries in the `pendingSegments` table of metadata store. If set to true, Coordinator will check the created time of most recently complete task. If it doesn't exist, it finds the created time of the earliest running/pending/waiting tasks. Once the created time is found, then for all datasources not in the `killPendingSegmentsSkipList` (see [Dynamic configuration](#dynamic-configuration)), Coordinator will ask the Overlord to clean up the entries 1 day or more older than the found created time in the `pendingSegments` table. This will be done periodically based on `druid.coordinator.period.indexingPeriod` specified.|true|
|`druid.coordinator.kill.on`|Boolean flag for whether or not the Coordinator should submit kill task for unused segments, that is, permanently delete them from metadata store and deep storage. If set to true, then for all whitelisted datasources (or optionally all), Coordinator will submit tasks periodically based on `period` specified. A whitelist can be set via dynamic configuration `killDataSourceWhitelist` described later.<br /><br />When `druid.coordinator.kill.on` is true, segments are eligible for permanent deletion once their data intervals are older than `druid.coordinator.kill.durationToRetain` relative to the current time. If a segment's data interval is older than this threshold at the time it is marked unused, it is eligible for permanent deletion immediately after being marked unused.|false| |`druid.coordinator.kill.on`|Boolean flag for whether or not the Coordinator should submit kill task for unused segments, that is, permanently delete them from metadata store and deep storage. If set to true, then for all whitelisted datasources (or optionally all), Coordinator will submit tasks periodically based on `period` specified. A whitelist can be set via dynamic configuration `killDataSourceWhitelist` described later.<br /><br />When `druid.coordinator.kill.on` is true, segments are eligible for permanent deletion once their data intervals are older than `druid.coordinator.kill.durationToRetain` relative to the current time. If a segment's data interval is older than this threshold at the time it is marked unused, it is eligible for permanent deletion immediately after being marked unused.|false|
|`druid.coordinator.kill.period`| The frequency of sending kill tasks to the indexing service. The value must be greater than or equal to `druid.coordinator.period.indexingPeriod`. Only applies if kill is turned on.|P1D (1 day)| |`druid.coordinator.kill.period`| The frequency of sending kill tasks to the indexing service. The value must be greater than or equal to `druid.coordinator.period.indexingPeriod`. Only applies if kill is turned on.|Same as `druid.coordinator.period.indexingPeriod`|
|`druid.coordinator.kill.durationToRetain`|Only applies if you set `druid.coordinator.kill.on` to `true`. This value is ignored if `druid.coordinator.kill.ignoreDurationToRetain` is `true`. Valid configurations must be a ISO8601 period. Druid will not kill unused segments whose interval end date is beyond `now - durationToRetain`. `durationToRetain` can be a negative ISO8601 period, which would result in `now - durationToRetain` to be in the future.<br /><br />Note that the `durationToRetain` parameter applies to the segment interval, not the time that the segment was last marked unused. For example, if `durationToRetain` is set to `P90D`, then a segment for a time chunk 90 days in the past is eligible for permanent deletion immediately after being marked unused.|`P90D`| |`druid.coordinator.kill.durationToRetain`|Only applies if you set `druid.coordinator.kill.on` to `true`. This value is ignored if `druid.coordinator.kill.ignoreDurationToRetain` is `true`. Valid configurations must be a ISO8601 period. Druid will not kill unused segments whose interval end date is beyond `now - durationToRetain`. `durationToRetain` can be a negative ISO8601 period, which would result in `now - durationToRetain` to be in the future.<br /><br />Note that the `durationToRetain` parameter applies to the segment interval, not the time that the segment was last marked unused. For example, if `durationToRetain` is set to `P90D`, then a segment for a time chunk 90 days in the past is eligible for permanent deletion immediately after being marked unused.|`P90D`|
|`druid.coordinator.kill.ignoreDurationToRetain`|A way to override `druid.coordinator.kill.durationToRetain` and tell the coordinator that you do not care about the end date of unused segment intervals when it comes to killing them. If true, the coordinator considers all unused segments as eligible to be killed.|false| |`druid.coordinator.kill.ignoreDurationToRetain`|A way to override `druid.coordinator.kill.durationToRetain` and tell the coordinator that you do not care about the end date of unused segment intervals when it comes to killing them. If true, the coordinator considers all unused segments as eligible to be killed.|false|
|`druid.coordinator.kill.bufferPeriod`|The amount of time that a segment must be unused before it is able to be permanently removed from metadata and deep storage. This can serve as a buffer period to prevent data loss if data ends up being needed after being marked unused.|`P30D`| |`druid.coordinator.kill.bufferPeriod`|The amount of time that a segment must be unused before it is able to be permanently removed from metadata and deep storage. This can serve as a buffer period to prevent data loss if data ends up being needed after being marked unused.|`P30D`|
@ -942,7 +942,7 @@ The following table shows the dynamic configuration properties for the Coordinat
|`replicationThrottleLimit`|The maximum number of segment replicas that can be assigned to a historical tier in a single Coordinator run. This property prevents Historical services from becoming overwhelmed when loading extra replicas of segments that are already available in the cluster.|500| |`replicationThrottleLimit`|The maximum number of segment replicas that can be assigned to a historical tier in a single Coordinator run. This property prevents Historical services from becoming overwhelmed when loading extra replicas of segments that are already available in the cluster.|500|
|`balancerComputeThreads`|Thread pool size for computing moving cost of segments during segment balancing. Consider increasing this if you have a lot of segments and moving segments begins to stall.|`num_cores` / 2| |`balancerComputeThreads`|Thread pool size for computing moving cost of segments during segment balancing. Consider increasing this if you have a lot of segments and moving segments begins to stall.|`num_cores` / 2|
|`killDataSourceWhitelist`|List of specific data sources for which kill tasks are sent if property `druid.coordinator.kill.on` is true. This can be a list of comma-separated data source names or a JSON array.|none| |`killDataSourceWhitelist`|List of specific data sources for which kill tasks are sent if property `druid.coordinator.kill.on` is true. This can be a list of comma-separated data source names or a JSON array.|none|
|`killTaskSlotRatio`|Ratio of total available task slots, including autoscaling if applicable that will be allowed for kill tasks. This limit only applies for kill tasks that are spawned automatically by the coordinator's auto kill duty, which is enabled when `druid.coordinator.kill.on` is true.| 1 - all task slots can be used| |`killTaskSlotRatio`|Ratio of total available task slots, including autoscaling if applicable that will be allowed for kill tasks. This value must be between 0 and 1. Only applicable for kill tasks that are spawned automatically by the coordinator's auto kill duty, which is enabled when `druid.coordinator.kill.on` is true.|0.1|
|`maxKillTaskSlots`|Maximum number of tasks that will be allowed for kill tasks. This limit only applies for kill tasks that are spawned automatically by the coordinator's auto kill duty, which is enabled when `druid.coordinator.kill.on` is true.|`Integer.MAX_VALUE` - no limit| |`maxKillTaskSlots`|Maximum number of tasks that will be allowed for kill tasks. This limit only applies for kill tasks that are spawned automatically by the coordinator's auto kill duty, which is enabled when `druid.coordinator.kill.on` is true.|`Integer.MAX_VALUE` - no limit|
|`killPendingSegmentsSkipList`|List of data sources for which pendingSegments are _NOT_ cleaned up if property `druid.coordinator.kill.pendingSegments.on` is true. This can be a list of comma-separated data sources or a JSON array.|none| |`killPendingSegmentsSkipList`|List of data sources for which pendingSegments are _NOT_ cleaned up if property `druid.coordinator.kill.pendingSegments.on` is true. This can be a list of comma-separated data sources or a JSON array.|none|
|`maxSegmentsInNodeLoadingQueue`|The maximum number of segments allowed in the load queue of any given server. Use this parameter to load segments faster if, for example, the cluster contains slow-loading nodes or if there are too many segments to be replicated to a particular node (when faster loading is preferred to better segments distribution). The optimal value depends on the loading speed of segments, acceptable replication time and number of nodes.|500| |`maxSegmentsInNodeLoadingQueue`|The maximum number of segments allowed in the load queue of any given server. Use this parameter to load segments faster if, for example, the cluster contains slow-loading nodes or if there are too many segments to be replicated to a particular node (when faster loading is preferred to better segments distribution). The optimal value depends on the loading speed of segments, acceptable replication time and number of nodes.|500|

View File

@ -84,7 +84,7 @@ Kill tasks use the following configuration:
- `druid.coordinator.kill.on`: When `true`, enables the Coordinator to submit a kill task for unused segments, which deletes them completely from metadata store and from deep storage. - `druid.coordinator.kill.on`: When `true`, enables the Coordinator to submit a kill task for unused segments, which deletes them completely from metadata store and from deep storage.
Only applies to the specified datasources in the dynamic configuration parameter `killDataSourceWhitelist`. Only applies to the specified datasources in the dynamic configuration parameter `killDataSourceWhitelist`.
If `killDataSourceWhitelist` is not set or empty, then kill tasks can be submitted for all datasources. If `killDataSourceWhitelist` is not set or empty, then kill tasks can be submitted for all datasources.
- `druid.coordinator.kill.period`: Defines the frequency in [ISO 8601 format](https://en.wikipedia.org/wiki/ISO_8601#Durations) for the cleanup job to check for and delete eligible segments. Defaults to `P1D`. Must be greater than `druid.coordinator.period.indexingPeriod`. - `druid.coordinator.kill.period`: Defines the frequency in [ISO 8601 format](https://en.wikipedia.org/wiki/ISO_8601#Durations) for the cleanup job to check for and delete eligible segments. Defaults to `druid.coordinator.period.indexingPeriod`. Must be greater than or equal to `druid.coordinator.period.indexingPeriod`.
- `druid.coordinator.kill.durationToRetain`: Defines the retention period in [ISO 8601 format](https://en.wikipedia.org/wiki/ISO_8601#Durations) after creation that segments become eligible for deletion. - `druid.coordinator.kill.durationToRetain`: Defines the retention period in [ISO 8601 format](https://en.wikipedia.org/wiki/ISO_8601#Durations) after creation that segments become eligible for deletion.
- `druid.coordinator.kill.ignoreDurationToRetain`: A way to override `druid.coordinator.kill.durationToRetain`. When enabled, the coordinator considers all unused segments as eligible to be killed. - `druid.coordinator.kill.ignoreDurationToRetain`: A way to override `druid.coordinator.kill.durationToRetain`. When enabled, the coordinator considers all unused segments as eligible to be killed.
- `druid.coordinator.kill.bufferPeriod`: Defines the amount of time that a segment must be unused before it can be permanently removed from metadata and deep storage. This serves as a buffer period to prevent data loss if data ends up being needed after being marked unused. - `druid.coordinator.kill.bufferPeriod`: Defines the amount of time that a segment must be unused before it can be permanently removed from metadata and deep storage. This serves as a buffer period to prevent data loss if data ends up being needed after being marked unused.

View File

@ -32,7 +32,7 @@ public class CoordinatorCompactionConfig
public static final String CONFIG_KEY = "coordinator.compaction.config"; public static final String CONFIG_KEY = "coordinator.compaction.config";
private static final double DEFAULT_COMPACTION_TASK_RATIO = 0.1; private static final double DEFAULT_COMPACTION_TASK_RATIO = 0.1;
private static final int DEFAILT_MAX_COMPACTION_TASK_SLOTS = Integer.MAX_VALUE; private static final int DEFAULT_MAX_COMPACTION_TASK_SLOTS = Integer.MAX_VALUE;
private static final boolean DEFAULT_USE_AUTO_SCALE_SLOTS = false; private static final boolean DEFAULT_USE_AUTO_SCALE_SLOTS = false;
private final List<DataSourceCompactionConfig> compactionConfigs; private final List<DataSourceCompactionConfig> compactionConfigs;
@ -91,7 +91,7 @@ public class CoordinatorCompactionConfig
DEFAULT_COMPACTION_TASK_RATIO : DEFAULT_COMPACTION_TASK_RATIO :
compactionTaskSlotRatio; compactionTaskSlotRatio;
this.maxCompactionTaskSlots = maxCompactionTaskSlots == null ? this.maxCompactionTaskSlots = maxCompactionTaskSlots == null ?
DEFAILT_MAX_COMPACTION_TASK_SLOTS : DEFAULT_MAX_COMPACTION_TASK_SLOTS :
maxCompactionTaskSlots; maxCompactionTaskSlots;
this.useAutoScaleSlots = useAutoScaleSlots == null ? this.useAutoScaleSlots = useAutoScaleSlots == null ?
DEFAULT_USE_AUTO_SCALE_SLOTS : DEFAULT_USE_AUTO_SCALE_SLOTS :

View File

@ -143,14 +143,14 @@ public class CoordinatorDynamicConfig
killTaskSlotRatio killTaskSlotRatio
); );
} }
this.killTaskSlotRatio = killTaskSlotRatio != null ? killTaskSlotRatio : Defaults.KILL_TASK_SLOT_RATIO; this.killTaskSlotRatio = Builder.valueOrDefault(killTaskSlotRatio, Defaults.KILL_TASK_SLOT_RATIO);
if (null != maxKillTaskSlots && maxKillTaskSlots < 0) { if (null != maxKillTaskSlots && maxKillTaskSlots < 0) {
throw InvalidInput.exception( throw InvalidInput.exception(
"maxKillTaskSlots [%d] is invalid. It must be >= 0.", "maxKillTaskSlots [%d] is invalid. It must be >= 0.",
maxKillTaskSlots maxKillTaskSlots
); );
} }
this.maxKillTaskSlots = maxKillTaskSlots != null ? maxKillTaskSlots : Defaults.MAX_KILL_TASK_SLOTS; this.maxKillTaskSlots = Builder.valueOrDefault(maxKillTaskSlots, Defaults.MAX_KILL_TASK_SLOTS);
this.dataSourcesToNotKillStalePendingSegmentsIn this.dataSourcesToNotKillStalePendingSegmentsIn
= parseJsonStringOrArray(dataSourcesToNotKillStalePendingSegmentsIn); = parseJsonStringOrArray(dataSourcesToNotKillStalePendingSegmentsIn);
this.maxSegmentsInNodeLoadingQueue = Builder.valueOrDefault( this.maxSegmentsInNodeLoadingQueue = Builder.valueOrDefault(
@ -440,11 +440,7 @@ public class CoordinatorDynamicConfig
static final boolean USE_ROUND_ROBIN_ASSIGNMENT = true; static final boolean USE_ROUND_ROBIN_ASSIGNMENT = true;
static final boolean SMART_SEGMENT_LOADING = true; static final boolean SMART_SEGMENT_LOADING = true;
// The following default values for killTaskSlotRatio and maxKillTaskSlots static final double KILL_TASK_SLOT_RATIO = 0.1;
// are to preserve the behavior before Druid 0.28 and a future version may
// want to consider better defaults so that kill tasks can not eat up all
// the capacity in the cluster would be nice
static final double KILL_TASK_SLOT_RATIO = 1.0;
static final int MAX_KILL_TASK_SLOTS = Integer.MAX_VALUE; static final int MAX_KILL_TASK_SLOTS = Integer.MAX_VALUE;
} }

View File

@ -23,8 +23,6 @@ import org.joda.time.Duration;
import org.skife.config.Config; import org.skife.config.Config;
import org.skife.config.Default; import org.skife.config.Default;
/**
*/
public abstract class DruidCoordinatorConfig public abstract class DruidCoordinatorConfig
{ {
@Config("druid.coordinator.startDelay") @Config("druid.coordinator.startDelay")
@ -47,9 +45,15 @@ public abstract class DruidCoordinatorConfig
@Default("false") @Default("false")
public abstract boolean isKillUnusedSegmentsEnabled(); public abstract boolean isKillUnusedSegmentsEnabled();
/**
* @return The period at which the coordinator cleans up unused segments. If the config isn't explicitly set,
* it defaults to the coordinator's indexing period.
*/
@Config("druid.coordinator.kill.period") @Config("druid.coordinator.kill.period")
@Default("P1D") public Duration getCoordinatorKillPeriod()
public abstract Duration getCoordinatorKillPeriod(); {
return getCoordinatorIndexingPeriod();
}
@Config("druid.coordinator.kill.durationToRetain") @Config("druid.coordinator.kill.durationToRetain")
@Default("P90D") @Default("P90D")

View File

@ -245,17 +245,6 @@ public class KillUnusedSegments implements CoordinatorDuty
stats.add(Stats.Kill.SUBMITTED_TASKS, submittedTasks); stats.add(Stats.Kill.SUBMITTED_TASKS, submittedTasks);
} }
/**
* <p>
* Calculates the interval for which segments are to be killed in a datasource.
* Since this method compares datetime as strings, it cannot find unused segments that are outside
* the range [{@link DateTimes#COMPARE_DATE_AS_STRING_MIN}, {@link DateTimes#COMPARE_DATE_AS_STRING_MAX}),
* such as {@link org.apache.druid.java.util.common.granularity.Granularities#ALL} partitioned segments
* and segments that end in {@link DateTimes#MAX}.
*</p><p>
* For more information, see <a href="https://github.com/apache/druid/issues/15951"> Issue#15951</a>.
* </p>
*/
@Nullable @Nullable
private Interval findIntervalForKill( private Interval findIntervalForKill(
final String dataSource, final String dataSource,

View File

@ -27,35 +27,34 @@ import org.skife.config.ConfigurationObjectFactory;
import java.util.Properties; import java.util.Properties;
/**
*/
public class DruidCoordinatorConfigTest public class DruidCoordinatorConfigTest
{ {
@Test @Test
public void testDeserialization() public void testCoordinatorConfigWithDefaults()
{ {
ConfigurationObjectFactory factory = Config.createFactory(new Properties()); final ConfigurationObjectFactory factory = Config.createFactory(new Properties());
final DruidCoordinatorConfig config = factory.build(DruidCoordinatorConfig.class);
//with defaults
DruidCoordinatorConfig config = factory.build(DruidCoordinatorConfig.class);
Assert.assertEquals(new Duration("PT300s"), config.getCoordinatorStartDelay()); Assert.assertEquals(new Duration("PT300s"), config.getCoordinatorStartDelay());
Assert.assertEquals(new Duration("PT60s"), config.getCoordinatorPeriod()); Assert.assertEquals(new Duration("PT60s"), config.getCoordinatorPeriod());
Assert.assertEquals(new Duration("PT1800s"), config.getCoordinatorIndexingPeriod()); Assert.assertEquals(new Duration("PT1800s"), config.getCoordinatorIndexingPeriod());
Assert.assertEquals(86400000, config.getCoordinatorKillPeriod().getMillis()); Assert.assertEquals(new Duration("PT1800s"), config.getCoordinatorKillPeriod());
Assert.assertEquals(7776000000L, config.getCoordinatorKillDurationToRetain().getMillis()); Assert.assertEquals(new Duration("PT7776000s"), config.getCoordinatorKillDurationToRetain());
Assert.assertEquals(100, config.getCoordinatorKillMaxSegments()); Assert.assertEquals(100, config.getCoordinatorKillMaxSegments());
Assert.assertEquals(new Duration(15 * 60 * 1000), config.getLoadTimeoutDelay()); Assert.assertEquals(new Duration(15 * 60 * 1000), config.getLoadTimeoutDelay());
Assert.assertFalse(config.getCoordinatorKillIgnoreDurationToRetain()); Assert.assertFalse(config.getCoordinatorKillIgnoreDurationToRetain());
Assert.assertEquals("http", config.getLoadQueuePeonType()); Assert.assertEquals("http", config.getLoadQueuePeonType());
}
//with non-defaults @Test
Properties props = new Properties(); public void testCoordinatorConfigWithOverrides()
{
final Properties props = new Properties();
props.setProperty("druid.coordinator.startDelay", "PT1s"); props.setProperty("druid.coordinator.startDelay", "PT1s");
props.setProperty("druid.coordinator.period", "PT1s"); props.setProperty("druid.coordinator.period", "PT1s");
props.setProperty("druid.coordinator.period.indexingPeriod", "PT1s"); props.setProperty("druid.coordinator.period.indexingPeriod", "PT1s");
props.setProperty("druid.coordinator.kill.on", "true"); props.setProperty("druid.coordinator.kill.on", "true");
props.setProperty("druid.coordinator.kill.period", "PT1s"); props.setProperty("druid.coordinator.kill.period", "PT10s");
props.setProperty("druid.coordinator.kill.durationToRetain", "PT1s"); props.setProperty("druid.coordinator.kill.durationToRetain", "PT1s");
props.setProperty("druid.coordinator.kill.maxSegments", "10000"); props.setProperty("druid.coordinator.kill.maxSegments", "10000");
props.setProperty("druid.coordinator.kill.pendingSegments.on", "true"); props.setProperty("druid.coordinator.kill.pendingSegments.on", "true");
@ -63,23 +62,28 @@ public class DruidCoordinatorConfigTest
props.setProperty("druid.coordinator.loadqueuepeon.repeatDelay", "PT0.100s"); props.setProperty("druid.coordinator.loadqueuepeon.repeatDelay", "PT0.100s");
props.setProperty("druid.coordinator.kill.ignoreDurationToRetain", "true"); props.setProperty("druid.coordinator.kill.ignoreDurationToRetain", "true");
factory = Config.createFactory(props); final ConfigurationObjectFactory factory = Config.createFactory(props);
config = factory.build(DruidCoordinatorConfig.class); final DruidCoordinatorConfig config = factory.build(DruidCoordinatorConfig.class);
Assert.assertEquals(new Duration("PT1s"), config.getCoordinatorStartDelay()); Assert.assertEquals(new Duration("PT1s"), config.getCoordinatorStartDelay());
Assert.assertEquals(new Duration("PT1s"), config.getCoordinatorPeriod()); Assert.assertEquals(new Duration("PT1s"), config.getCoordinatorPeriod());
Assert.assertEquals(new Duration("PT1s"), config.getCoordinatorIndexingPeriod()); Assert.assertEquals(new Duration("PT1s"), config.getCoordinatorIndexingPeriod());
Assert.assertEquals(new Duration("PT1s"), config.getCoordinatorKillPeriod()); Assert.assertEquals(new Duration("PT10s"), config.getCoordinatorKillPeriod());
Assert.assertEquals(new Duration("PT1s"), config.getCoordinatorKillDurationToRetain()); Assert.assertEquals(new Duration("PT1s"), config.getCoordinatorKillDurationToRetain());
Assert.assertEquals(10000, config.getCoordinatorKillMaxSegments()); Assert.assertEquals(10000, config.getCoordinatorKillMaxSegments());
Assert.assertEquals(new Duration("PT1s"), config.getLoadTimeoutDelay()); Assert.assertEquals(new Duration("PT1s"), config.getLoadTimeoutDelay());
Assert.assertTrue(config.getCoordinatorKillIgnoreDurationToRetain()); Assert.assertTrue(config.getCoordinatorKillIgnoreDurationToRetain());
}
// Test negative druid.coordinator.kill.durationToRetain now that it is valid. @Test
props = new Properties(); public void testCoordinatorConfigWithNegativeDurationToRetain()
{
final Properties props = new Properties();
props.setProperty("druid.coordinator.kill.durationToRetain", "PT-1s"); props.setProperty("druid.coordinator.kill.durationToRetain", "PT-1s");
factory = Config.createFactory(props);
config = factory.build(DruidCoordinatorConfig.class); final ConfigurationObjectFactory factory = Config.createFactory(props);
final DruidCoordinatorConfig config = factory.build(DruidCoordinatorConfig.class);
Assert.assertEquals(new Duration("PT-1s"), config.getCoordinatorKillDurationToRetain()); Assert.assertEquals(new Duration("PT-1s"), config.getCoordinatorKillDurationToRetain());
} }
} }

View File

@ -294,7 +294,6 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
private static final Duration DEFAULT_COORDINATOR_PERIOD = Period.parse("PT60s").toStandardDuration(); private static final Duration DEFAULT_COORDINATOR_PERIOD = Period.parse("PT60s").toStandardDuration();
private static final Duration DEFAULT_COORDINATOR_INDEXING_PERIOD = Period.parse("PT1800s").toStandardDuration(); private static final Duration DEFAULT_COORDINATOR_INDEXING_PERIOD = Period.parse("PT1800s").toStandardDuration();
private static final Duration DEFAULT_METADATA_STORE_MANAGEMENT_PERIOD = Period.parse("PT1H").toStandardDuration(); private static final Duration DEFAULT_METADATA_STORE_MANAGEMENT_PERIOD = Period.parse("PT1H").toStandardDuration();
private static final Duration DEFAULT_COORDINATOR_KILL_PERIOD = Period.parse("P1D").toStandardDuration();
private static final Duration DEFAULT_COORDINATOR_KILL_DURATION_TO_RETAIN = Period.parse("P90D").toStandardDuration(); private static final Duration DEFAULT_COORDINATOR_KILL_DURATION_TO_RETAIN = Period.parse("P90D").toStandardDuration();
private static final Duration DEFAULT_COORDINATOR_KILL_BUFFER_PERIOD = Period.parse("P30D").toStandardDuration(); private static final Duration DEFAULT_COORDINATOR_KILL_BUFFER_PERIOD = Period.parse("P30D").toStandardDuration();
private static final boolean DEFAULT_COORDINATOR_KILL_IGNORE_DURATION_TO_RETAIN = false; private static final boolean DEFAULT_COORDINATOR_KILL_IGNORE_DURATION_TO_RETAIN = false;
@ -497,7 +496,8 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
coordinatorIndexingPeriod == null ? DEFAULT_COORDINATOR_INDEXING_PERIOD : coordinatorIndexingPeriod, coordinatorIndexingPeriod == null ? DEFAULT_COORDINATOR_INDEXING_PERIOD : coordinatorIndexingPeriod,
metadataStoreManagementPeriod == null ? DEFAULT_METADATA_STORE_MANAGEMENT_PERIOD : metadataStoreManagementPeriod, metadataStoreManagementPeriod == null ? DEFAULT_METADATA_STORE_MANAGEMENT_PERIOD : metadataStoreManagementPeriod,
loadTimeoutDelay == null ? DEFAULT_LOAD_TIMEOUT_DELAY : loadTimeoutDelay, loadTimeoutDelay == null ? DEFAULT_LOAD_TIMEOUT_DELAY : loadTimeoutDelay,
coordinatorKillPeriod == null ? DEFAULT_COORDINATOR_KILL_PERIOD : coordinatorKillPeriod, coordinatorKillPeriod == null ? (coordinatorIndexingPeriod == null ? DEFAULT_COORDINATOR_INDEXING_PERIOD : coordinatorIndexingPeriod)
: coordinatorKillPeriod,
coordinatorKillDurationToRetain == null ? DEFAULT_COORDINATOR_KILL_DURATION_TO_RETAIN coordinatorKillDurationToRetain == null ? DEFAULT_COORDINATOR_KILL_DURATION_TO_RETAIN
: coordinatorKillDurationToRetain, : coordinatorKillDurationToRetain,
coordinatorSupervisorKillPeriod == null ? DEFAULT_COORDINATOR_SUPERVISOR_KILL_PERIOD : coordinatorSupervisorKillPeriod, coordinatorSupervisorKillPeriod == null ? DEFAULT_COORDINATOR_SUPERVISOR_KILL_PERIOD : coordinatorSupervisorKillPeriod,

View File

@ -90,7 +90,7 @@ public class KillUnusedSegmentsTest
private static final String VERSION = "v1"; private static final String VERSION = "v1";
private final CoordinatorDynamicConfig.Builder dynamicConfigBuilder = CoordinatorDynamicConfig.builder(); private CoordinatorDynamicConfig.Builder dynamicConfigBuilder;
private TestOverlordClient overlordClient; private TestOverlordClient overlordClient;
private TestDruidCoordinatorConfig.Builder configBuilder; private TestDruidCoordinatorConfig.Builder configBuilder;
private DruidCoordinatorRuntimeParams.Builder paramsBuilder; private DruidCoordinatorRuntimeParams.Builder paramsBuilder;
@ -126,10 +126,11 @@ public class KillUnusedSegmentsTest
overlordClient = new TestOverlordClient(); overlordClient = new TestOverlordClient();
configBuilder = new TestDruidCoordinatorConfig.Builder() configBuilder = new TestDruidCoordinatorConfig.Builder()
.withCoordinatorIndexingPeriod(Duration.standardSeconds(0)) .withCoordinatorIndexingPeriod(Duration.standardSeconds(0))
.withCoordinatorKillPeriod(Duration.standardSeconds(0))
.withCoordinatorKillDurationToRetain(Duration.standardHours(36)) .withCoordinatorKillDurationToRetain(Duration.standardHours(36))
.withCoordinatorKillMaxSegments(10) .withCoordinatorKillMaxSegments(10)
.withCoordinatorKillBufferPeriod(Duration.standardSeconds(1)); .withCoordinatorKillBufferPeriod(Duration.standardSeconds(1));
dynamicConfigBuilder = CoordinatorDynamicConfig.builder()
.withKillTaskSlotRatio(1.0);
paramsBuilder = DruidCoordinatorRuntimeParams.newBuilder(DateTimes.nowUtc()); paramsBuilder = DruidCoordinatorRuntimeParams.newBuilder(DateTimes.nowUtc());
} }
@ -137,6 +138,7 @@ public class KillUnusedSegmentsTest
public void testKillWithDefaultCoordinatorConfig() public void testKillWithDefaultCoordinatorConfig()
{ {
configBuilder = new TestDruidCoordinatorConfig.Builder(); configBuilder = new TestDruidCoordinatorConfig.Builder();
dynamicConfigBuilder = CoordinatorDynamicConfig.builder();
final DateTime sixtyDaysAgo = NOW.minusDays(60); final DateTime sixtyDaysAgo = NOW.minusDays(60);
@ -151,9 +153,9 @@ public class KillUnusedSegmentsTest
initDuty(); initDuty();
final CoordinatorRunStats stats = runDutyAndGetStats(); final CoordinatorRunStats stats = runDutyAndGetStats();
Assert.assertEquals(10, stats.get(Stats.Kill.AVAILABLE_SLOTS)); Assert.assertEquals(1, stats.get(Stats.Kill.AVAILABLE_SLOTS));
Assert.assertEquals(1, stats.get(Stats.Kill.SUBMITTED_TASKS)); Assert.assertEquals(1, stats.get(Stats.Kill.SUBMITTED_TASKS));
Assert.assertEquals(10, stats.get(Stats.Kill.MAX_SLOTS)); Assert.assertEquals(1, stats.get(Stats.Kill.MAX_SLOTS));
Assert.assertEquals(2, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY)); Assert.assertEquals(2, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
validateLastKillStateAndReset(DS1, Intervals.ETERNITY); validateLastKillStateAndReset(DS1, Intervals.ETERNITY);
@ -516,13 +518,27 @@ public class KillUnusedSegmentsTest
} }
@Test @Test
public void testKillTaskSlotStat() public void testDefaultKillTaskSlotStats()
{ {
dynamicConfigBuilder = CoordinatorDynamicConfig.builder();
paramsBuilder.withDynamicConfigs(dynamicConfigBuilder.build());
initDuty(); initDuty();
final CoordinatorRunStats stats = runDutyAndGetStats();
Assert.assertEquals(1, stats.get(Stats.Kill.AVAILABLE_SLOTS));
Assert.assertEquals(0, stats.get(Stats.Kill.SUBMITTED_TASKS));
Assert.assertEquals(1, stats.get(Stats.Kill.MAX_SLOTS));
}
@Test
public void testKillTaskSlotStats1()
{
dynamicConfigBuilder.withKillTaskSlotRatio(1.0); dynamicConfigBuilder.withKillTaskSlotRatio(1.0);
dynamicConfigBuilder.withMaxKillTaskSlots(Integer.MAX_VALUE); dynamicConfigBuilder.withMaxKillTaskSlots(Integer.MAX_VALUE);
paramsBuilder.withDynamicConfigs(dynamicConfigBuilder.build()); paramsBuilder.withDynamicConfigs(dynamicConfigBuilder.build());
initDuty();
final CoordinatorRunStats stats = runDutyAndGetStats(); final CoordinatorRunStats stats = runDutyAndGetStats();
Assert.assertEquals(10, stats.get(Stats.Kill.AVAILABLE_SLOTS)); Assert.assertEquals(10, stats.get(Stats.Kill.AVAILABLE_SLOTS));
@ -533,11 +549,11 @@ public class KillUnusedSegmentsTest
@Test @Test
public void testKillTaskSlotStats2() public void testKillTaskSlotStats2()
{ {
initDuty();
dynamicConfigBuilder.withKillTaskSlotRatio(0.0); dynamicConfigBuilder.withKillTaskSlotRatio(0.0);
dynamicConfigBuilder.withMaxKillTaskSlots(Integer.MAX_VALUE); dynamicConfigBuilder.withMaxKillTaskSlots(Integer.MAX_VALUE);
paramsBuilder.withDynamicConfigs(dynamicConfigBuilder.build()); paramsBuilder.withDynamicConfigs(dynamicConfigBuilder.build());
initDuty();
final CoordinatorRunStats stats = runDutyAndGetStats(); final CoordinatorRunStats stats = runDutyAndGetStats();
Assert.assertEquals(0, stats.get(Stats.Kill.AVAILABLE_SLOTS)); Assert.assertEquals(0, stats.get(Stats.Kill.AVAILABLE_SLOTS));
@ -548,11 +564,11 @@ public class KillUnusedSegmentsTest
@Test @Test
public void testKillTaskSlotStats3() public void testKillTaskSlotStats3()
{ {
initDuty();
dynamicConfigBuilder.withKillTaskSlotRatio(1.0); dynamicConfigBuilder.withKillTaskSlotRatio(1.0);
dynamicConfigBuilder.withMaxKillTaskSlots(0); dynamicConfigBuilder.withMaxKillTaskSlots(0);
paramsBuilder.withDynamicConfigs(dynamicConfigBuilder.build()); paramsBuilder.withDynamicConfigs(dynamicConfigBuilder.build());
initDuty();
final CoordinatorRunStats stats = runDutyAndGetStats(); final CoordinatorRunStats stats = runDutyAndGetStats();
Assert.assertEquals(0, stats.get(Stats.Kill.AVAILABLE_SLOTS)); Assert.assertEquals(0, stats.get(Stats.Kill.AVAILABLE_SLOTS));
@ -563,11 +579,11 @@ public class KillUnusedSegmentsTest
@Test @Test
public void testKillTaskSlotStats4() public void testKillTaskSlotStats4()
{ {
initDuty();
dynamicConfigBuilder.withKillTaskSlotRatio(0.1); dynamicConfigBuilder.withKillTaskSlotRatio(0.1);
dynamicConfigBuilder.withMaxKillTaskSlots(3); dynamicConfigBuilder.withMaxKillTaskSlots(3);
paramsBuilder.withDynamicConfigs(dynamicConfigBuilder.build()); paramsBuilder.withDynamicConfigs(dynamicConfigBuilder.build());
initDuty();
final CoordinatorRunStats stats = runDutyAndGetStats(); final CoordinatorRunStats stats = runDutyAndGetStats();
Assert.assertEquals(1, stats.get(Stats.Kill.AVAILABLE_SLOTS)); Assert.assertEquals(1, stats.get(Stats.Kill.AVAILABLE_SLOTS));
@ -800,6 +816,7 @@ public class KillUnusedSegmentsTest
private CoordinatorRunStats runDutyAndGetStats() private CoordinatorRunStats runDutyAndGetStats()
{ {
paramsBuilder.withDynamicConfigs(dynamicConfigBuilder.build());
final DruidCoordinatorRuntimeParams params = killDuty.run(paramsBuilder.build()); final DruidCoordinatorRuntimeParams params = killDuty.run(paramsBuilder.build());
return params.getCoordinatorStats(); return params.getCoordinatorStats();
} }

View File

@ -19,7 +19,6 @@
package org.apache.druid.server.http; package org.apache.druid.server.http;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.TestHelper;
@ -31,9 +30,6 @@ import org.junit.Test;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.Set; import java.util.Set;
/**
*
*/
public class CoordinatorDynamicConfigTest public class CoordinatorDynamicConfigTest
{ {
private static final int EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE = 500; private static final int EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE = 500;
@ -190,7 +186,7 @@ public class CoordinatorDynamicConfigTest
true true
); );
actual = CoordinatorDynamicConfig.builder().withKillTaskSlotRatio(1.0).build(actual); actual = CoordinatorDynamicConfig.builder().withKillTaskSlotRatio(0.1).build(actual);
assertConfig( assertConfig(
actual, actual,
1, 1,
@ -201,7 +197,7 @@ public class CoordinatorDynamicConfigTest
1, 1,
2, 2,
whitelist, whitelist,
1.0, 0.1,
2, 2,
false, false,
1, 1,
@ -221,7 +217,7 @@ public class CoordinatorDynamicConfigTest
1, 1,
2, 2,
whitelist, whitelist,
1.0, 0.1,
5, 5,
false, false,
1, 1,
@ -334,7 +330,7 @@ public class CoordinatorDynamicConfigTest
1, 1,
2, 2,
whitelist, whitelist,
1.0, 0.1,
Integer.MAX_VALUE, Integer.MAX_VALUE,
false, false,
1, 1,
@ -354,7 +350,7 @@ public class CoordinatorDynamicConfigTest
1, 1,
2, 2,
whitelist, whitelist,
1.0, 0.1,
Integer.MAX_VALUE, Integer.MAX_VALUE,
false, false,
1, 1,
@ -374,7 +370,7 @@ public class CoordinatorDynamicConfigTest
1, 1,
2, 2,
whitelist, whitelist,
1.0, 0.1,
Integer.MAX_VALUE, Integer.MAX_VALUE,
false, false,
1, 1,
@ -418,7 +414,7 @@ public class CoordinatorDynamicConfigTest
1, 1,
2, 2,
ImmutableSet.of("test1", "test2"), ImmutableSet.of("test1", "test2"),
1.0, 0.1,
Integer.MAX_VALUE, Integer.MAX_VALUE,
false, false,
1, 1,
@ -462,7 +458,7 @@ public class CoordinatorDynamicConfigTest
1, 1,
2, 2,
whitelist, whitelist,
1.0, 0.1,
Integer.MAX_VALUE, Integer.MAX_VALUE,
false, false,
1, 1,
@ -504,7 +500,7 @@ public class CoordinatorDynamicConfigTest
1, 1,
2, 2,
ImmutableSet.of(), ImmutableSet.of(),
1.0, 0.1,
Integer.MAX_VALUE, Integer.MAX_VALUE,
true, true,
1, 1,
@ -557,7 +553,7 @@ public class CoordinatorDynamicConfigTest
1, 1,
2, 2,
ImmutableSet.of(), ImmutableSet.of(),
1.0, 0.1,
Integer.MAX_VALUE, Integer.MAX_VALUE,
true, true,
EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE, EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE,
@ -582,7 +578,7 @@ public class CoordinatorDynamicConfigTest
500, 500,
getDefaultNumBalancerThreads(), getDefaultNumBalancerThreads(),
emptyList, emptyList,
1.0, 0.1,
Integer.MAX_VALUE, Integer.MAX_VALUE,
true, true,
EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE, EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE,
@ -610,7 +606,7 @@ public class CoordinatorDynamicConfigTest
500, 500,
getDefaultNumBalancerThreads(), getDefaultNumBalancerThreads(),
ImmutableSet.of("DATASOURCE"), ImmutableSet.of("DATASOURCE"),
1.0, 0.1,
Integer.MAX_VALUE, Integer.MAX_VALUE,
false, false,
EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE, EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE,
@ -653,22 +649,6 @@ public class CoordinatorDynamicConfigTest
); );
} }
private void assertThatDeserializationFailsWithMessage(String json, String message)
{
JsonMappingException e = Assert.assertThrows(
JsonMappingException.class,
() -> mapper.readValue(
mapper.writeValueAsString(
mapper.readValue(json, CoordinatorDynamicConfig.class)
),
CoordinatorDynamicConfig.class
)
);
Assert.assertTrue(e.getCause() instanceof IllegalArgumentException);
IllegalArgumentException cause = (IllegalArgumentException) e.getCause();
Assert.assertEquals(message, cause.getMessage());
}
@Test @Test
public void testEqualsAndHashCode() public void testEqualsAndHashCode()
{ {

View File

@ -27,7 +27,7 @@ export const DEFAULT_COORDINATOR_DYNAMIC_CONFIG: CoordinatorDynamicConfig = {
replicationThrottleLimit: 500, replicationThrottleLimit: 500,
balancerComputeThreads: 1, balancerComputeThreads: 1,
killDataSourceWhitelist: [], killDataSourceWhitelist: [],
killTaskSlotRatio: 1, killTaskSlotRatio: 0.1,
maxKillTaskSlots: 2147483647, maxKillTaskSlots: 2147483647,
killPendingSegmentsSkipList: [], killPendingSegmentsSkipList: [],
maxSegmentsInNodeLoadingQueue: 500, maxSegmentsInNodeLoadingQueue: 500,