Allow for task limit on kill tasks spawned by auto kill coordinator duty (#14769)

### Description

Previously, the `KillUnusedSegments` coordinator duty, in charge of periodically deleting unused segments, could spawn an unlimited number of kill tasks for unused segments. This change adds 2 new coordinator dynamic configs that can be used to control the limit of tasks spawned by this coordinator duty

`killTaskSlotRatio`: Ratio of total available task slots, including autoscaling if applicable that will be allowed for kill tasks. This limit only applies for kill tasks that are spawned automatically by the coordinator's auto kill duty. Default is 1, which allows all available tasks to be used, which is the existing behavior

`maxKillTaskSlots`: Maximum number of tasks that will be allowed for kill tasks. This limit only applies for kill tasks that are spawned automatically by the coordinator's auto kill duty. Default is INT.MAX, which essentially allows for unbounded number of tasks, which is the existing behavior. 

Realize that we can effectively get away with just the one `killTaskSlotRatio`, but following similarly to the compaction config, which has similar properties; I thought it was good to have some control of the upper limit regardless of ratio provided.

#### Release note
NEW: `killTaskSlotRatio`  and `maxKillTaskSlots` coordinator dynamic config properties added that allow control of task resource usage spawned by `KillUnusedSegments` coordinator task (auto kill)
This commit is contained in:
zachjsh 2023-08-08 08:40:55 -04:00 committed by GitHub
parent 2845b6a424
commit 660e6cfa01
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 398 additions and 23 deletions

View File

@ -934,6 +934,8 @@ A sample Coordinator dynamic config JSON object is shown below:
"replicantLifetime": 15, "replicantLifetime": 15,
"replicationThrottleLimit": 10, "replicationThrottleLimit": 10,
"killDataSourceWhitelist": ["wikipedia", "testDatasource"], "killDataSourceWhitelist": ["wikipedia", "testDatasource"],
"killTaskSlotRatio": 0.10,
"maxKillTaskSlots": 5,
"decommissioningNodes": ["localhost:8182", "localhost:8282"], "decommissioningNodes": ["localhost:8182", "localhost:8282"],
"decommissioningMaxPercentOfMaxSegmentsToMove": 70, "decommissioningMaxPercentOfMaxSegmentsToMove": 70,
"pauseCoordination": false, "pauseCoordination": false,
@ -945,7 +947,7 @@ A sample Coordinator dynamic config JSON object is shown below:
Issuing a GET request at the same URL will return the spec that is currently in place. A description of the config setup spec is shown below. Issuing a GET request at the same URL will return the spec that is currently in place. A description of the config setup spec is shown below.
|Property| Description | Default | |Property| Description | Default |
|--------|-----------|-------| |--------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------------|
|`millisToWaitBeforeDeleting`| How long does the Coordinator need to be a leader before it can start marking overshadowed segments as unused in metadata storage. | 900000 (15 mins) | |`millisToWaitBeforeDeleting`| How long does the Coordinator need to be a leader before it can start marking overshadowed segments as unused in metadata storage. | 900000 (15 mins) |
|`mergeBytesLimit`| The maximum total uncompressed size in bytes of segments to merge. | 524288000L | |`mergeBytesLimit`| The maximum total uncompressed size in bytes of segments to merge. | 524288000L |
|`mergeSegmentsLimit`| The maximum number of segments that can be in a single [append task](../ingestion/tasks.md). | 100 | |`mergeSegmentsLimit`| The maximum number of segments that can be in a single [append task](../ingestion/tasks.md). | 100 |
@ -955,6 +957,8 @@ Issuing a GET request at the same URL will return the spec that is currently in
|`replicationThrottleLimit`| The maximum number of segment replicas that can be assigned to a historical tier in a single Coordinator run. This property prevents historicals from becoming overwhelmed when loading extra replicas of segments that are already available in the cluster. | 500 | |`replicationThrottleLimit`| The maximum number of segment replicas that can be assigned to a historical tier in a single Coordinator run. This property prevents historicals from becoming overwhelmed when loading extra replicas of segments that are already available in the cluster. | 500 |
|`balancerComputeThreads`| Thread pool size for computing moving cost of segments during segment balancing. Consider increasing this if you have a lot of segments and moving segments begins to stall. | 1 | |`balancerComputeThreads`| Thread pool size for computing moving cost of segments during segment balancing. Consider increasing this if you have a lot of segments and moving segments begins to stall. | 1 |
|`killDataSourceWhitelist`| List of specific data sources for which kill tasks are sent if property `druid.coordinator.kill.on` is true. This can be a list of comma-separated data source names or a JSON array. | none | |`killDataSourceWhitelist`| List of specific data sources for which kill tasks are sent if property `druid.coordinator.kill.on` is true. This can be a list of comma-separated data source names or a JSON array. | none |
|`killTaskSlotRatio`| Ratio of total available task slots, including autoscaling if applicable that will be allowed for kill tasks. This limit only applies for kill tasks that are spawned automatically by the coordinator's auto kill duty, which is enabled when `druid.coordinator.kill.on` is true. | 1 - all task slots can be used |
|`maxKillTaskSlots`| Maximum number of tasks that will be allowed for kill tasks. This limit only applies for kill tasks that are spawned automatically by the coordinator's auto kill duty, which is enabled when `druid.coordinator.kill.on` is true. | 2147483647 - no limit |
|`killPendingSegmentsSkipList`| List of data sources for which pendingSegments are _NOT_ cleaned up if property `druid.coordinator.kill.pendingSegments.on` is true. This can be a list of comma-separated data sources or a JSON array. | none | |`killPendingSegmentsSkipList`| List of data sources for which pendingSegments are _NOT_ cleaned up if property `druid.coordinator.kill.pendingSegments.on` is true. This can be a list of comma-separated data sources or a JSON array. | none |
|`maxSegmentsInNodeLoadingQueue`| The maximum number of segments allowed in the load queue of any given server. Use this parameter to load segments faster if, for example, the cluster contains slow-loading nodes or if there are too many segments to be replicated to a particular node (when faster loading is preferred to better segments distribution). The optimal value depends on the loading speed of segments, acceptable replication time and number of nodes. | 500 | |`maxSegmentsInNodeLoadingQueue`| The maximum number of segments allowed in the load queue of any given server. Use this parameter to load segments faster if, for example, the cluster contains slow-loading nodes or if there are too many segments to be replicated to a particular node (when faster loading is preferred to better segments distribution). The optimal value depends on the loading speed of segments, acceptable replication time and number of nodes. | 500 |
|`useRoundRobinSegmentAssignment`| Boolean flag for whether segments should be assigned to historicals in a round robin fashion. When disabled, segment assignment is done using the chosen balancer strategy. When enabled, this can speed up segment assignments leaving balancing to move the segments to their optimal locations (based on the balancer strategy) lazily. | true | |`useRoundRobinSegmentAssignment`| Boolean flag for whether segments should be assigned to historicals in a round robin fashion. When disabled, segment assignment is done using the chosen balancer strategy. When enabled, this can speed up segment assignments leaving balancing to move the segments to their optimal locations (based on the balancer strategy) lazily. | true |

View File

@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import org.apache.druid.common.config.JacksonConfigManager; import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.duty.KillUnusedSegments; import org.apache.druid.server.coordinator.duty.KillUnusedSegments;
import org.apache.druid.server.coordinator.loading.LoadQueuePeon; import org.apache.druid.server.coordinator.loading.LoadQueuePeon;
@ -69,6 +70,9 @@ public class CoordinatorDynamicConfig
* List of specific data sources for which kill tasks are sent in {@link KillUnusedSegments}. * List of specific data sources for which kill tasks are sent in {@link KillUnusedSegments}.
*/ */
private final Set<String> specificDataSourcesToKillUnusedSegmentsIn; private final Set<String> specificDataSourcesToKillUnusedSegmentsIn;
private final double killTaskSlotRatio;
private final int maxKillTaskSlots;
private final Set<String> decommissioningNodes; private final Set<String> decommissioningNodes;
private final Map<String, String> debugDimensions; private final Map<String, String> debugDimensions;
@ -130,6 +134,8 @@ public class CoordinatorDynamicConfig
// Keeping the legacy 'killDataSourceWhitelist' property name for backward compatibility. When the project is // Keeping the legacy 'killDataSourceWhitelist' property name for backward compatibility. When the project is
// updated to Jackson 2.9 it could be changed, see https://github.com/apache/druid/issues/7152 // updated to Jackson 2.9 it could be changed, see https://github.com/apache/druid/issues/7152
@JsonProperty("killDataSourceWhitelist") Object specificDataSourcesToKillUnusedSegmentsIn, @JsonProperty("killDataSourceWhitelist") Object specificDataSourcesToKillUnusedSegmentsIn,
@JsonProperty("killTaskSlotRatio") @Nullable Double killTaskSlotRatio,
@JsonProperty("maxKillTaskSlots") @Nullable Integer maxKillTaskSlots,
// Type is Object here so that we can support both string and list as Coordinator console can not send array of // Type is Object here so that we can support both string and list as Coordinator console can not send array of
// strings in the update request, as well as for specificDataSourcesToKillUnusedSegmentsIn. // strings in the update request, as well as for specificDataSourcesToKillUnusedSegmentsIn.
// Keeping the legacy 'killPendingSegmentsSkipList' property name for backward compatibility. When the project is // Keeping the legacy 'killPendingSegmentsSkipList' property name for backward compatibility. When the project is
@ -158,6 +164,20 @@ public class CoordinatorDynamicConfig
this.balancerComputeThreads = Math.max(balancerComputeThreads, 1); this.balancerComputeThreads = Math.max(balancerComputeThreads, 1);
this.specificDataSourcesToKillUnusedSegmentsIn this.specificDataSourcesToKillUnusedSegmentsIn
= parseJsonStringOrArray(specificDataSourcesToKillUnusedSegmentsIn); = parseJsonStringOrArray(specificDataSourcesToKillUnusedSegmentsIn);
if (null != killTaskSlotRatio && (killTaskSlotRatio < 0 || killTaskSlotRatio > 1)) {
throw InvalidInput.exception(
"killTaskSlotRatio [%.2f] is invalid. It must be >= 0 and <= 1.",
killTaskSlotRatio
);
}
this.killTaskSlotRatio = killTaskSlotRatio != null ? killTaskSlotRatio : Defaults.KILL_TASK_SLOT_RATIO;
if (null != maxKillTaskSlots && maxKillTaskSlots < 0) {
throw InvalidInput.exception(
"maxKillTaskSlots [%d] is invalid. It must be >= 0.",
maxKillTaskSlots
);
}
this.maxKillTaskSlots = maxKillTaskSlots != null ? maxKillTaskSlots : Defaults.MAX_KILL_TASK_SLOTS;
this.dataSourcesToNotKillStalePendingSegmentsIn this.dataSourcesToNotKillStalePendingSegmentsIn
= parseJsonStringOrArray(dataSourcesToNotKillStalePendingSegmentsIn); = parseJsonStringOrArray(dataSourcesToNotKillStalePendingSegmentsIn);
this.maxSegmentsInNodeLoadingQueue = Builder.valueOrDefault( this.maxSegmentsInNodeLoadingQueue = Builder.valueOrDefault(
@ -297,6 +317,18 @@ public class CoordinatorDynamicConfig
return specificDataSourcesToKillUnusedSegmentsIn; return specificDataSourcesToKillUnusedSegmentsIn;
} }
@JsonProperty("killTaskSlotRatio")
public double getKillTaskSlotRatio()
{
return killTaskSlotRatio;
}
@JsonProperty("maxKillTaskSlots")
public int getMaxKillTaskSlots()
{
return maxKillTaskSlots;
}
@JsonIgnore @JsonIgnore
public boolean isKillUnusedSegmentsInAllDataSources() public boolean isKillUnusedSegmentsInAllDataSources()
{ {
@ -406,6 +438,8 @@ public class CoordinatorDynamicConfig
", replicationThrottleLimit=" + replicationThrottleLimit + ", replicationThrottleLimit=" + replicationThrottleLimit +
", balancerComputeThreads=" + balancerComputeThreads + ", balancerComputeThreads=" + balancerComputeThreads +
", specificDataSourcesToKillUnusedSegmentsIn=" + specificDataSourcesToKillUnusedSegmentsIn + ", specificDataSourcesToKillUnusedSegmentsIn=" + specificDataSourcesToKillUnusedSegmentsIn +
", killTaskSlotRatio=" + killTaskSlotRatio +
", maxKillTaskSlots=" + maxKillTaskSlots +
", dataSourcesToNotKillStalePendingSegmentsIn=" + dataSourcesToNotKillStalePendingSegmentsIn + ", dataSourcesToNotKillStalePendingSegmentsIn=" + dataSourcesToNotKillStalePendingSegmentsIn +
", maxSegmentsInNodeLoadingQueue=" + maxSegmentsInNodeLoadingQueue + ", maxSegmentsInNodeLoadingQueue=" + maxSegmentsInNodeLoadingQueue +
", decommissioningNodes=" + decommissioningNodes + ", decommissioningNodes=" + decommissioningNodes +
@ -444,6 +478,8 @@ public class CoordinatorDynamicConfig
&& Objects.equals( && Objects.equals(
specificDataSourcesToKillUnusedSegmentsIn, specificDataSourcesToKillUnusedSegmentsIn,
that.specificDataSourcesToKillUnusedSegmentsIn) that.specificDataSourcesToKillUnusedSegmentsIn)
&& Objects.equals(killTaskSlotRatio, that.killTaskSlotRatio)
&& Objects.equals(maxKillTaskSlots, that.maxKillTaskSlots)
&& Objects.equals( && Objects.equals(
dataSourcesToNotKillStalePendingSegmentsIn, dataSourcesToNotKillStalePendingSegmentsIn,
that.dataSourcesToNotKillStalePendingSegmentsIn) that.dataSourcesToNotKillStalePendingSegmentsIn)
@ -464,6 +500,8 @@ public class CoordinatorDynamicConfig
balancerComputeThreads, balancerComputeThreads,
maxSegmentsInNodeLoadingQueue, maxSegmentsInNodeLoadingQueue,
specificDataSourcesToKillUnusedSegmentsIn, specificDataSourcesToKillUnusedSegmentsIn,
killTaskSlotRatio,
maxKillTaskSlots,
dataSourcesToNotKillStalePendingSegmentsIn, dataSourcesToNotKillStalePendingSegmentsIn,
decommissioningNodes, decommissioningNodes,
decommissioningMaxPercentOfMaxSegmentsToMove, decommissioningMaxPercentOfMaxSegmentsToMove,
@ -495,6 +533,13 @@ public class CoordinatorDynamicConfig
static final int MAX_NON_PRIMARY_REPLICANTS_TO_LOAD = Integer.MAX_VALUE; static final int MAX_NON_PRIMARY_REPLICANTS_TO_LOAD = Integer.MAX_VALUE;
static final boolean USE_ROUND_ROBIN_ASSIGNMENT = true; static final boolean USE_ROUND_ROBIN_ASSIGNMENT = true;
static final boolean SMART_SEGMENT_LOADING = true; static final boolean SMART_SEGMENT_LOADING = true;
// The following default values for killTaskSlotRatio and maxKillTaskSlots
// are to preserve the behavior before Druid 0.28 and a future version may
// want to consider better defaults so that kill tasks can not eat up all
// the capacity in the cluster would be nice
static final double KILL_TASK_SLOT_RATIO = 1.0;
static final int MAX_KILL_TASK_SLOTS = Integer.MAX_VALUE;
} }
public static class Builder public static class Builder
@ -507,6 +552,8 @@ public class CoordinatorDynamicConfig
private Integer replicationThrottleLimit; private Integer replicationThrottleLimit;
private Integer balancerComputeThreads; private Integer balancerComputeThreads;
private Object specificDataSourcesToKillUnusedSegmentsIn; private Object specificDataSourcesToKillUnusedSegmentsIn;
private Double killTaskSlotRatio;
private Integer maxKillTaskSlots;
private Object dataSourcesToNotKillStalePendingSegmentsIn; private Object dataSourcesToNotKillStalePendingSegmentsIn;
private Integer maxSegmentsInNodeLoadingQueue; private Integer maxSegmentsInNodeLoadingQueue;
private Object decommissioningNodes; private Object decommissioningNodes;
@ -532,6 +579,8 @@ public class CoordinatorDynamicConfig
@JsonProperty("replicationThrottleLimit") @Nullable Integer replicationThrottleLimit, @JsonProperty("replicationThrottleLimit") @Nullable Integer replicationThrottleLimit,
@JsonProperty("balancerComputeThreads") @Nullable Integer balancerComputeThreads, @JsonProperty("balancerComputeThreads") @Nullable Integer balancerComputeThreads,
@JsonProperty("killDataSourceWhitelist") @Nullable Object specificDataSourcesToKillUnusedSegmentsIn, @JsonProperty("killDataSourceWhitelist") @Nullable Object specificDataSourcesToKillUnusedSegmentsIn,
@JsonProperty("killTaskSlotRatio") @Nullable Double killTaskSlotRatio,
@JsonProperty("maxKillTaskSlots") @Nullable Integer maxKillTaskSlots,
@JsonProperty("killPendingSegmentsSkipList") @Nullable Object dataSourcesToNotKillStalePendingSegmentsIn, @JsonProperty("killPendingSegmentsSkipList") @Nullable Object dataSourcesToNotKillStalePendingSegmentsIn,
@JsonProperty("maxSegmentsInNodeLoadingQueue") @Nullable Integer maxSegmentsInNodeLoadingQueue, @JsonProperty("maxSegmentsInNodeLoadingQueue") @Nullable Integer maxSegmentsInNodeLoadingQueue,
@JsonProperty("decommissioningNodes") @Nullable Object decommissioningNodes, @JsonProperty("decommissioningNodes") @Nullable Object decommissioningNodes,
@ -553,6 +602,8 @@ public class CoordinatorDynamicConfig
this.replicationThrottleLimit = replicationThrottleLimit; this.replicationThrottleLimit = replicationThrottleLimit;
this.balancerComputeThreads = balancerComputeThreads; this.balancerComputeThreads = balancerComputeThreads;
this.specificDataSourcesToKillUnusedSegmentsIn = specificDataSourcesToKillUnusedSegmentsIn; this.specificDataSourcesToKillUnusedSegmentsIn = specificDataSourcesToKillUnusedSegmentsIn;
this.killTaskSlotRatio = killTaskSlotRatio;
this.maxKillTaskSlots = maxKillTaskSlots;
this.dataSourcesToNotKillStalePendingSegmentsIn = dataSourcesToNotKillStalePendingSegmentsIn; this.dataSourcesToNotKillStalePendingSegmentsIn = dataSourcesToNotKillStalePendingSegmentsIn;
this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue; this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue;
this.decommissioningNodes = decommissioningNodes; this.decommissioningNodes = decommissioningNodes;
@ -625,6 +676,18 @@ public class CoordinatorDynamicConfig
return this; return this;
} }
public Builder withKillTaskSlotRatio(Double killTaskSlotRatio)
{
this.killTaskSlotRatio = killTaskSlotRatio;
return this;
}
public Builder withMaxKillTaskSlots(Integer maxKillTaskSlots)
{
this.maxKillTaskSlots = maxKillTaskSlots;
return this;
}
public Builder withMaxSegmentsInNodeLoadingQueue(int maxSegmentsInNodeLoadingQueue) public Builder withMaxSegmentsInNodeLoadingQueue(int maxSegmentsInNodeLoadingQueue)
{ {
this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue; this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue;
@ -685,6 +748,8 @@ public class CoordinatorDynamicConfig
valueOrDefault(replicationThrottleLimit, Defaults.REPLICATION_THROTTLE_LIMIT), valueOrDefault(replicationThrottleLimit, Defaults.REPLICATION_THROTTLE_LIMIT),
valueOrDefault(balancerComputeThreads, Defaults.BALANCER_COMPUTE_THREADS), valueOrDefault(balancerComputeThreads, Defaults.BALANCER_COMPUTE_THREADS),
specificDataSourcesToKillUnusedSegmentsIn, specificDataSourcesToKillUnusedSegmentsIn,
valueOrDefault(killTaskSlotRatio, Defaults.KILL_TASK_SLOT_RATIO),
valueOrDefault(maxKillTaskSlots, Defaults.MAX_KILL_TASK_SLOTS),
dataSourcesToNotKillStalePendingSegmentsIn, dataSourcesToNotKillStalePendingSegmentsIn,
valueOrDefault(maxSegmentsInNodeLoadingQueue, Defaults.MAX_SEGMENTS_IN_NODE_LOADING_QUEUE), valueOrDefault(maxSegmentsInNodeLoadingQueue, Defaults.MAX_SEGMENTS_IN_NODE_LOADING_QUEUE),
decommissioningNodes, decommissioningNodes,
@ -720,6 +785,8 @@ public class CoordinatorDynamicConfig
valueOrDefault(replicationThrottleLimit, defaults.getReplicationThrottleLimit()), valueOrDefault(replicationThrottleLimit, defaults.getReplicationThrottleLimit()),
valueOrDefault(balancerComputeThreads, defaults.getBalancerComputeThreads()), valueOrDefault(balancerComputeThreads, defaults.getBalancerComputeThreads()),
valueOrDefault(specificDataSourcesToKillUnusedSegmentsIn, defaults.getSpecificDataSourcesToKillUnusedSegmentsIn()), valueOrDefault(specificDataSourcesToKillUnusedSegmentsIn, defaults.getSpecificDataSourcesToKillUnusedSegmentsIn()),
valueOrDefault(killTaskSlotRatio, defaults.killTaskSlotRatio),
valueOrDefault(maxKillTaskSlots, defaults.maxKillTaskSlots),
valueOrDefault(dataSourcesToNotKillStalePendingSegmentsIn, defaults.getDataSourcesToNotKillStalePendingSegmentsIn()), valueOrDefault(dataSourcesToNotKillStalePendingSegmentsIn, defaults.getDataSourcesToNotKillStalePendingSegmentsIn()),
valueOrDefault(maxSegmentsInNodeLoadingQueue, defaults.getMaxSegmentsInNodeLoadingQueue()), valueOrDefault(maxSegmentsInNodeLoadingQueue, defaults.getMaxSegmentsInNodeLoadingQueue()),
valueOrDefault(decommissioningNodes, defaults.getDecommissioningNodes()), valueOrDefault(decommissioningNodes, defaults.getDecommissioningNodes()),

View File

@ -19,24 +19,34 @@
package org.apache.druid.server.coordinator.duty; package org.apache.druid.server.coordinator.duty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.inject.Inject; import com.google.inject.Inject;
import org.apache.druid.client.indexing.IndexingTotalWorkerCapacityInfo;
import org.apache.druid.common.guava.FutureUtils; import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.JodaUtils; import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.metadata.SegmentsMetadataManager; import org.apache.druid.metadata.SegmentsMetadataManager;
import org.apache.druid.rpc.HttpResponseException;
import org.apache.druid.rpc.indexing.OverlordClient; import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.server.coordinator.DruidCoordinatorConfig; import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.utils.CollectionUtils; import org.apache.druid.utils.CollectionUtils;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Interval; import org.joda.time.Interval;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutionException;
/** /**
* Completely removes information about unused segments who have an interval end that comes before * Completely removes information about unused segments who have an interval end that comes before
@ -49,6 +59,8 @@ import java.util.List;
*/ */
public class KillUnusedSegments implements CoordinatorDuty public class KillUnusedSegments implements CoordinatorDuty
{ {
public static final String KILL_TASK_TYPE = "kill";
public static final String TASK_ID_PREFIX = "coordinator-issued";
private static final Logger log = new Logger(KillUnusedSegments.class); private static final Logger log = new Logger(KillUnusedSegments.class);
private final long period; private final long period;
@ -102,6 +114,13 @@ public class KillUnusedSegments implements CoordinatorDuty
{ {
Collection<String> dataSourcesToKill = Collection<String> dataSourcesToKill =
params.getCoordinatorDynamicConfig().getSpecificDataSourcesToKillUnusedSegmentsIn(); params.getCoordinatorDynamicConfig().getSpecificDataSourcesToKillUnusedSegmentsIn();
double killTaskSlotRatio = params.getCoordinatorDynamicConfig().getKillTaskSlotRatio();
int maxKillTaskSlots = params.getCoordinatorDynamicConfig().getMaxKillTaskSlots();
int availableKillTaskSlots = getAvailableKillTaskSlots(killTaskSlotRatio, maxKillTaskSlots);
if (0 == availableKillTaskSlots) {
log.debug("Not killing any unused segments because there are no available kill task slots at this time.");
return params;
}
// If no datasource has been specified, all are eligible for killing unused segments // If no datasource has been specified, all are eligible for killing unused segments
if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) { if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
@ -116,16 +135,22 @@ public class KillUnusedSegments implements CoordinatorDuty
} else { } else {
log.debug("Killing unused segments in datasources: %s", dataSourcesToKill); log.debug("Killing unused segments in datasources: %s", dataSourcesToKill);
lastKillTime = currentTimeMillis; lastKillTime = currentTimeMillis;
killUnusedSegments(dataSourcesToKill); killUnusedSegments(dataSourcesToKill, availableKillTaskSlots);
} }
return params; return params;
} }
private void killUnusedSegments(Collection<String> dataSourcesToKill) private void killUnusedSegments(Collection<String> dataSourcesToKill, int availableKillTaskSlots)
{ {
int submittedTasks = 0; int submittedTasks = 0;
for (String dataSource : dataSourcesToKill) { for (String dataSource : dataSourcesToKill) {
if (submittedTasks >= availableKillTaskSlots) {
log.info(StringUtils.format(
"Submitted [%d] kill tasks and reached kill task slot limit [%d]. Will resume "
+ "on the next coordinator cycle.", submittedTasks, availableKillTaskSlots));
break;
}
final Interval intervalToKill = findIntervalForKill(dataSource); final Interval intervalToKill = findIntervalForKill(dataSource);
if (intervalToKill == null) { if (intervalToKill == null) {
continue; continue;
@ -133,7 +158,7 @@ public class KillUnusedSegments implements CoordinatorDuty
try { try {
FutureUtils.getUnchecked(overlordClient.runKillTask( FutureUtils.getUnchecked(overlordClient.runKillTask(
"coordinator-issued", TASK_ID_PREFIX,
dataSource, dataSource,
intervalToKill, intervalToKill,
maxSegmentsToKill maxSegmentsToKill
@ -149,7 +174,7 @@ public class KillUnusedSegments implements CoordinatorDuty
} }
} }
log.debug("Submitted kill tasks for [%d] datasources.", submittedTasks); log.debug("Submitted [%d] kill tasks for [%d] datasources.", submittedTasks, dataSourcesToKill.size());
} }
/** /**
@ -174,4 +199,86 @@ public class KillUnusedSegments implements CoordinatorDuty
} }
} }
private int getAvailableKillTaskSlots(double killTaskSlotRatio, int maxKillTaskSlots)
{
return Math.max(
0,
getKillTaskCapacity(getTotalWorkerCapacity(), killTaskSlotRatio, maxKillTaskSlots) - getNumActiveKillTaskSlots()
);
}
/**
* Get the number of active kill task slots in use. The kill tasks counted, are only those thare are submitted
* by this coordinator duty (have prefix {@link KillUnusedSegments#TASK_ID_PREFIX}. The value returned here
* may be an overestimate, as in some cased the taskType can be null if middleManagers are running with an older
* version, and these tasks are counted as active kill tasks to be safe.
* @return
*/
private int getNumActiveKillTaskSlots()
{
final CloseableIterator<TaskStatusPlus> activeTasks =
FutureUtils.getUnchecked(overlordClient.taskStatuses(null, null, 0), true);
// Fetch currently running kill tasks
int numActiveKillTasks = 0;
try (final Closer closer = Closer.create()) {
closer.register(activeTasks);
while (activeTasks.hasNext()) {
final TaskStatusPlus status = activeTasks.next();
// taskType can be null if middleManagers are running with an older version. Here, we consevatively regard
// the tasks of the unknown taskType as the killTask. This is because it's important to not run
// killTasks more than the configured limit at any time which might impact to the ingestion
// performance.
if (status.getType() == null
|| (KILL_TASK_TYPE.equals(status.getType()) && status.getId().startsWith(TASK_ID_PREFIX))) {
numActiveKillTasks++;
}
}
}
catch (IOException e) {
throw new RuntimeException(e);
}
return numActiveKillTasks;
}
private int getTotalWorkerCapacity()
{
int totalWorkerCapacity;
try {
final IndexingTotalWorkerCapacityInfo workerCapacityInfo =
FutureUtils.get(overlordClient.getTotalWorkerCapacity(), true);
totalWorkerCapacity = workerCapacityInfo.getMaximumCapacityWithAutoScale();
if (totalWorkerCapacity < 0) {
totalWorkerCapacity = workerCapacityInfo.getCurrentClusterCapacity();
}
}
catch (ExecutionException e) {
// Call to getTotalWorkerCapacity may fail during a rolling upgrade: API was added in 0.23.0.
if (e.getCause() instanceof HttpResponseException
&& ((HttpResponseException) e.getCause()).getResponse().getStatus().equals(HttpResponseStatus.NOT_FOUND)) {
log.noStackTrace().warn(e, "Call to getTotalWorkerCapacity failed. Falling back to getWorkers.");
totalWorkerCapacity =
FutureUtils.getUnchecked(overlordClient.getWorkers(), true)
.stream()
.mapToInt(worker -> worker.getWorker().getCapacity())
.sum();
} else {
throw new RuntimeException(e.getCause());
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
return totalWorkerCapacity;
}
@VisibleForTesting
static int getKillTaskCapacity(int totalWorkerCapacity, double killTaskSlotRatio, int maxKillTaskSlots)
{
return Math.min((int) (totalWorkerCapacity * Math.min(killTaskSlotRatio, 1.0)), maxKillTaskSlots);
}
} }

View File

@ -20,6 +20,13 @@
package org.apache.druid.server.coordinator.duty; package org.apache.druid.server.coordinator.duty;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
import org.apache.druid.client.indexing.IndexingTotalWorkerCapacityInfo;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.metadata.SegmentsMetadataManager; import org.apache.druid.metadata.SegmentsMetadataManager;
import org.apache.druid.rpc.indexing.OverlordClient; import org.apache.druid.rpc.indexing.OverlordClient;
@ -32,6 +39,7 @@ import org.joda.time.DateTime;
import org.joda.time.Duration; import org.joda.time.Duration;
import org.joda.time.Interval; import org.joda.time.Interval;
import org.joda.time.Period; import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
@ -143,6 +151,7 @@ public class KillUnusedSegmentsTest
ArgumentMatchers.anyInt() ArgumentMatchers.anyInt()
); );
mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
target.run(params); target.run(params);
Mockito.verify(overlordClient, Mockito.never()) Mockito.verify(overlordClient, Mockito.never())
.runKillTask(anyString(), anyString(), any(Interval.class)); .runKillTask(anyString(), anyString(), any(Interval.class));
@ -156,6 +165,7 @@ public class KillUnusedSegmentsTest
target = new KillUnusedSegments(segmentsMetadataManager, overlordClient, config); target = new KillUnusedSegments(segmentsMetadataManager, overlordClient, config);
// No unused segment is older than the retention period // No unused segment is older than the retention period
mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
target.run(params); target.run(params);
Mockito.verify(overlordClient, Mockito.never()) Mockito.verify(overlordClient, Mockito.never())
.runKillTask(anyString(), anyString(), any(Interval.class)); .runKillTask(anyString(), anyString(), any(Interval.class));
@ -169,6 +179,7 @@ public class KillUnusedSegmentsTest
yearOldSegment.getInterval().getStart(), yearOldSegment.getInterval().getStart(),
dayOldSegment.getInterval().getEnd() dayOldSegment.getInterval().getEnd()
); );
mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
runAndVerifyKillInterval(expectedKillInterval); runAndVerifyKillInterval(expectedKillInterval);
} }
@ -185,6 +196,7 @@ public class KillUnusedSegmentsTest
yearOldSegment.getInterval().getStart(), yearOldSegment.getInterval().getStart(),
nextDaySegment.getInterval().getEnd() nextDaySegment.getInterval().getEnd()
); );
mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
runAndVerifyKillInterval(expectedKillInterval); runAndVerifyKillInterval(expectedKillInterval);
} }
@ -200,6 +212,7 @@ public class KillUnusedSegmentsTest
yearOldSegment.getInterval().getStart(), yearOldSegment.getInterval().getStart(),
nextMonthSegment.getInterval().getEnd() nextMonthSegment.getInterval().getEnd()
); );
mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
runAndVerifyKillInterval(expectedKillInterval); runAndVerifyKillInterval(expectedKillInterval);
} }
@ -210,10 +223,59 @@ public class KillUnusedSegmentsTest
.when(config).getCoordinatorKillMaxSegments(); .when(config).getCoordinatorKillMaxSegments();
target = new KillUnusedSegments(segmentsMetadataManager, overlordClient, config); target = new KillUnusedSegments(segmentsMetadataManager, overlordClient, config);
mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10);
// Only 1 unused segment is killed // Only 1 unused segment is killed
runAndVerifyKillInterval(yearOldSegment.getInterval()); runAndVerifyKillInterval(yearOldSegment.getInterval());
} }
@Test
public void testKillTaskSlotRatioNoAvailableTaskCapacityForKill()
{
mockTaskSlotUsage(0.10, 10, 1, 5);
runAndVerifyNoKill();
}
@Test
public void testMaxKillTaskSlotsNoAvailableTaskCapacityForKill()
{
mockTaskSlotUsage(1.0, 3, 3, 10);
runAndVerifyNoKill();
}
@Test
public void testGetKillTaskCapacity()
{
Assert.assertEquals(
10,
KillUnusedSegments.getKillTaskCapacity(10, 1.0, Integer.MAX_VALUE)
);
Assert.assertEquals(
0,
KillUnusedSegments.getKillTaskCapacity(10, 0.0, Integer.MAX_VALUE)
);
Assert.assertEquals(
10,
KillUnusedSegments.getKillTaskCapacity(10, Double.POSITIVE_INFINITY, Integer.MAX_VALUE)
);
Assert.assertEquals(
0,
KillUnusedSegments.getKillTaskCapacity(10, 1.0, 0)
);
Assert.assertEquals(
1,
KillUnusedSegments.getKillTaskCapacity(10, 0.1, 3)
);
Assert.assertEquals(
2,
KillUnusedSegments.getKillTaskCapacity(10, 0.3, 2)
);
}
private void runAndVerifyKillInterval(Interval expectedKillInterval) private void runAndVerifyKillInterval(Interval expectedKillInterval)
{ {
int limit = config.getCoordinatorKillMaxSegments(); int limit = config.getCoordinatorKillMaxSegments();
@ -226,6 +288,53 @@ public class KillUnusedSegmentsTest
); );
} }
private void runAndVerifyNoKill()
{
target.run(params);
Mockito.verify(overlordClient, Mockito.never()).runKillTask(
ArgumentMatchers.anyString(),
ArgumentMatchers.anyString(),
ArgumentMatchers.any(Interval.class),
ArgumentMatchers.anyInt()
);
}
private void mockTaskSlotUsage(
double killTaskSlotRatio,
int maxKillTaskSlots,
int numPendingCoordKillTasks,
int maxWorkerCapacity
)
{
Mockito.doReturn(killTaskSlotRatio)
.when(coordinatorDynamicConfig).getKillTaskSlotRatio();
Mockito.doReturn(maxKillTaskSlots)
.when(coordinatorDynamicConfig).getMaxKillTaskSlots();
Mockito.doReturn(Futures.immediateFuture(new IndexingTotalWorkerCapacityInfo(1, maxWorkerCapacity)))
.when(overlordClient)
.getTotalWorkerCapacity();
List<TaskStatusPlus> runningCoordinatorIssuedKillTasks = new ArrayList<>();
for (int i = 0; i < numPendingCoordKillTasks; i++) {
runningCoordinatorIssuedKillTasks.add(new TaskStatusPlus(
KillUnusedSegments.TASK_ID_PREFIX + "_taskId_" + i,
"groupId_" + i,
KillUnusedSegments.KILL_TASK_TYPE,
DateTimes.EPOCH,
DateTimes.EPOCH,
TaskState.RUNNING,
RunnerTaskState.RUNNING,
-1L,
TaskLocation.unknown(),
"datasource",
null
));
}
Mockito.doReturn(Futures.immediateFuture(
CloseableIterators.withEmptyBaggage(runningCoordinatorIssuedKillTasks.iterator())))
.when(overlordClient)
.taskStatuses(null, null, 0);
}
private DataSegment createSegmentWithEnd(DateTime endTime) private DataSegment createSegmentWithEnd(DateTime endTime)
{ {
return new DataSegment( return new DataSegment(

View File

@ -27,6 +27,8 @@ import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import javax.annotation.Nullable;
import java.util.Set; import java.util.Set;
/** /**
@ -50,6 +52,8 @@ public class CoordinatorDynamicConfigTest
+ " \"replicationThrottleLimit\": 1,\n" + " \"replicationThrottleLimit\": 1,\n"
+ " \"balancerComputeThreads\": 2, \n" + " \"balancerComputeThreads\": 2, \n"
+ " \"killDataSourceWhitelist\": [\"test1\",\"test2\"],\n" + " \"killDataSourceWhitelist\": [\"test1\",\"test2\"],\n"
+ " \"killTaskSlotRatio\": 0.15,\n"
+ " \"maxKillTaskSlots\": 2,\n"
+ " \"maxSegmentsInNodeLoadingQueue\": 1,\n" + " \"maxSegmentsInNodeLoadingQueue\": 1,\n"
+ " \"decommissioningNodes\": [\"host1\", \"host2\"],\n" + " \"decommissioningNodes\": [\"host1\", \"host2\"],\n"
+ " \"decommissioningMaxPercentOfMaxSegmentsToMove\": 9,\n" + " \"decommissioningMaxPercentOfMaxSegmentsToMove\": 9,\n"
@ -79,6 +83,8 @@ public class CoordinatorDynamicConfigTest
1, 1,
2, 2,
whitelist, whitelist,
0.15,
2,
false, false,
1, 1,
decommissioning, decommissioning,
@ -99,6 +105,8 @@ public class CoordinatorDynamicConfigTest
1, 1,
2, 2,
whitelist, whitelist,
0.15,
2,
false, false,
1, 1,
ImmutableSet.of("host1"), ImmutableSet.of("host1"),
@ -119,6 +127,8 @@ public class CoordinatorDynamicConfigTest
1, 1,
2, 2,
whitelist, whitelist,
0.15,
2,
false, false,
1, 1,
ImmutableSet.of("host1"), ImmutableSet.of("host1"),
@ -139,6 +149,8 @@ public class CoordinatorDynamicConfigTest
1, 1,
2, 2,
whitelist, whitelist,
0.15,
2,
false, false,
1, 1,
ImmutableSet.of("host1"), ImmutableSet.of("host1"),
@ -159,6 +171,8 @@ public class CoordinatorDynamicConfigTest
1, 1,
2, 2,
whitelist, whitelist,
0.15,
2,
false, false,
1, 1,
ImmutableSet.of("host1"), ImmutableSet.of("host1"),
@ -179,6 +193,52 @@ public class CoordinatorDynamicConfigTest
1, 1,
2, 2,
whitelist, whitelist,
0.15,
2,
false,
1,
ImmutableSet.of("host1"),
5,
true,
true,
10
);
actual = CoordinatorDynamicConfig.builder().withKillTaskSlotRatio(1.0).build(actual);
assertConfig(
actual,
1,
1,
1,
1,
1,
1,
2,
whitelist,
1.0,
2,
false,
1,
ImmutableSet.of("host1"),
5,
true,
true,
10
);
actual = CoordinatorDynamicConfig.builder().withMaxKillTaskSlots(5).build(actual);
assertConfig(
actual,
1,
1,
1,
1,
1,
1,
2,
whitelist,
1.0,
5,
false, false,
1, 1,
ImmutableSet.of("host1"), ImmutableSet.of("host1"),
@ -216,6 +276,8 @@ public class CoordinatorDynamicConfigTest
null, null,
null, null,
null, null,
null,
null,
ImmutableSet.of("host1"), ImmutableSet.of("host1"),
5, 5,
true, true,
@ -243,6 +305,8 @@ public class CoordinatorDynamicConfigTest
ImmutableSet.of("test1"), ImmutableSet.of("test1"),
null, null,
null, null,
null,
null,
ImmutableSet.of("host1"), ImmutableSet.of("host1"),
5, 5,
true, true,
@ -292,6 +356,8 @@ public class CoordinatorDynamicConfigTest
1, 1,
2, 2,
whitelist, whitelist,
1.0,
Integer.MAX_VALUE,
false, false,
1, 1,
decommissioning, decommissioning,
@ -312,6 +378,8 @@ public class CoordinatorDynamicConfigTest
1, 1,
2, 2,
whitelist, whitelist,
1.0,
Integer.MAX_VALUE,
false, false,
1, 1,
ImmutableSet.of("host1"), ImmutableSet.of("host1"),
@ -332,6 +400,8 @@ public class CoordinatorDynamicConfigTest
1, 1,
2, 2,
whitelist, whitelist,
1.0,
Integer.MAX_VALUE,
false, false,
1, 1,
ImmutableSet.of("host1"), ImmutableSet.of("host1"),
@ -376,6 +446,8 @@ public class CoordinatorDynamicConfigTest
1, 1,
2, 2,
ImmutableSet.of("test1", "test2"), ImmutableSet.of("test1", "test2"),
1.0,
Integer.MAX_VALUE,
false, false,
1, 1,
ImmutableSet.of(), ImmutableSet.of(),
@ -435,6 +507,8 @@ public class CoordinatorDynamicConfigTest
1, 1,
2, 2,
whitelist, whitelist,
1.0,
Integer.MAX_VALUE,
false, false,
1, 1,
decommissioning, decommissioning,
@ -477,6 +551,8 @@ public class CoordinatorDynamicConfigTest
1, 1,
2, 2,
ImmutableSet.of(), ImmutableSet.of(),
1.0,
Integer.MAX_VALUE,
true, true,
1, 1,
ImmutableSet.of(), ImmutableSet.of(),
@ -530,6 +606,8 @@ public class CoordinatorDynamicConfigTest
1, 1,
2, 2,
ImmutableSet.of(), ImmutableSet.of(),
1.0,
Integer.MAX_VALUE,
true, true,
EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE, EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE,
ImmutableSet.of(), ImmutableSet.of(),
@ -555,6 +633,8 @@ public class CoordinatorDynamicConfigTest
500, 500,
1, 1,
emptyList, emptyList,
1.0,
Integer.MAX_VALUE,
true, true,
EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE, EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE,
emptyList, emptyList,
@ -583,6 +663,8 @@ public class CoordinatorDynamicConfigTest
500, 500,
1, 1,
ImmutableSet.of("DATASOURCE"), ImmutableSet.of("DATASOURCE"),
1.0,
Integer.MAX_VALUE,
false, false,
EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE, EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE,
ImmutableSet.of(), ImmutableSet.of(),
@ -621,6 +703,8 @@ public class CoordinatorDynamicConfigTest
null, null,
null, null,
null, null,
null,
null,
null null
).build(current) ).build(current)
); );
@ -670,6 +754,8 @@ public class CoordinatorDynamicConfigTest
int expectedReplicationThrottleLimit, int expectedReplicationThrottleLimit,
int expectedBalancerComputeThreads, int expectedBalancerComputeThreads,
Set<String> expectedSpecificDataSourcesToKillUnusedSegmentsIn, Set<String> expectedSpecificDataSourcesToKillUnusedSegmentsIn,
Double expectedKillTaskSlotRatio,
@Nullable Integer expectedMaxKillTaskSlots,
boolean expectedKillUnusedSegmentsInAllDataSources, boolean expectedKillUnusedSegmentsInAllDataSources,
int expectedMaxSegmentsInNodeLoadingQueue, int expectedMaxSegmentsInNodeLoadingQueue,
Set<String> decommissioningNodes, Set<String> decommissioningNodes,
@ -694,6 +780,8 @@ public class CoordinatorDynamicConfigTest
config.getSpecificDataSourcesToKillUnusedSegmentsIn() config.getSpecificDataSourcesToKillUnusedSegmentsIn()
); );
Assert.assertEquals(expectedKillUnusedSegmentsInAllDataSources, config.isKillUnusedSegmentsInAllDataSources()); Assert.assertEquals(expectedKillUnusedSegmentsInAllDataSources, config.isKillUnusedSegmentsInAllDataSources());
Assert.assertEquals(expectedKillTaskSlotRatio, config.getKillTaskSlotRatio(), 0.001);
Assert.assertEquals((int) expectedMaxKillTaskSlots, config.getMaxKillTaskSlots());
Assert.assertEquals(expectedMaxSegmentsInNodeLoadingQueue, config.getMaxSegmentsInNodeLoadingQueue()); Assert.assertEquals(expectedMaxSegmentsInNodeLoadingQueue, config.getMaxSegmentsInNodeLoadingQueue());
Assert.assertEquals(decommissioningNodes, config.getDecommissioningNodes()); Assert.assertEquals(decommissioningNodes, config.getDecommissioningNodes());
Assert.assertEquals( Assert.assertEquals(