Coordinator fix balance to try to move max segments instead of up to max segments (#5927)

* fix move to try to move max segments instead of "up to" max segments

* fix

* fix oops
This commit is contained in:
Clint Wylie 2018-07-03 17:06:38 -07:00 committed by Gian Merlino
parent 1ccabab98e
commit d5a3871864
1 changed files with 13 additions and 6 deletions

View File

@ -40,6 +40,7 @@ import java.util.Map;
import java.util.NavigableSet; import java.util.NavigableSet;
import java.util.SortedSet; import java.util.SortedSet;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/** /**
*/ */
@ -123,22 +124,28 @@ public class DruidCoordinatorBalancer implements DruidCoordinatorHelper
final int maxToLoad = params.getCoordinatorDynamicConfig().getMaxSegmentsInNodeLoadingQueue(); final int maxToLoad = params.getCoordinatorDynamicConfig().getMaxSegmentsInNodeLoadingQueue();
long unmoved = 0L; long unmoved = 0L;
for (int iter = 0; iter < maxSegmentsToMove; iter++) { for (int moved = 0; (moved + unmoved) < maxSegmentsToMove;) {
if (maxToLoad > 0) {
toMoveTo.removeIf(s -> s.getNumberOfSegmentsInQueue() >= maxToLoad);
}
final BalancerSegmentHolder segmentToMove = strategy.pickSegmentToMove(toMoveFrom); final BalancerSegmentHolder segmentToMove = strategy.pickSegmentToMove(toMoveFrom);
if (segmentToMove != null && params.getAvailableSegments().contains(segmentToMove.getSegment())) { if (segmentToMove != null && params.getAvailableSegments().contains(segmentToMove.getSegment())) {
final ServerHolder destinationHolder = strategy.findNewSegmentHomeBalancer(segmentToMove.getSegment(), toMoveTo); final List<ServerHolder> toMoveToWithLoadQueueCapacity =
toMoveTo.stream()
.filter(s -> maxToLoad <= 0 || s.getNumberOfSegmentsInQueue() < maxToLoad)
.collect(Collectors.toList());
final ServerHolder destinationHolder =
strategy.findNewSegmentHomeBalancer(segmentToMove.getSegment(), toMoveToWithLoadQueueCapacity);
if (destinationHolder != null) { if (destinationHolder != null) {
moveSegment(segmentToMove, destinationHolder.getServer(), params); moveSegment(segmentToMove, destinationHolder.getServer(), params);
moved++;
} else { } else {
++unmoved; log.info("Segment [%s] is 'optimally' placed.", segmentToMove.getSegment().getIdentifier());
unmoved++;
} }
} }
} }
if (unmoved == maxSegmentsToMove) { if (unmoved == maxSegmentsToMove) {
// Cluster should be alive and constantly adjusting // Cluster should be alive and constantly adjusting
log.info("No good moves found in tier [%s]", tier); log.info("No good moves found in tier [%s]", tier);