diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java index 15f13438163..10612d7eb7e 100644 --- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java @@ -40,6 +40,7 @@ import java.util.Map; import java.util.NavigableSet; import java.util.SortedSet; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; /** */ @@ -123,22 +124,28 @@ public class DruidCoordinatorBalancer implements DruidCoordinatorHelper final int maxToLoad = params.getCoordinatorDynamicConfig().getMaxSegmentsInNodeLoadingQueue(); long unmoved = 0L; - for (int iter = 0; iter < maxSegmentsToMove; iter++) { - if (maxToLoad > 0) { - toMoveTo.removeIf(s -> s.getNumberOfSegmentsInQueue() >= maxToLoad); - } + for (int moved = 0; (moved + unmoved) < maxSegmentsToMove;) { final BalancerSegmentHolder segmentToMove = strategy.pickSegmentToMove(toMoveFrom); if (segmentToMove != null && params.getAvailableSegments().contains(segmentToMove.getSegment())) { - final ServerHolder destinationHolder = strategy.findNewSegmentHomeBalancer(segmentToMove.getSegment(), toMoveTo); + final List toMoveToWithLoadQueueCapacity = + toMoveTo.stream() + .filter(s -> maxToLoad <= 0 || s.getNumberOfSegmentsInQueue() < maxToLoad) + .collect(Collectors.toList()); + + final ServerHolder destinationHolder = + strategy.findNewSegmentHomeBalancer(segmentToMove.getSegment(), toMoveToWithLoadQueueCapacity); if (destinationHolder != null) { moveSegment(segmentToMove, destinationHolder.getServer(), params); + moved++; } else { - ++unmoved; + log.info("Segment [%s] is 'optimally' placed.", segmentToMove.getSegment().getIdentifier()); + unmoved++; } } } + if (unmoved == maxSegmentsToMove) { // Cluster should be alive and constantly adjusting log.info("No good moves found in tier [%s]", tier);