diff --git a/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java b/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java
index d57a3e2d841..cfd99c142c8 100644
--- a/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java
+++ b/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java
@@ -20,11 +20,14 @@
 package com.metamx.druid.master;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.MinMaxPriorityQueue;
+import com.metamx.common.Pair;
 import com.metamx.common.logger.Logger;
 import com.metamx.druid.client.DataSegment;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
+import java.util.Comparator;
 import java.util.List;
 import java.util.Random;
 
@@ -188,13 +191,26 @@ public class BalancerCostAnalyzer
    *          A DataSegment that we are proposing to move.
    * @param serverHolders
    *          An iterable of ServerHolders for a particular tier.
-   * @return A ServerHolder with the new home for a segment.
+   * @return A MinMaxPriorityQueue of costs of putting the proposalSegment on the server and ServerHolders.
    */
-  public ServerHolder findNewSegmentHome(DataSegment proposalSegment, Iterable<ServerHolder> serverHolders)
+  public MinMaxPriorityQueue<Pair<Double, ServerHolder>> findNewSegmentHome(DataSegment proposalSegment, Iterable<ServerHolder> serverHolders)
   {
+    // Just need a regular priority queue for the min. element.
+    final MinMaxPriorityQueue<Pair<Double, ServerHolder>> costServerPairs = MinMaxPriorityQueue.orderedBy(
+        new Comparator<Pair<Double, ServerHolder>>()
+        {
+          @Override
+          public int compare(
+              Pair<Double, ServerHolder> o,
+              Pair<Double, ServerHolder> o1
+          )
+          {
+            return Double.compare(o.lhs, o1.lhs);
+          }
+        }
+    ).create();
+
     final long proposalSegmentSize = proposalSegment.getSize();
-    double minCost = Double.MAX_VALUE;
-    ServerHolder toServer = null;
     for (ServerHolder server : serverHolders) {
       /** Only calculate costs if the server has enough space.
        */
@@ -215,13 +231,10 @@ public class BalancerCostAnalyzer
         cost += computeJointSegmentCosts(proposalSegment, segment);
       }
 
-      if (cost < minCost) {
-        minCost = cost;
-        toServer = server;
-      }
+      costServerPairs.add(Pair.of(cost, server));
     }
 
-    return toServer;
+    return costServerPairs;
   }
 }
diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java b/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java
index fa04d93a34a..d766f2d663f 100644
--- a/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java
+++ b/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java
@@ -110,7 +110,7 @@ public class DruidMasterBalancer implements DruidMasterHelper
       while (iter < maxSegmentsToMove) {
         BalancerSegmentHolder segmentToMove = analyzer.pickSegmentToMove(serverHolderList, numSegments);
-        DruidServer toServer = analyzer.findNewSegmentHome(segmentToMove.getSegment(), serverHolderList).getServer();
+        DruidServer toServer = analyzer.findNewSegmentHome(segmentToMove.getSegment(), serverHolderList).pollFirst().rhs.getServer();
 
         if (!currentlyMovingSegments.get(tier).containsKey(segmentToMove.getSegment())) {
           moveSegment(segmentToMove, toServer, params);
         }
diff --git a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java
index d33da8286e2..7c26c8b2d6d 100644
--- a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java
+++ b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java
@@ -21,6 +21,7 @@ package com.metamx.druid.master.rules;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.MinMaxPriorityQueue;
+import com.metamx.common.Pair;
 import com.metamx.druid.client.DataSegment;
 import com.metamx.druid.master.BalancerCostAnalyzer;
 import com.metamx.druid.master.DruidMaster;
@@ -58,14 +59,18 @@ public abstract class LoadRule implements Rule
       return stats;
     }
 
+    final List<ServerHolder> serverHolderList = new ArrayList<ServerHolder>(serverQueue);
+    final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp();
+    final BalancerCostAnalyzer analyzer = params.getBalancerCostAnalyzer(referenceTimestamp);
+    MinMaxPriorityQueue<Pair<Double, ServerHolder>> serverCostQueue = analyzer.findNewSegmentHome(segment, serverHolderList);
+
     stats.accumulate(
         assign(
             params.getReplicationManager(),
             expectedReplicants,
             totalReplicants,
-            serverQueue,
-            segment,
-            params
+            serverCostQueue,
+            segment
         )
     );
 
