From d5a38718643ad2bd45339acf42046e058b1ad3fd Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Tue, 3 Jul 2018 17:06:38 -0700 Subject: [PATCH] Coordinator fix balance to try to move max segments instead of up to max segments (#5927) * fix move to try to move max segments instead of "up to" max segments * fix * fix oops --- .../helper/DruidCoordinatorBalancer.java | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java index 15f13438163..10612d7eb7e 100644 --- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java @@ -40,6 +40,7 @@ import java.util.Map; import java.util.NavigableSet; import java.util.SortedSet; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; /** */ @@ -123,22 +124,28 @@ public class DruidCoordinatorBalancer implements DruidCoordinatorHelper final int maxToLoad = params.getCoordinatorDynamicConfig().getMaxSegmentsInNodeLoadingQueue(); long unmoved = 0L; - for (int iter = 0; iter < maxSegmentsToMove; iter++) { - if (maxToLoad > 0) { - toMoveTo.removeIf(s -> s.getNumberOfSegmentsInQueue() >= maxToLoad); - } + for (int moved = 0; (moved + unmoved) < maxSegmentsToMove;) { final BalancerSegmentHolder segmentToMove = strategy.pickSegmentToMove(toMoveFrom); if (segmentToMove != null && params.getAvailableSegments().contains(segmentToMove.getSegment())) { - final ServerHolder destinationHolder = strategy.findNewSegmentHomeBalancer(segmentToMove.getSegment(), toMoveTo); + final List toMoveToWithLoadQueueCapacity = + toMoveTo.stream() + .filter(s -> maxToLoad <= 0 || s.getNumberOfSegmentsInQueue() < maxToLoad) + .collect(Collectors.toList()); + + final ServerHolder destinationHolder = + strategy.findNewSegmentHomeBalancer(segmentToMove.getSegment(), toMoveToWithLoadQueueCapacity); if (destinationHolder != null) { moveSegment(segmentToMove, destinationHolder.getServer(), params); + moved++; } else { - ++unmoved; + log.info("Segment [%s] is 'optimally' placed.", segmentToMove.getSegment().getIdentifier()); + unmoved++; } } } + if (unmoved == maxSegmentsToMove) { // Cluster should be alive and constantly adjusting log.info("No good moves found in tier [%s]", tier);