diff --git a/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java b/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java index 6519785e76b..a479d33f2df 100644 --- a/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java +++ b/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java @@ -20,11 +20,14 @@ package com.metamx.druid.master; import com.google.common.collect.Lists; +import com.google.common.collect.MinMaxPriorityQueue; +import com.metamx.common.Pair; import com.metamx.common.logger.Logger; import com.metamx.druid.client.DataSegment; import org.joda.time.DateTime; import org.joda.time.Interval; +import java.util.Comparator; import java.util.List; import java.util.Random; @@ -185,27 +188,79 @@ public class BalancerCostAnalyzer } /** - * The assignment application requires us to supply a proposal segment. + * For balancing, we want to only make a move if the minimum cost server is not already serving the segment. * * @param proposalSegment A DataSegment that we are proposing to move. * @param serverHolders An iterable of ServerHolders for a particular tier. * * @return A ServerHolder with the new home for a segment. */ - public ServerHolder findNewSegmentHome( + public ServerHolder findNewSegmentHomeBalance( final DataSegment proposalSegment, final Iterable serverHolders ) { + MinMaxPriorityQueue> costsAndServers = computeCosts(proposalSegment, serverHolders); + if (costsAndServers.isEmpty()) { + return null; + } + + ServerHolder toServer = costsAndServers.pollFirst().rhs; + if (!toServer.isServingSegment(proposalSegment)) { + return toServer; + } + + return null; + } + + /** + * For assigment, we want to move to the lowest cost server that isn't already serving the segment. + * + * @param proposalSegment A DataSegment that we are proposing to move. + * @param serverHolders An iterable of ServerHolders for a particular tier. + * + * @return A ServerHolder with the new home for a segment. + */ + public ServerHolder findNewSegmentHomeAssign( + final DataSegment proposalSegment, + final Iterable serverHolders + ) + { + MinMaxPriorityQueue> costsAndServers = computeCosts(proposalSegment, serverHolders); + while (!costsAndServers.isEmpty()) { + ServerHolder toServer = costsAndServers.pollFirst().rhs; + if (!toServer.isServingSegment(proposalSegment)) { + return toServer; + } + } + + return null; + } + + private MinMaxPriorityQueue> computeCosts( + final DataSegment proposalSegment, + final Iterable serverHolders + ) + { + MinMaxPriorityQueue> costsAndServers = MinMaxPriorityQueue.orderedBy( + new Comparator>() + { + @Override + public int compare( + Pair o, + Pair o1 + ) + { + return Double.compare(o.lhs, o1.lhs); + } + } + ).create(); + final long proposalSegmentSize = proposalSegment.getSize(); - double minCost = Double.MAX_VALUE; - ServerHolder toServer = null; for (ServerHolder server : serverHolders) { /** Don't calculate cost if the server doesn't have enough space or is loading the segment */ - if (proposalSegmentSize > server.getAvailableSize() || - server.isLoadingSegment(proposalSegment) || - server.isServingSegment(proposalSegment)) { + if (proposalSegmentSize > server.getAvailableSize() || server.isLoadingSegment(proposalSegment)) { continue; } @@ -222,13 +277,10 @@ public class BalancerCostAnalyzer cost += computeJointSegmentCosts(proposalSegment, segment); } - if (cost < minCost) { - minCost = cost; - toServer = server; - } + costsAndServers.add(Pair.of(cost, server)); } - return toServer; + return costsAndServers; } } diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java b/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java index 7678e85624d..aa0025749c5 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java @@ -117,7 +117,7 @@ public class DruidMasterBalancer implements DruidMasterHelper while (iter < maxSegmentsToMove) { iter++; final BalancerSegmentHolder segmentToMove = analyzer.pickSegmentToMove(serverHolderList, numSegments); - final ServerHolder holder = analyzer.findNewSegmentHome(segmentToMove.getSegment(), serverHolderList); + final ServerHolder holder = analyzer.findNewSegmentHomeBalance(segmentToMove.getSegment(), serverHolderList); if (holder == null) { continue; } diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterLogger.java b/server/src/main/java/com/metamx/druid/master/DruidMasterLogger.java index a4693150ea5..38068600637 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMasterLogger.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMasterLogger.java @@ -55,30 +55,6 @@ public class DruidMasterLogger implements DruidMasterHelper } } - Map unassigned = stats.getPerTierStats().get("unassignedCount"); - if (unassigned != null) { - for (Map.Entry entry : unassigned.entrySet()) { - emitter.emit( - new ServiceMetricEvent.Builder().build( - String.format("master/%s/unassigned/count", entry.getKey()), - entry.getValue().get() - ) - ); - } - } - - Map sizes = stats.getPerTierStats().get("unassignedSize"); - if (sizes != null) { - for (Map.Entry entry : sizes.entrySet()) { - emitter.emit( - new ServiceMetricEvent.Builder().build( - String.format("master/%s/unassigned/size", entry.getKey()), - entry.getValue().get() - ) - ); - } - } - Map dropped = stats.getPerTierStats().get("droppedCount"); if (dropped != null) { for (Map.Entry entry : dropped.entrySet()) { @@ -89,6 +65,30 @@ public class DruidMasterLogger implements DruidMasterHelper } } + emitter.emit( + new ServiceMetricEvent.Builder().build( + "master/cost/raw", stats.getGlobalStats().get("initialCost") + ) + ); + + emitter.emit( + new ServiceMetricEvent.Builder().build( + "master/cost/normalization", stats.getGlobalStats().get("normalization") + ) + ); + + emitter.emit( + new ServiceMetricEvent.Builder().build( + "master/cost/normalized", stats.getGlobalStats().get("normalizedInitialCostTimesOneThousand").doubleValue() / 1000d + ) + ); + + emitter.emit( + new ServiceMetricEvent.Builder().build( + "master/moved/count", stats.getGlobalStats().get("movedCount") + ) + ); + emitter.emit( new ServiceMetricEvent.Builder().build( "master/deleted/count", stats.getGlobalStats().get("deletedCount") diff --git a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java index 4cd6acae640..eb06f806b7b 100644 --- a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java +++ b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java @@ -91,7 +91,7 @@ public abstract class LoadRule implements Rule final MasterStats stats = new MasterStats(); while (totalReplicants < expectedReplicants) { - final ServerHolder holder = analyzer.findNewSegmentHome(segment, serverHolderList); + final ServerHolder holder = analyzer.findNewSegmentHomeAssign(segment, serverHolderList); if (holder == null) { log.warn(