From 9f405c0b1e77567814b3b8105c6941e1aa5ba68e Mon Sep 17 00:00:00 2001 From: Nelson Ray Date: Thu, 7 Mar 2013 15:06:12 -0800 Subject: [PATCH] fix constant segment balancing and separate out the differences between segment assignment and balancing --- .../druid/master/BalancerCostAnalyzer.java | 76 ++++++++++++++++--- .../druid/master/DruidMasterBalancer.java | 2 +- .../metamx/druid/master/rules/LoadRule.java | 2 +- 3 files changed, 66 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java b/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java index 6519785e76b..a479d33f2df 100644 --- a/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java +++ b/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java @@ -20,11 +20,14 @@ package com.metamx.druid.master; import com.google.common.collect.Lists; +import com.google.common.collect.MinMaxPriorityQueue; +import com.metamx.common.Pair; import com.metamx.common.logger.Logger; import com.metamx.druid.client.DataSegment; import org.joda.time.DateTime; import org.joda.time.Interval; +import java.util.Comparator; import java.util.List; import java.util.Random; @@ -185,27 +188,79 @@ public class BalancerCostAnalyzer } /** - * The assignment application requires us to supply a proposal segment. + * For balancing, we want to only make a move if the minimum cost server is not already serving the segment. * * @param proposalSegment A DataSegment that we are proposing to move. * @param serverHolders An iterable of ServerHolders for a particular tier. * * @return A ServerHolder with the new home for a segment. */ - public ServerHolder findNewSegmentHome( + public ServerHolder findNewSegmentHomeBalance( final DataSegment proposalSegment, final Iterable serverHolders ) { + MinMaxPriorityQueue> costsAndServers = computeCosts(proposalSegment, serverHolders); + if (costsAndServers.isEmpty()) { + return null; + } + + ServerHolder toServer = costsAndServers.pollFirst().rhs; + if (!toServer.isServingSegment(proposalSegment)) { + return toServer; + } + + return null; + } + + /** + * For assigment, we want to move to the lowest cost server that isn't already serving the segment. + * + * @param proposalSegment A DataSegment that we are proposing to move. + * @param serverHolders An iterable of ServerHolders for a particular tier. + * + * @return A ServerHolder with the new home for a segment. + */ + public ServerHolder findNewSegmentHomeAssign( + final DataSegment proposalSegment, + final Iterable serverHolders + ) + { + MinMaxPriorityQueue> costsAndServers = computeCosts(proposalSegment, serverHolders); + while (!costsAndServers.isEmpty()) { + ServerHolder toServer = costsAndServers.pollFirst().rhs; + if (!toServer.isServingSegment(proposalSegment)) { + return toServer; + } + } + + return null; + } + + private MinMaxPriorityQueue> computeCosts( + final DataSegment proposalSegment, + final Iterable serverHolders + ) + { + MinMaxPriorityQueue> costsAndServers = MinMaxPriorityQueue.orderedBy( + new Comparator>() + { + @Override + public int compare( + Pair o, + Pair o1 + ) + { + return Double.compare(o.lhs, o1.lhs); + } + } + ).create(); + final long proposalSegmentSize = proposalSegment.getSize(); - double minCost = Double.MAX_VALUE; - ServerHolder toServer = null; for (ServerHolder server : serverHolders) { /** Don't calculate cost if the server doesn't have enough space or is loading the segment */ - if (proposalSegmentSize > server.getAvailableSize() || - server.isLoadingSegment(proposalSegment) || - server.isServingSegment(proposalSegment)) { + if (proposalSegmentSize > server.getAvailableSize() || server.isLoadingSegment(proposalSegment)) { continue; } @@ -222,13 +277,10 @@ public class BalancerCostAnalyzer cost += computeJointSegmentCosts(proposalSegment, segment); } - if (cost < minCost) { - minCost = cost; - toServer = server; - } + costsAndServers.add(Pair.of(cost, server)); } - return toServer; + return costsAndServers; } } diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java b/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java index 7678e85624d..aa0025749c5 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java @@ -117,7 +117,7 @@ public class DruidMasterBalancer implements DruidMasterHelper while (iter < maxSegmentsToMove) { iter++; final BalancerSegmentHolder segmentToMove = analyzer.pickSegmentToMove(serverHolderList, numSegments); - final ServerHolder holder = analyzer.findNewSegmentHome(segmentToMove.getSegment(), serverHolderList); + final ServerHolder holder = analyzer.findNewSegmentHomeBalance(segmentToMove.getSegment(), serverHolderList); if (holder == null) { continue; } diff --git a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java index 4cd6acae640..eb06f806b7b 100644 --- a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java +++ b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java @@ -91,7 +91,7 @@ public abstract class LoadRule implements Rule final MasterStats stats = new MasterStats(); while (totalReplicants < expectedReplicants) { - final ServerHolder holder = analyzer.findNewSegmentHome(segment, serverHolderList); + final ServerHolder holder = analyzer.findNewSegmentHomeAssign(segment, serverHolderList); if (holder == null) { log.warn(