diff --git a/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java b/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java index 7a387460a9b..ab319fae72e 100644 --- a/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java +++ b/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java @@ -32,6 +32,7 @@ import org.joda.time.Interval; import java.util.Comparator; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.Set; @@ -103,9 +104,8 @@ public class BalancerCostAnalyzer { double cost = 0; for (ServerHolder server : serverHolderList) { - DataSegment[] segments = server.getServer().getSegments().values().toArray(new DataSegment[]{}); - for (int i = 0; i < segments.length; ++i) { - cost += computeJointSegmentCosts(segments[i], segments[i]); + for (DataSegment segment : server.getServer().getSegments().values()) { + cost += computeJointSegmentCosts(segment, segment); } } return cost; @@ -176,14 +176,14 @@ public class BalancerCostAnalyzer * Their purpose is to unify the balance/assignment code since a segment that has not yet been assigned * does not have a source server. */ - public class NullServerHolder extends ServerHolder + public static class NullServerHolder extends ServerHolder { public NullServerHolder() { super(null, null); } - public class NullDruidServer extends DruidServer + public static class NullDruidServer extends DruidServer { public NullDruidServer() { @@ -206,36 +206,17 @@ public class BalancerCostAnalyzer public class BalancerCostAnalyzerHelper { - private MinMaxPriorityQueue> costsServerHolderPairs; - private List serverHolderList; - private DataSegment proposalSegment; - private ServerHolder fromServerHolder; - private Set segmentHoldersToMove; + private final List serverHolderList; + private final DataSegment proposalSegment; + private final ServerHolder fromServerHolder; + private final Set segmentHoldersToMove; + + private Pair minPair; private double currCost; - public MinMaxPriorityQueue> getCostsServerHolderPairs() + public Pair getMinPair() { - return costsServerHolderPairs; - } - - public List getServerHolderList() - { - return serverHolderList; - } - - public DataSegment getProposalSegment() - { - return proposalSegment; - } - - public ServerHolder getFromServerHolder() - { - return fromServerHolder; - } - - public Set getSegmentHoldersToMove() - { - return segmentHoldersToMove; + return minPair; } public double getCurrCost() @@ -257,9 +238,20 @@ public class BalancerCostAnalyzer ServerHolder fromServerHolder, Set segmentHoldersToMove ) + { + this.serverHolderList = serverHolderList; + this.proposalSegment = proposalSegment; + this.fromServerHolder = fromServerHolder; + this.segmentHoldersToMove = segmentHoldersToMove; + this.currCost = 0; + + computeAllCosts(); + } + + public void computeAllCosts() { // Just need a regular priority queue for the min. element. - this.costsServerHolderPairs = MinMaxPriorityQueue.orderedBy( + MinMaxPriorityQueue> costsServerHolderPairs = MinMaxPriorityQueue.orderedBy( new Comparator>() { @Override @@ -272,17 +264,12 @@ public class BalancerCostAnalyzer } } ).create(); - this.serverHolderList = serverHolderList; - this.proposalSegment = proposalSegment; - this.fromServerHolder = fromServerHolder; - this.segmentHoldersToMove = segmentHoldersToMove; - this.currCost = 0; - } - public void computeAllCosts() - { - // The contribution to the total cost of a given server by proposing to move the segment to that server is... for (ServerHolder server : serverHolderList) { + // Only calculate costs if the server has enough space. + if (proposalSegment.getSize() > server.getAvailableSize()) break; + + // The contribution to the total cost of a given server by proposing to move the segment to that server is... double cost = 0f; // the sum of the costs of other (inclusive) segments on the server for (DataSegment segment : server.getServer().getSegments().values()) { @@ -295,9 +282,7 @@ public class BalancerCostAnalyzer } // plus the costs of segments that will be moved. - Iterator it = segmentHoldersToMove.iterator(); - while (it.hasNext()) { - BalancerSegmentHolder segmentToMove = (BalancerSegmentHolder) it.next(); + for (BalancerSegmentHolder segmentToMove : segmentHoldersToMove) { if (server.getServer().equals(segmentToMove.getToServer())) { cost += computeJointSegmentCosts(proposalSegment, segmentToMove.getSegment()); } @@ -311,12 +296,10 @@ public class BalancerCostAnalyzer currCost = cost; } - // Only enter the queue if the server has enough size. - if (proposalSegment.getSize() < server.getAvailableSize()) { - costsServerHolderPairs.add(Pair.of(cost, server)); - } - + costsServerHolderPairs.add(Pair.of(cost, server)); } + + minPair = costsServerHolderPairs.pollFirst(); } } @@ -342,13 +325,13 @@ public class BalancerCostAnalyzer { Set segmentHoldersToMove = Sets.newHashSet(); Set movingSegments = Sets.newHashSet(); + int numServers = serverHolderList.size(); int counter = 0; while (segmentHoldersToMove.size() < MAX_SEGMENTS_TO_MOVE && counter < 3 * MAX_SEGMENTS_TO_MOVE) { counter++; - int numServers = serverHolderList.size(); if (numServers == 0) break; // We want to sample from each server w.p. numSegmentsOnServer / totalSegments @@ -358,7 +341,7 @@ public class BalancerCostAnalyzer // so that the probability of picking a segment is 1 / totalSegments. List segments = Lists.newArrayList(fromServerHolder.getServer().getSegments().values()); - if (segments.size() == 0) continue; + if (segments.isEmpty()) continue; DataSegment proposalSegment = segments.get(rand.nextInt(segments.size())); if (movingSegments.contains(proposalSegment)) { @@ -371,9 +354,8 @@ public class BalancerCostAnalyzer fromServerHolder, segmentHoldersToMove ); - helper.computeAllCosts(); - Pair minPair = helper.getCostsServerHolderPairs().pollFirst(); + Pair minPair = helper.getMinPair(); if (minPair.rhs != null && !minPair.rhs.equals(fromServerHolder)) { movingSegments.add(proposalSegment); diff --git a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java index 75cc76d7c7a..6801c4a3945 100644 --- a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java +++ b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java @@ -89,8 +89,8 @@ public abstract class LoadRule implements Rule while (totalReplicants < expectedReplicants) { BalancerCostAnalyzer analyzer = new BalancerCostAnalyzer(DateTime.now()); BalancerCostAnalyzer.BalancerCostAnalyzerHelper helper = analyzer.new BalancerCostAnalyzerHelper(serverHolderList, segment); - helper.computeAllCosts(); - Pair minPair = helper.getCostsServerHolderPairs().pollFirst(); + + Pair minPair = helper.getMinPair(); ServerHolder holder = minPair.rhs; if (holder == null) {