Coordinator fix balancer stuck (#5987)

* this will fix it

* filter destinations to not consider servers already serving segment

* fix it

* cleanup

* fix opposite day in ImmutableDruidServer.equals

* simplify
This commit is contained in:
Clint Wylie 2018-07-11 20:19:11 -07:00 committed by Jonathan Wei
parent 5f78a333ad
commit 31c2179fe1
2 changed files with 42 additions and 18 deletions

View File

@ -142,11 +142,7 @@ public class ImmutableDruidServer
ImmutableDruidServer that = (ImmutableDruidServer) o; ImmutableDruidServer that = (ImmutableDruidServer) o;
if (metadata.equals(that.metadata)) { return metadata.equals(that.metadata);
return false;
}
return true;
} }
@Override @Override

View File

@ -93,9 +93,11 @@ public class DruidCoordinatorBalancer implements DruidCoordinatorHelper
CoordinatorStats stats CoordinatorStats stats
) )
{ {
final BalancerStrategy strategy = params.getBalancerStrategy();
final int maxSegmentsToMove = params.getCoordinatorDynamicConfig().getMaxSegmentsToMove();
if (params.getAvailableSegments().size() == 0) {
log.info("Metadata segments are not available. Cannot balance.");
return;
}
currentlyMovingSegments.computeIfAbsent(tier, t -> new ConcurrentHashMap<>()); currentlyMovingSegments.computeIfAbsent(tier, t -> new ConcurrentHashMap<>());
if (!currentlyMovingSegments.get(tier).isEmpty()) { if (!currentlyMovingSegments.get(tier).isEmpty()) {
@ -117,33 +119,59 @@ public class DruidCoordinatorBalancer implements DruidCoordinatorHelper
numSegments += sourceHolder.getServer().getSegments().size(); numSegments += sourceHolder.getServer().getSegments().size();
} }
if (numSegments == 0) { if (numSegments == 0) {
log.info("No segments found. Cannot balance."); log.info("No segments found. Cannot balance.");
return; return;
} }
final BalancerStrategy strategy = params.getBalancerStrategy();
final int maxSegmentsToMove = Math.min(params.getCoordinatorDynamicConfig().getMaxSegmentsToMove(), numSegments);
final int maxIterations = 2 * maxSegmentsToMove;
final int maxToLoad = params.getCoordinatorDynamicConfig().getMaxSegmentsInNodeLoadingQueue(); final int maxToLoad = params.getCoordinatorDynamicConfig().getMaxSegmentsInNodeLoadingQueue();
long unmoved = 0L; long unmoved = 0L;
for (int moved = 0; (moved + unmoved) < maxSegmentsToMove;) {
final BalancerSegmentHolder segmentToMove = strategy.pickSegmentToMove(toMoveFrom);
if (segmentToMove != null && params.getAvailableSegments().contains(segmentToMove.getSegment())) { for (int moved = 0, iter = 0; (moved + unmoved) < maxSegmentsToMove; ++iter) {
final List<ServerHolder> toMoveToWithLoadQueueCapacity = final BalancerSegmentHolder segmentToMoveHolder = strategy.pickSegmentToMove(toMoveFrom);
if (segmentToMoveHolder != null && params.getAvailableSegments().contains(segmentToMoveHolder.getSegment())) {
final DataSegment segmentToMove = segmentToMoveHolder.getSegment();
final ImmutableDruidServer fromServer = segmentToMoveHolder.getFromServer();
// we want to leave the server the segment is currently on in the list...
// but filter out replicas that are already serving the segment, and servers with a full load queue
final List<ServerHolder> toMoveToWithLoadQueueCapacityAndNotServingSegment =
toMoveTo.stream() toMoveTo.stream()
.filter(s -> maxToLoad <= 0 || s.getNumberOfSegmentsInQueue() < maxToLoad) .filter(s -> s.getServer().equals(fromServer) ||
(!s.isServingSegment(segmentToMove) &&
(maxToLoad <= 0 || s.getNumberOfSegmentsInQueue() < maxToLoad)))
.collect(Collectors.toList()); .collect(Collectors.toList());
final ServerHolder destinationHolder = if (toMoveToWithLoadQueueCapacityAndNotServingSegment.size() > 0) {
strategy.findNewSegmentHomeBalancer(segmentToMove.getSegment(), toMoveToWithLoadQueueCapacity); final ServerHolder destinationHolder =
strategy.findNewSegmentHomeBalancer(segmentToMove, toMoveToWithLoadQueueCapacityAndNotServingSegment);
if (destinationHolder != null) { if (destinationHolder != null && !destinationHolder.getServer().equals(fromServer)) {
moveSegment(segmentToMove, destinationHolder.getServer(), params); moveSegment(segmentToMoveHolder, destinationHolder.getServer(), params);
moved++; moved++;
} else {
log.info("Segment [%s] is 'optimally' placed.", segmentToMove.getIdentifier());
unmoved++;
}
} else { } else {
log.info("Segment [%s] is 'optimally' placed.", segmentToMove.getSegment().getIdentifier()); log.info(
"No valid movement destinations for segment [%s].",
segmentToMove.getIdentifier()
);
unmoved++; unmoved++;
} }
} }
if (iter >= maxIterations) {
log.info(
"Unable to select %d remaining candidate segments out of %d total to balance after %d iterations, ending run.",
(maxSegmentsToMove - moved - unmoved), maxSegmentsToMove, iter
);
break;
}
} }
if (unmoved == maxSegmentsToMove) { if (unmoved == maxSegmentsToMove) {