rename maintenance mode to decommission (#7154)

* rename maintenance mode to decommission

* review changes

* missed one

* fix straggler, add doc about decommissioning stalling if no active servers

* fix missed typo, docs

* refine docs

* doc changes, replace generals

* add explicit comment to mention suppressed stats for balanceTier

* rename decommissioningVelocity to decommissioningMaxSegmentsToMovePercent and update docs

* fix precondition check

* decommissioningMaxPercentOfMaxSegmentsToMove

* fix test

* fix test

* fixes
This commit is contained in:
Clint Wylie 2019-03-08 16:33:51 -08:00 committed by Gian Merlino
parent de55905a5f
commit a44df6522c
11 changed files with 192 additions and 173 deletions

View File

@ -783,8 +783,8 @@ A sample Coordinator dynamic config JSON object is shown below:
"replicationThrottleLimit": 10,
"emitBalancingStats": false,
"killDataSourceWhitelist": ["wikipedia", "testDatasource"],
"historicalNodesInMaintenance": ["localhost:8182", "localhost:8282"],
"nodesInMaintenancePriority": 7
"decommissioningNodes": ["localhost:8182", "localhost:8282"],
"decommissioningMaxPercentOfMaxSegmentsToMove": 70
}
```
@ -798,14 +798,14 @@ Issuing a GET request at the same URL will return the spec that is currently in
|`maxSegmentsToMove`|The maximum number of segments that can be moved at any given time.|5|
|`replicantLifetime`|The maximum number of Coordinator runs for a segment to be replicated before we start alerting.|15|
|`replicationThrottleLimit`|The maximum number of segments that can be replicated at one time.|10|
|`balancerComputeThreads`|Thread pool size for computing moving cost of segments in segment balancing. Consider increasing this if you have a lot of segments and moving segment starts to get stuck.|1|
|`balancerComputeThreads`|Thread pool size for computing moving cost of segments in segment balancing. Consider increasing this if you have a lot of segments and moving segments starts to get stuck.|1|
|`emitBalancingStats`|Boolean flag for whether or not we should emit balancing stats. This is an expensive operation.|false|
|`killDataSourceWhitelist`|List of dataSources for which kill tasks are sent if property `druid.coordinator.kill.on` is true. This can be a list of comma-separated dataSources or a JSON array.|none|
|`killAllDataSources`|Send kill tasks for ALL dataSources if property `druid.coordinator.kill.on` is true. If this is set to true then `killDataSourceWhitelist` must not be specified or be empty list.|false|
|`killPendingSegmentsSkipList`|List of dataSources for which pendingSegments are _NOT_ cleaned up if property `druid.coordinator.kill.pendingSegments.on` is true. This can be a list of comma-separated dataSources or a JSON array.|none|
|`maxSegmentsInNodeLoadingQueue`|The maximum number of segments that could be queued for loading to any given server. This parameter could be used to speed up segments loading process, especially if there are "slow" processes in the cluster (with low loading speed) or if too much segments scheduled to be replicated to some particular node (faster loading could be preferred to better segments distribution). Desired value depends on segments loading speed, acceptable replication time and number of processes. Value 1000 could be a start point for a rather big cluster. Default value is 0 (loading queue is unbounded) |0|
|`historicalNodesInMaintenance`| List of Historical nodes in maintenance mode. Coordinator doesn't assign new segments on those nodes and moves segments from the nodes according to a specified priority.|none|
|`nodesInMaintenancePriority`| Priority of segments from servers in maintenance. Coordinator takes ceil(maxSegmentsToMove * (priority / 10)) from servers in maitenance during balancing phase, i.e.:<br>0 - no segments from servers in maintenance will be processed during balancing<br>5 - 50% segments from servers in maintenance<br>10 - 100% segments from servers in maintenance<br>By leveraging the priority an operator can prevent general nodes from overload or decrease maitenance time instead.|7|
|`maxSegmentsInNodeLoadingQueue`|The maximum number of segments that could be queued for loading to any given server. This parameter could be used to speed up segments loading process, especially if there are "slow" nodes in the cluster (with low loading speed) or if too much segments scheduled to be replicated to some particular node (faster loading could be preferred to better segments distribution). Desired value depends on segments loading speed, acceptable replication time and number of nodes. Value 1000 could be a start point for a rather big cluster. Default value is 0 (loading queue is unbounded) |0|
|`decommissioningNodes`| List of historical servers to 'decommission'. Coordinator will not assign new segments to 'decommissioning' servers, and segments will be moved away from them to be placed on non-decommissioning servers at the maximum rate specified by `decommissioningMaxPercentOfMaxSegmentsToMove`.|none|
|`decommissioningMaxPercentOfMaxSegmentsToMove`| The maximum number of segments that may be moved away from 'decommissioning' servers to non-decommissioning (that is, active) servers during one Coordinator run. This value is relative to the total maximum segment movements allowed during one run which is determined by `maxSegmentsToMove`. If `decommissioningMaxPercentOfMaxSegmentsToMove` is 0, segments will neither be moved from _or to_ 'decommissioning' servers, effectively putting them in a sort of "maintenance" mode that will not participate in balancing or assignment by load rules. Decommissioning can also become stalled if there are no available active servers to place the segments. By leveraging the maximum percent of decommissioning segment movements, an operator can prevent active servers from overload by prioritizing balancing, or decrease decommissioning time instead. The value should be between 0 and 100.|70|
To view the audit history of Coordinator dynamic config issue a GET request to the URL -

View File

@ -28,6 +28,8 @@ import org.apache.druid.java.util.common.IAE;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import java.util.Collection;
import java.util.HashSet;
import java.util.Objects;
@ -56,8 +58,8 @@ public class CoordinatorDynamicConfig
private final boolean emitBalancingStats;
private final boolean killAllDataSources;
private final Set<String> killableDataSources;
private final Set<String> historicalNodesInMaintenance;
private final int nodesInMaintenancePriority;
private final Set<String> decommissioningNodes;
private final int decommissioningMaxPercentOfMaxSegmentsToMove;
// The pending segments of the dataSources in this list are not killed.
private final Set<String> protectedPendingSegmentDatasources;
@ -88,8 +90,8 @@ public class CoordinatorDynamicConfig
@JsonProperty("killAllDataSources") boolean killAllDataSources,
@JsonProperty("killPendingSegmentsSkipList") Object protectedPendingSegmentDatasources,
@JsonProperty("maxSegmentsInNodeLoadingQueue") int maxSegmentsInNodeLoadingQueue,
@JsonProperty("historicalNodesInMaintenance") Object historicalNodesInMaintenance,
@JsonProperty("nodesInMaintenancePriority") int nodesInMaintenancePriority
@JsonProperty("decommissioningNodes") Object decommissioningNodes,
@JsonProperty("decommissioningMaxPercentOfMaxSegmentsToMove") int decommissioningMaxPercentOfMaxSegmentsToMove
)
{
this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting;
@ -104,12 +106,12 @@ public class CoordinatorDynamicConfig
this.killableDataSources = parseJsonStringOrArray(killableDataSources);
this.protectedPendingSegmentDatasources = parseJsonStringOrArray(protectedPendingSegmentDatasources);
this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue;
this.historicalNodesInMaintenance = parseJsonStringOrArray(historicalNodesInMaintenance);
this.decommissioningNodes = parseJsonStringOrArray(decommissioningNodes);
Preconditions.checkArgument(
nodesInMaintenancePriority >= 0 && nodesInMaintenancePriority <= 10,
"nodesInMaintenancePriority should be in range [0, 10]"
decommissioningMaxPercentOfMaxSegmentsToMove >= 0 && decommissioningMaxPercentOfMaxSegmentsToMove <= 100,
"decommissioningMaxPercentOfMaxSegmentsToMove should be in range [0, 100]"
);
this.nodesInMaintenancePriority = nodesInMaintenancePriority;
this.decommissioningMaxPercentOfMaxSegmentsToMove = decommissioningMaxPercentOfMaxSegmentsToMove;
if (this.killAllDataSources && !this.killableDataSources.isEmpty()) {
throw new IAE("can't have killAllDataSources and non-empty killDataSourceWhitelist");
@ -231,32 +233,37 @@ public class CoordinatorDynamicConfig
}
/**
* Historical nodes list in maintenance mode. Coordinator doesn't assign new segments on those nodes and moves
* segments from those nodes according to a specified priority.
* List of historical servers to 'decommission'. Coordinator will not assign new segments to 'decommissioning' servers,
* and segments will be moved away from them to be placed on non-decommissioning servers at the maximum rate specified by
* {@link CoordinatorDynamicConfig#getDecommissioningMaxPercentOfMaxSegmentsToMove}.
*
* @return list of host:port entries
*/
@JsonProperty
public Set<String> getHistoricalNodesInMaintenance()
public Set<String> getDecommissioningNodes()
{
return historicalNodesInMaintenance;
return decommissioningNodes;
}
/**
* Priority of segments from servers in maintenance. Coordinator takes ceil(maxSegmentsToMove * (priority / 10))
* from servers in maitenance during balancing phase, i.e.:
* 0 - no segments from servers in maintenance will be processed during balancing
* 5 - 50% segments from servers in maintenance
* 10 - 100% segments from servers in maintenance
* By leveraging the priority an operator can prevent general nodes from overload or decrease maitenance time
* instead.
* The percent of {@link CoordinatorDynamicConfig#getMaxSegmentsToMove()} that determines the maximum number of
* segments that may be moved away from 'decommissioning' servers (specified by
* {@link CoordinatorDynamicConfig#getDecommissioningNodes()}) to non-decommissioning servers during one Coordinator
* balancer run. If this value is 0, segments will neither be moved from or to 'decommissioning' servers, effectively
* putting them in a sort of "maintenance" mode that will not participate in balancing or assignment by load rules.
* Decommissioning can also become stalled if there are no available active servers to place the segments. By
* adjusting this value, an operator can prevent active servers from overload by prioritizing balancing, or
* decrease decommissioning time instead.
*
* @return number in range [0, 10]
* @return number in range [0, 100]
*/
@Min(0)
@Max(100)
@JsonProperty
public int getNodesInMaintenancePriority()
public int getDecommissioningMaxPercentOfMaxSegmentsToMove()
{
return nodesInMaintenancePriority;
return decommissioningMaxPercentOfMaxSegmentsToMove;
}
@Override
@ -275,8 +282,8 @@ public class CoordinatorDynamicConfig
", killDataSourceWhitelist=" + killableDataSources +
", protectedPendingSegmentDatasources=" + protectedPendingSegmentDatasources +
", maxSegmentsInNodeLoadingQueue=" + maxSegmentsInNodeLoadingQueue +
", historicalNodesInMaintenance=" + historicalNodesInMaintenance +
", nodesInMaintenancePriority=" + nodesInMaintenancePriority +
", decommissioningNodes=" + decommissioningNodes +
", decommissioningMaxPercentOfMaxSegmentsToMove=" + decommissioningMaxPercentOfMaxSegmentsToMove +
'}';
}
@ -328,10 +335,10 @@ public class CoordinatorDynamicConfig
if (!Objects.equals(protectedPendingSegmentDatasources, that.protectedPendingSegmentDatasources)) {
return false;
}
if (!Objects.equals(historicalNodesInMaintenance, that.historicalNodesInMaintenance)) {
if (!Objects.equals(decommissioningNodes, that.decommissioningNodes)) {
return false;
}
return nodesInMaintenancePriority == that.nodesInMaintenancePriority;
return decommissioningMaxPercentOfMaxSegmentsToMove == that.decommissioningMaxPercentOfMaxSegmentsToMove;
}
@Override
@ -350,8 +357,8 @@ public class CoordinatorDynamicConfig
maxSegmentsInNodeLoadingQueue,
killableDataSources,
protectedPendingSegmentDatasources,
historicalNodesInMaintenance,
nodesInMaintenancePriority
decommissioningNodes,
decommissioningMaxPercentOfMaxSegmentsToMove
);
}
@ -372,7 +379,7 @@ public class CoordinatorDynamicConfig
private static final boolean DEFAULT_EMIT_BALANCING_STATS = false;
private static final boolean DEFAULT_KILL_ALL_DATA_SOURCES = false;
private static final int DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE = 0;
private static final int DEFAULT_MAINTENANCE_MODE_SEGMENTS_PRIORITY = 7;
private static final int DEFAULT_DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT = 70;
private Long millisToWaitBeforeDeleting;
private Long mergeBytesLimit;
@ -386,8 +393,8 @@ public class CoordinatorDynamicConfig
private Boolean killAllDataSources;
private Object killPendingSegmentsSkipList;
private Integer maxSegmentsInNodeLoadingQueue;
private Object maintenanceList;
private Integer maintenanceModeSegmentsPriority;
private Object decommissioningNodes;
private Integer decommissioningMaxPercentOfMaxSegmentsToMove;
public Builder()
{
@ -407,8 +414,8 @@ public class CoordinatorDynamicConfig
@JsonProperty("killAllDataSources") @Nullable Boolean killAllDataSources,
@JsonProperty("killPendingSegmentsSkipList") @Nullable Object killPendingSegmentsSkipList,
@JsonProperty("maxSegmentsInNodeLoadingQueue") @Nullable Integer maxSegmentsInNodeLoadingQueue,
@JsonProperty("historicalNodesInMaintenance") @Nullable Object maintenanceList,
@JsonProperty("nodesInMaintenancePriority") @Nullable Integer maintenanceModeSegmentsPriority
@JsonProperty("decommissioningNodes") @Nullable Object decommissioningNodes,
@JsonProperty("decommissioningMaxPercentOfMaxSegmentsToMove") @Nullable Integer decommissioningMaxPercentOfMaxSegmentsToMove
)
{
this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting;
@ -423,8 +430,8 @@ public class CoordinatorDynamicConfig
this.killableDataSources = killableDataSources;
this.killPendingSegmentsSkipList = killPendingSegmentsSkipList;
this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue;
this.maintenanceList = maintenanceList;
this.maintenanceModeSegmentsPriority = maintenanceModeSegmentsPriority;
this.decommissioningNodes = decommissioningNodes;
this.decommissioningMaxPercentOfMaxSegmentsToMove = decommissioningMaxPercentOfMaxSegmentsToMove;
}
public Builder withMillisToWaitBeforeDeleting(long millisToWaitBeforeDeleting)
@ -493,15 +500,15 @@ public class CoordinatorDynamicConfig
return this;
}
public Builder withMaintenanceList(Set<String> list)
public Builder withDecommissioningNodes(Set<String> decommissioning)
{
this.maintenanceList = list;
this.decommissioningNodes = decommissioning;
return this;
}
public Builder withMaintenanceModeSegmentsPriority(Integer priority)
public Builder withDecommissioningMaxPercentOfMaxSegmentsToMove(Integer percent)
{
this.maintenanceModeSegmentsPriority = priority;
this.decommissioningMaxPercentOfMaxSegmentsToMove = percent;
return this;
}
@ -522,10 +529,10 @@ public class CoordinatorDynamicConfig
maxSegmentsInNodeLoadingQueue == null
? DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE
: maxSegmentsInNodeLoadingQueue,
maintenanceList,
maintenanceModeSegmentsPriority == null
? DEFAULT_MAINTENANCE_MODE_SEGMENTS_PRIORITY
: maintenanceModeSegmentsPriority
decommissioningNodes,
decommissioningMaxPercentOfMaxSegmentsToMove == null
? DEFAULT_DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT
: decommissioningMaxPercentOfMaxSegmentsToMove
);
}
@ -548,10 +555,10 @@ public class CoordinatorDynamicConfig
maxSegmentsInNodeLoadingQueue == null
? defaults.getMaxSegmentsInNodeLoadingQueue()
: maxSegmentsInNodeLoadingQueue,
maintenanceList == null ? defaults.getHistoricalNodesInMaintenance() : maintenanceList,
maintenanceModeSegmentsPriority == null
? defaults.getNodesInMaintenancePriority()
: maintenanceModeSegmentsPriority
decommissioningNodes == null ? defaults.getDecommissioningNodes() : decommissioningNodes,
decommissioningMaxPercentOfMaxSegmentsToMove == null
? defaults.getDecommissioningMaxPercentOfMaxSegmentsToMove()
: decommissioningMaxPercentOfMaxSegmentsToMove
);
}
}

View File

@ -694,7 +694,7 @@ public class DruidCoordinator
}
// Find all historical servers, group them by subType and sort by ascending usage
Set<String> nodesInMaintenance = params.getCoordinatorDynamicConfig().getHistoricalNodesInMaintenance();
Set<String> decommissioningServers = params.getCoordinatorDynamicConfig().getDecommissioningNodes();
final DruidCluster cluster = new DruidCluster();
for (ImmutableDruidServer server : servers) {
if (!loadManagementPeons.containsKey(server.getName())) {
@ -709,7 +709,7 @@ public class DruidCoordinator
new ServerHolder(
server,
loadManagementPeons.get(server.getName()),
nodesInMaintenance.contains(server.getHost())
decommissioningServers.contains(server.getHost())
)
);
}

View File

@ -32,18 +32,18 @@ public class ServerHolder implements Comparable<ServerHolder>
private static final Logger log = new Logger(ServerHolder.class);
private final ImmutableDruidServer server;
private final LoadQueuePeon peon;
private final boolean inMaintenance;
private final boolean isDecommissioning;
public ServerHolder(ImmutableDruidServer server, LoadQueuePeon peon)
{
this(server, peon, false);
}
public ServerHolder(ImmutableDruidServer server, LoadQueuePeon peon, boolean inMaintenance)
public ServerHolder(ImmutableDruidServer server, LoadQueuePeon peon, boolean isDecommissioning)
{
this.server = server;
this.peon = peon;
this.inMaintenance = inMaintenance;
this.isDecommissioning = isDecommissioning;
}
public ImmutableDruidServer getServer()
@ -82,14 +82,15 @@ public class ServerHolder implements Comparable<ServerHolder>
}
/**
* Historical nodes can be placed in maintenance mode, which instructs Coordinator to move segments from them
* according to a specified priority. The mechanism allows to drain segments from nodes which are planned for
* replacement.
* @return true if the node is in maitenance mode
* Historical nodes can be 'decommissioned', which instructs Coordinator to move segments from them according to
* the percent of move operations diverted from normal balancer moves for this purpose by
* {@link CoordinatorDynamicConfig#getDecommissioningMaxPercentOfMaxSegmentsToMove()}. The mechanism allows draining
* segments from nodes which are planned for replacement.
* @return true if the node is decommissioning
*/
public boolean isInMaintenance()
public boolean isDecommissioning()
{
return inMaintenance;
return isDecommissioning;
}
public long getAvailableSize()

View File

@ -95,37 +95,41 @@ public class DruidCoordinatorBalancer implements DruidCoordinatorHelper
{
if (params.getAvailableSegments().size() == 0) {
log.info("Metadata segments are not available. Cannot balance.");
log.warn("Metadata segments are not available. Cannot balance.");
// suppress emit zero stats
return;
}
currentlyMovingSegments.computeIfAbsent(tier, t -> new ConcurrentHashMap<>());
if (!currentlyMovingSegments.get(tier).isEmpty()) {
reduceLifetimes(tier);
log.info("[%s]: Still waiting on %,d segments to be moved", tier, currentlyMovingSegments.get(tier).size());
log.info(
"[%s]: Still waiting on %,d segments to be moved. Skipping balance.",
tier,
currentlyMovingSegments.get(tier).size()
);
// suppress emit zero stats
return;
}
/*
Take as much segments from maintenance servers as priority allows and find the best location for them on
available servers. After that, balance segments within available servers pool.
Take as many segments from decommissioning servers as decommissioningMaxPercentOfMaxSegmentsToMove allows and find
the best location for them on active servers. After that, balance segments within active servers pool.
*/
Map<Boolean, List<ServerHolder>> partitions =
servers.stream().collect(Collectors.partitioningBy(ServerHolder::isInMaintenance));
final List<ServerHolder> maintenanceServers = partitions.get(true);
final List<ServerHolder> availableServers = partitions.get(false);
servers.stream().collect(Collectors.partitioningBy(ServerHolder::isDecommissioning));
final List<ServerHolder> decommissioningServers = partitions.get(true);
final List<ServerHolder> activeServers = partitions.get(false);
log.info(
"Found %d servers in maintenance, %d available servers servers",
maintenanceServers.size(),
availableServers.size()
"Found %d active servers, %d decommissioning servers",
activeServers.size(),
decommissioningServers.size()
);
if (maintenanceServers.isEmpty()) {
if (availableServers.size() <= 1) {
log.info("[%s]: %d available servers servers found. Cannot balance.", tier, availableServers.size());
}
} else if (availableServers.isEmpty()) {
log.info("[%s]: no available servers servers found during maintenance. Cannot balance.", tier);
if ((decommissioningServers.isEmpty() && activeServers.size() <= 1) || activeServers.isEmpty()) {
log.warn("[%s]: insufficient active servers. Cannot balance.", tier);
// suppress emit zero stats
return;
}
int numSegments = 0;
@ -135,22 +139,29 @@ public class DruidCoordinatorBalancer implements DruidCoordinatorHelper
if (numSegments == 0) {
log.info("No segments found. Cannot balance.");
// suppress emit zero stats
return;
}
final int maxSegmentsToMove = Math.min(params.getCoordinatorDynamicConfig().getMaxSegmentsToMove(), numSegments);
int priority = params.getCoordinatorDynamicConfig().getNodesInMaintenancePriority();
int maxMaintenanceSegmentsToMove = (int) Math.ceil(maxSegmentsToMove * priority / 10.0);
log.info("Processing %d segments from servers in maintenance mode", maxMaintenanceSegmentsToMove);
Pair<Integer, Integer> maintenanceResult =
balanceServers(params, maintenanceServers, availableServers, maxMaintenanceSegmentsToMove);
int maxGeneralSegmentsToMove = maxSegmentsToMove - maintenanceResult.lhs;
log.info("Processing %d segments from servers in general mode", maxGeneralSegmentsToMove);
Pair<Integer, Integer> generalResult =
balanceServers(params, availableServers, availableServers, maxGeneralSegmentsToMove);
int decommissioningMaxPercentOfMaxSegmentsToMove =
params.getCoordinatorDynamicConfig().getDecommissioningMaxPercentOfMaxSegmentsToMove();
int maxSegmentsToMoveFromDecommissioningNodes =
(int) Math.ceil(maxSegmentsToMove * (decommissioningMaxPercentOfMaxSegmentsToMove / 100.0));
log.info(
"Processing %d segments for moving from decommissioning servers",
maxSegmentsToMoveFromDecommissioningNodes
);
Pair<Integer, Integer> decommissioningResult =
balanceServers(params, decommissioningServers, activeServers, maxSegmentsToMoveFromDecommissioningNodes);
int moved = generalResult.lhs + maintenanceResult.lhs;
int unmoved = generalResult.rhs + maintenanceResult.rhs;
int maxGeneralSegmentsToMove = maxSegmentsToMove - decommissioningResult.lhs;
log.info("Processing %d segments for balancing between active servers", maxGeneralSegmentsToMove);
Pair<Integer, Integer> generalResult =
balanceServers(params, activeServers, activeServers, maxGeneralSegmentsToMove);
int moved = generalResult.lhs + decommissioningResult.lhs;
int unmoved = generalResult.rhs + decommissioningResult.rhs;
if (unmoved == maxSegmentsToMove) {
// Cluster should be alive and constantly adjusting
log.info("No good moves found in tier [%s]", tier);

View File

@ -46,7 +46,7 @@ public abstract class BroadcastDistributionRule implements Rule
} else {
params.getDruidCluster().getAllServers().forEach(
eachHolder -> {
if (!eachHolder.isInMaintenance()
if (!eachHolder.isDecommissioning()
&& colocatedDataSources.stream()
.anyMatch(source -> eachHolder.getServer().getDataSource(source) != null)) {
loadServerHolders.add(eachHolder);

View File

@ -153,8 +153,8 @@ public abstract class LoadRule implements Rule
log.makeAlert("Tier[%s] has no servers! Check your cluster configuration!", tier).emit();
return Collections.emptyList();
}
Predicate<ServerHolder> isNotInMaintenance = s -> !s.isInMaintenance();
return queue.stream().filter(isNotInMaintenance.and(predicate)).collect(Collectors.toList());
Predicate<ServerHolder> isActive = s -> !s.isDecommissioning();
return queue.stream().filter(isActive.and(predicate)).collect(Collectors.toList());
}
/**
@ -385,14 +385,14 @@ public abstract class LoadRule implements Rule
Map<Boolean, TreeSet<ServerHolder>> holders = holdersInTier.stream()
.filter(s -> s.isServingSegment(segment))
.collect(Collectors.partitioningBy(
ServerHolder::isInMaintenance,
ServerHolder::isDecommissioning,
Collectors.toCollection(TreeSet::new)
));
TreeSet<ServerHolder> maintenanceServers = holders.get(true);
TreeSet<ServerHolder> availableServers = holders.get(false);
int left = dropSegmentFromServers(balancerStrategy, segment, maintenanceServers, numToDrop);
TreeSet<ServerHolder> decommissioningServers = holders.get(true);
TreeSet<ServerHolder> activeServers = holders.get(false);
int left = dropSegmentFromServers(balancerStrategy, segment, decommissioningServers, numToDrop);
if (left > 0) {
left = dropSegmentFromServers(balancerStrategy, segment, availableServers, left);
left = dropSegmentFromServers(balancerStrategy, segment, activeServers, left);
}
if (left != 0) {
log.warn("Wtf, holder was null? I have no servers serving [%s]?", segment.getId());

View File

@ -201,14 +201,14 @@ public class DruidCoordinatorBalancerTest
/**
* Server 1 has 2 segments.
* Server 2 (maintenance) has 2 segments.
* Server 2 (decommissioning) has 2 segments.
* Server 3 is empty.
* Maintenance has priority 7.
* Decommissioning percent is 60.
* Max segments to move is 3.
* 2 (of 2) segments should be moved from Server 2 and 1 (of 2) from Server 1.
*/
@Test
public void testMoveMaintenancePriority()
public void testMoveDecommissioningMaxPercentOfMaxSegmentsToMove()
{
mockDruidServer(druidServer1, "1", "normal", 30L, 100L, Arrays.asList(segment1, segment2));
mockDruidServer(druidServer2, "2", "normal", 30L, 100L, Arrays.asList(segment3, segment4));
@ -239,8 +239,8 @@ public class DruidCoordinatorBalancerTest
.withDynamicConfigs(
CoordinatorDynamicConfig.builder()
.withMaxSegmentsToMove(3)
.withMaintenanceModeSegmentsPriority(6)
.build() // ceil(3 * 0.6) = 2 segments from servers in maintenance
.withDecommissioningMaxPercentOfMaxSegmentsToMove(60)
.build() // ceil(3 * 0.6) = 2 segments from decommissioning servers
)
.withBalancerStrategy(strategy)
.build();
@ -251,28 +251,28 @@ public class DruidCoordinatorBalancerTest
}
@Test
public void testZeroMaintenancePriority()
public void testZeroDecommissioningMaxPercentOfMaxSegmentsToMove()
{
DruidCoordinatorRuntimeParams params = setupParamsForMaintenancePriority(0);
DruidCoordinatorRuntimeParams params = setupParamsForDecommissioningMaxPercentOfMaxSegmentsToMove(0);
params = new DruidCoordinatorBalancerTester(coordinator).run(params);
Assert.assertEquals(1L, params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
Assert.assertThat(peon3.getSegmentsToLoad(), is(equalTo(ImmutableSet.of(segment1))));
}
@Test
public void testMaxMaintenancePriority()
public void testMaxDecommissioningMaxPercentOfMaxSegmentsToMove()
{
DruidCoordinatorRuntimeParams params = setupParamsForMaintenancePriority(10);
DruidCoordinatorRuntimeParams params = setupParamsForDecommissioningMaxPercentOfMaxSegmentsToMove(10);
params = new DruidCoordinatorBalancerTester(coordinator).run(params);
Assert.assertEquals(1L, params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
Assert.assertThat(peon3.getSegmentsToLoad(), is(equalTo(ImmutableSet.of(segment2))));
}
/**
* Should balance segments as usual (ignoring priority) with empty maintenanceList.
* Should balance segments as usual (ignoring percent) with empty decommissioningNodes.
*/
@Test
public void testMoveMaintenancePriorityWithNoMaintenance()
public void testMoveDecommissioningMaxPercentOfMaxSegmentsToMoveWithNoDecommissioning()
{
mockDruidServer(druidServer1, "1", "normal", 30L, 100L, Arrays.asList(segment1, segment2));
mockDruidServer(druidServer2, "2", "normal", 0L, 100L, Arrays.asList(segment3, segment4));
@ -300,7 +300,7 @@ public class DruidCoordinatorBalancerTest
ImmutableList.of(false, false, false)
)
.withDynamicConfigs(
CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(3).withMaintenanceModeSegmentsPriority(9).build()
CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(3).withDecommissioningMaxPercentOfMaxSegmentsToMove(9).build()
)
.withBalancerStrategy(strategy)
.build();
@ -311,10 +311,10 @@ public class DruidCoordinatorBalancerTest
}
/**
* Shouldn't move segments to a server in maintenance mode.
* Shouldn't move segments to a decommissioning server.
*/
@Test
public void testMoveToServerInMaintenance()
public void testMoveToDecommissioningServer()
{
mockDruidServer(druidServer1, "1", "normal", 30L, 100L, segments);
mockDruidServer(druidServer2, "2", "normal", 0L, 100L, Collections.emptyList());
@ -347,7 +347,7 @@ public class DruidCoordinatorBalancerTest
}
@Test
public void testMoveFromServerInMaintenance()
public void testMoveFromDecommissioningServer()
{
mockDruidServer(druidServer1, "1", "normal", 30L, 100L, segments);
mockDruidServer(druidServer2, "2", "normal", 0L, 100L, Collections.emptyList());
@ -512,7 +512,7 @@ public class DruidCoordinatorBalancerTest
private DruidCoordinatorRuntimeParams.Builder defaultRuntimeParamsBuilder(
List<ImmutableDruidServer> druidServers,
List<LoadQueuePeon> peons,
List<Boolean> maintenance
List<Boolean> decommissioning
)
{
return DruidCoordinatorRuntimeParams
@ -524,7 +524,7 @@ public class DruidCoordinatorBalancerTest
"normal",
IntStream
.range(0, druidServers.size())
.mapToObj(i -> new ServerHolder(druidServers.get(i), peons.get(i), maintenance.get(i)))
.mapToObj(i -> new ServerHolder(druidServers.get(i), peons.get(i), decommissioning.get(i)))
.collect(Collectors.toSet())
)
)
@ -622,7 +622,7 @@ public class DruidCoordinatorBalancerTest
}
}
private DruidCoordinatorRuntimeParams setupParamsForMaintenancePriority(int priority)
private DruidCoordinatorRuntimeParams setupParamsForDecommissioningMaxPercentOfMaxSegmentsToMove(int percent)
{
mockDruidServer(druidServer1, "1", "normal", 30L, 100L, Arrays.asList(segment1, segment3));
mockDruidServer(druidServer2, "2", "normal", 30L, 100L, Arrays.asList(segment2, segment3));
@ -632,7 +632,7 @@ public class DruidCoordinatorBalancerTest
mockCoordinator(coordinator);
// either maintenance servers list or general ones (ie servers list is [2] or [1, 3])
// either decommissioning servers list or acitve ones (ie servers list is [2] or [1, 3])
BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
EasyMock.expect(strategy.pickSegmentToMove(ImmutableList.of(new ServerHolder(druidServer2, peon2, true))))
.andReturn(new BalancerSegmentHolder(druidServer2, segment2));
@ -651,7 +651,7 @@ public class DruidCoordinatorBalancerTest
.withDynamicConfigs(
CoordinatorDynamicConfig.builder()
.withMaxSegmentsToMove(1)
.withMaintenanceModeSegmentsPriority(priority)
.withDecommissioningMaxPercentOfMaxSegmentsToMove(percent)
.build()
)
.withBalancerStrategy(strategy)

View File

@ -58,9 +58,9 @@ public class BroadcastDistributionRuleTest
private final List<DataSegment> largeSegments2 = new ArrayList<>();
private DataSegment smallSegment;
private DruidCluster secondCluster;
private ServerHolder generalServer;
private ServerHolder maintenanceServer2;
private ServerHolder maintenanceServer1;
private ServerHolder activeServer;
private ServerHolder decommissioningServer1;
private ServerHolder decommissioningServer2;
@Before
public void setUp()
@ -200,9 +200,9 @@ public class BroadcastDistributionRuleTest
)
);
generalServer = new ServerHolder(
activeServer = new ServerHolder(
new DruidServer(
"general",
"active",
"host1",
null,
100,
@ -214,9 +214,9 @@ public class BroadcastDistributionRuleTest
new LoadQueuePeonTester()
);
maintenanceServer1 = new ServerHolder(
decommissioningServer1 = new ServerHolder(
new DruidServer(
"maintenance1",
"decommissioning1",
"host2",
null,
100,
@ -229,9 +229,9 @@ public class BroadcastDistributionRuleTest
true
);
maintenanceServer2 = new ServerHolder(
decommissioningServer2 = new ServerHolder(
new DruidServer(
"maintenance2",
"decommissioning2",
"host3",
null,
100,
@ -267,9 +267,9 @@ public class BroadcastDistributionRuleTest
ImmutableMap.of(
"tier1",
Stream.of(
generalServer,
maintenanceServer1,
maintenanceServer2
activeServer,
decommissioningServer1,
decommissioningServer2
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())))
)
);
@ -316,18 +316,18 @@ public class BroadcastDistributionRuleTest
/**
* Servers:
* name | segments
* -------------+--------------
* general | large segment
* maintenance1 | small segment
* maintenance2 | large segment
* -----------------+--------------
* active | large segment
* decommissioning1 | small segment
* decommissioning2 | large segment
*
* After running the rule for the small segment:
* general | large & small segments
* maintenance1 |
* maintenance2 | large segment
* active | large & small segments
* decommissioning1 |
* decommissionint2 | large segment
*/
@Test
public void testBroadcastWithMaintenance()
public void testBroadcastDecommissioning()
{
final ForeverBroadcastDistributionRule rule = new ForeverBroadcastDistributionRule(ImmutableList.of("large_source"));
@ -348,9 +348,9 @@ public class BroadcastDistributionRuleTest
assertEquals(1L, stats.getGlobalStat(LoadRule.ASSIGNED_COUNT));
assertEquals(false, stats.hasPerTierStats());
assertEquals(1, generalServer.getPeon().getSegmentsToLoad().size());
assertEquals(1, maintenanceServer1.getPeon().getSegmentsToDrop().size());
assertEquals(0, maintenanceServer2.getPeon().getSegmentsToLoad().size());
assertEquals(1, activeServer.getPeon().getSegmentsToLoad().size());
assertEquals(1, decommissioningServer1.getPeon().getSegmentsToDrop().size());
assertEquals(0, decommissioningServer2.getPeon().getSegmentsToLoad().size());
}
@Test

View File

@ -687,11 +687,11 @@ public class LoadRuleTest
}
/**
* 2 servers in different tiers, the first is in maitenance mode.
* Should not load a segment to the server in maintenance mode.
* 2 servers in different tiers, the first is decommissioning.
* Should not load a segment to the server that is decommissioning
*/
@Test
public void testLoadDuringMaitenance()
public void testLoadDecommissioning()
{
final LoadQueuePeon mockPeon1 = createEmptyPeon();
final LoadQueuePeon mockPeon2 = createOneCallPeonMock();
@ -737,11 +737,11 @@ public class LoadRuleTest
}
/**
* 2 tiers, 2 servers each, 1 server of the second tier is in maintenance.
* Should not load a segment to the server in maintenance mode.
* 2 tiers, 2 servers each, 1 server of the second tier is decommissioning.
* Should not load a segment to the server that is decommssioning.
*/
@Test
public void testLoadReplicaDuringMaitenance()
public void testLoadReplicaDuringDecommissioning()
{
EasyMock.expect(throttler.canCreateReplicant(EasyMock.anyString())).andReturn(true).anyTimes();
@ -796,11 +796,11 @@ public class LoadRuleTest
}
/**
* 2 servers with a segment, one server in maintenance mode.
* 2 servers with a segment, one server decommissioning.
* Should drop a segment from both.
*/
@Test
public void testDropDuringMaintenance()
public void testDropDuringDecommissioning()
{
final LoadQueuePeon mockPeon = createEmptyPeon();
mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject());
@ -859,12 +859,12 @@ public class LoadRuleTest
/**
* 3 servers hosting 3 replicas of the segment.
* 1 servers is in maitenance.
* 1 servers is decommissioning.
* 1 replica is redundant.
* Should drop from the server in maintenance.
* Should drop from the decommissioning server.
*/
@Test
public void testRedundantReplicaDropDuringMaintenance()
public void testRedundantReplicaDropDuringDecommissioning()
{
final LoadQueuePeon mockPeon1 = new LoadQueuePeonTester();
final LoadQueuePeon mockPeon2 = new LoadQueuePeonTester();
@ -1019,12 +1019,12 @@ public class LoadRuleTest
return mockPeon2;
}
private static ServerHolder createServerHolder(String tier, LoadQueuePeon mockPeon1, boolean maintenance)
private static ServerHolder createServerHolder(String tier, LoadQueuePeon mockPeon1, boolean isDecommissioning)
{
return new ServerHolder(
createServer(tier).toImmutableDruidServer(),
mockPeon1,
maintenance
isDecommissioning
);
}
}

View File

@ -50,8 +50,8 @@ public class CoordinatorDynamicConfigTest
+ " \"emitBalancingStats\": true,\n"
+ " \"killDataSourceWhitelist\": [\"test1\",\"test2\"],\n"
+ " \"maxSegmentsInNodeLoadingQueue\": 1,\n"
+ " \"historicalNodesInMaintenance\": [\"host1\", \"host2\"],\n"
+ " \"nodesInMaintenancePriority\": 9\n"
+ " \"decommissioningNodes\": [\"host1\", \"host2\"],\n"
+ " \"decommissioningMaxPercentOfMaxSegmentsToMove\": 9\n"
+ "}\n";
CoordinatorDynamicConfig actual = mapper.readValue(
@ -63,19 +63,19 @@ public class CoordinatorDynamicConfigTest
),
CoordinatorDynamicConfig.class
);
ImmutableSet<String> maintenance = ImmutableSet.of("host1", "host2");
ImmutableSet<String> decommissioning = ImmutableSet.of("host1", "host2");
ImmutableSet<String> whitelist = ImmutableSet.of("test1", "test2");
assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, maintenance, 9);
assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, decommissioning, 9);
actual = CoordinatorDynamicConfig.builder().withMaintenanceList(ImmutableSet.of("host1")).build(actual);
actual = CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual);
assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 9);
actual = CoordinatorDynamicConfig.builder().withMaintenanceModeSegmentsPriority(5).build(actual);
actual = CoordinatorDynamicConfig.builder().withDecommissioningMaxPercentOfMaxSegmentsToMove(5).build(actual);
assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5);
}
@Test
public void testMaintenanceParametersBackwardCompatibility() throws Exception
public void testDecommissioningParametersBackwardCompatibility() throws Exception
{
String jsonStr = "{\n"
+ " \"millisToWaitBeforeDeleting\": 1,\n"
@ -99,14 +99,14 @@ public class CoordinatorDynamicConfigTest
),
CoordinatorDynamicConfig.class
);
ImmutableSet<String> maintenance = ImmutableSet.of();
ImmutableSet<String> decommissioning = ImmutableSet.of();
ImmutableSet<String> whitelist = ImmutableSet.of("test1", "test2");
assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, maintenance, 0);
assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, decommissioning, 0);
actual = CoordinatorDynamicConfig.builder().withMaintenanceList(ImmutableSet.of("host1")).build(actual);
actual = CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual);
assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 0);
actual = CoordinatorDynamicConfig.builder().withMaintenanceModeSegmentsPriority(5).build(actual);
actual = CoordinatorDynamicConfig.builder().withDecommissioningMaxPercentOfMaxSegmentsToMove(5).build(actual);
assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5);
}
@ -217,7 +217,7 @@ public class CoordinatorDynamicConfigTest
{
CoordinatorDynamicConfig defaultConfig = CoordinatorDynamicConfig.builder().build();
ImmutableSet<String> emptyList = ImmutableSet.of();
assertConfig(defaultConfig, 900000, 524288000, 100, 5, 15, 10, 1, false, emptyList, false, 0, emptyList, 7);
assertConfig(defaultConfig, 900000, 524288000, 100, 5, 15, 10, 1, false, emptyList, false, 0, emptyList, 70);
}
@Test
@ -257,8 +257,8 @@ public class CoordinatorDynamicConfigTest
Set<String> expectedKillableDatasources,
boolean expectedKillAllDataSources,
int expectedMaxSegmentsInNodeLoadingQueue,
Set<String> maintenanceList,
int maintenancePriority
Set<String> decommissioning,
int decommissioningMaxPercentOfMaxSegmentsToMove
)
{
Assert.assertEquals(expectedMillisToWaitBeforeDeleting, config.getMillisToWaitBeforeDeleting());
@ -272,7 +272,7 @@ public class CoordinatorDynamicConfigTest
Assert.assertEquals(expectedKillableDatasources, config.getKillableDataSources());
Assert.assertEquals(expectedKillAllDataSources, config.isKillAllDataSources());
Assert.assertEquals(expectedMaxSegmentsInNodeLoadingQueue, config.getMaxSegmentsInNodeLoadingQueue());
Assert.assertEquals(maintenanceList, config.getHistoricalNodesInMaintenance());
Assert.assertEquals(maintenancePriority, config.getNodesInMaintenancePriority());
Assert.assertEquals(decommissioning, config.getDecommissioningNodes());
Assert.assertEquals(decommissioningMaxPercentOfMaxSegmentsToMove, config.getDecommissioningMaxPercentOfMaxSegmentsToMove());
}
}