@@ -78,40 +83,32 @@ public abstract class LoadRule implements Rule
       final ReplicationThrottler replicationManager,
       int expectedReplicants,
       int totalReplicants,
-      MinMaxPriorityQueue<ServerHolder> serverQueue,
-      final DataSegment segment,
-      final DruidMasterRuntimeParams params
+      MinMaxPriorityQueue<Pair<Double, ServerHolder>> serverQueue,
+      final DataSegment segment
   )
   {
     MasterStats stats = new MasterStats();
 
-    List<ServerHolder> serverHolderList = new ArrayList<ServerHolder>(serverQueue);
-
-    List<ServerHolder> assignedServers = Lists.newArrayList();
     while (totalReplicants < expectedReplicants) {
-      final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp();
-      final BalancerCostAnalyzer analyzer = params.getBalancerCostAnalyzer(referenceTimestamp);
-      ServerHolder holder = analyzer.findNewSegmentHome(segment, serverHolderList);
+      ServerHolder holder = serverQueue.pollFirst().rhs;
       if (holder == null) {
         log.warn(
-            "Not enough %s servers[%d] or node capacity to assign segment[%s]! Expected Replicants[%d]",
+            "Not enough %s servers or node capacity to assign segment[%s]! Expected Replicants[%d]",
Expected Replicants[%d]", getTier(), - assignedServers.size() + serverQueue.size() + 1, segment.getIdentifier(), expectedReplicants ); break; } + if (holder.isServingSegment(segment) || holder.isLoadingSegment(segment)) { - assignedServers.add(holder); continue; } if (totalReplicants > 0) { // don't throttle if there's only 1 copy of this segment in the cluster if (!replicationManager.canAddReplicant(getTier()) || !replicationManager.registerReplicantCreation(getTier(), segment.getIdentifier())) { - serverQueue.add(holder); break; } } @@ -127,12 +124,10 @@ public abstract class LoadRule implements Rule } } ); - assignedServers.add(holder); stats.addToTieredStat("assignedCount", getTier(), 1); ++totalReplicants; } - serverQueue.addAll(assignedServers); return stats; } diff --git a/server/src/test/java/com/metamx/druid/master/DruidMasterRuleRunnerTest.java b/server/src/test/java/com/metamx/druid/master/DruidMasterRuleRunnerTest.java index f40d16d93ff..60429dbcae1 100644 --- a/server/src/test/java/com/metamx/druid/master/DruidMasterRuleRunnerTest.java +++ b/server/src/test/java/com/metamx/druid/master/DruidMasterRuleRunnerTest.java @@ -528,6 +528,7 @@ public class DruidMasterRuleRunnerTest .withAvailableSegments(availableSegments) .withDatabaseRuleManager(databaseRuleManager) .withSegmentReplicantLookup(segmentReplicantLookup) + .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .build(); DruidMasterRuntimeParams afterParams = ruleRunner.run(params); @@ -601,6 +602,7 @@ public class DruidMasterRuleRunnerTest .withAvailableSegments(availableSegments) .withDatabaseRuleManager(databaseRuleManager) .withSegmentReplicantLookup(segmentReplicantLookup) + .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .build(); DruidMasterRuntimeParams afterParams = ruleRunner.run(params); @@ -847,6 +849,7 @@ public class DruidMasterRuleRunnerTest .withAvailableSegments(availableSegments) .withDatabaseRuleManager(databaseRuleManager) .withSegmentReplicantLookup(segmentReplicantLookup) + .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .build(); DruidMasterRuntimeParams afterParams = ruleRunner.run(params); @@ -1032,6 +1035,7 @@ public class DruidMasterRuleRunnerTest .withAvailableSegments(longerAvailableSegments) .withDatabaseRuleManager(databaseRuleManager) .withSegmentReplicantLookup(segmentReplicantLookup) + .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .build(); DruidMasterRuntimeParams afterParams = ruleRunner.run(params);