From 2d7113b263322c68ca5adca19ca17e01afcc7be6 Mon Sep 17 00:00:00 2001 From: Nelson Ray Date: Mon, 21 Jan 2013 14:28:25 -0800 Subject: [PATCH] huge simplification of balancing code --- .../druid/master/BalancerCostAnalyzer.java | 256 +++--------------- .../druid/master/DruidMasterBalancer.java | 122 +++++---- .../metamx/druid/master/rules/LoadRule.java | 28 +- .../druid/master/DruidMasterBalancerTest.java | 2 - 4 files changed, 107 insertions(+), 301 deletions(-) diff --git a/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java b/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java index 8a6f2c17e28..56563cb1521 100644 --- a/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java +++ b/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java @@ -20,19 +20,13 @@ package com.metamx.druid.master; import com.google.common.collect.Lists; -import com.google.common.collect.MinMaxPriorityQueue; -import com.google.common.collect.Sets; -import com.metamx.common.Pair; import com.metamx.common.logger.Logger; import com.metamx.druid.client.DataSegment; -import com.metamx.druid.client.DruidServer; import org.joda.time.DateTime; import org.joda.time.Interval; -import java.util.Comparator; import java.util.List; import java.util.Random; -import java.util.Set; /** @@ -48,56 +42,18 @@ public class BalancerCostAnalyzer private static final int THIRTY_DAYS_IN_MILLIS = 30 * DAY_IN_MILLIS; private final Random rand; private final DateTime referenceTimestamp; - private int maxSegmentsToMove; - private List serverHolderList; - private double initialTotalCost; - private double normalization; - private double totalCostChange; - private int totalSegments; public BalancerCostAnalyzer(DateTime referenceTimestamp) { this.referenceTimestamp = referenceTimestamp; - rand = new Random(0); - totalCostChange = 0; - } - - // The assignment usage doesn't require initialization. - public void init(List serverHolderList, DruidMasterRuntimeParams params) - { - this.serverHolderList = serverHolderList; - this.initialTotalCost = calculateInitialTotalCost(serverHolderList); - this.normalization = calculateNormalization(serverHolderList); - this.totalSegments = params.getAvailableSegments().size(); - this.maxSegmentsToMove = params.getMaxSegmentsToMove(); - } - - public double getInitialTotalCost() - { - return initialTotalCost; - } - - public double getNormalization() - { - return normalization; - } - - public double getNormalizedInitialCost() - { - return initialTotalCost / normalization; - } - - public double getTotalCostChange() - { - return totalCostChange; } /* * Calculates the cost normalization. This is such that the normalized cost is lower bounded * by 1 (e.g. when each segment gets its own compute node). */ - private double calculateNormalization(List serverHolderList) + public double calculateNormalization(List serverHolderList) { double cost = 0; for (ServerHolder server : serverHolderList) { @@ -109,7 +65,7 @@ public class BalancerCostAnalyzer } // Calculates the initial cost of the Druid segment configuration. - private double calculateInitialTotalCost(List serverHolderList) + public double calculateInitialTotalCost(List serverHolderList) { double cost = 0; for (ServerHolder server : serverHolderList) { @@ -170,9 +126,9 @@ public class BalancerCostAnalyzer /* * Sample from each server with probability proportional to the number of segments on that server. */ - private ServerHolder sampleServer() + private ServerHolder sampleServer(List serverHolderList, int numSegments) { - final int num = rand.nextInt(totalSegments); + final int num = rand.nextInt(numSegments); int cumulativeSegments = 0; int numToStopAt = 0; @@ -184,193 +140,57 @@ public class BalancerCostAnalyzer return serverHolderList.get(numToStopAt - 1); } - public Set findSegmentsToMove() + // The balancing application requires us to pick a proposal segment. + public BalancerSegmentHolder findNewSegmentHome(List serverHolders, int numSegments) { - final Set segmentHoldersToMove = Sets.newHashSet(); - final Set movingSegments = Sets.newHashSet(); - if (serverHolderList.isEmpty()) { - return segmentHoldersToMove; - } + // We want to sample from each server w.p. numSegmentsOnServer / totalSegments + ServerHolder fromServerHolder = sampleServer(serverHolders, numSegments); - int counter = 0; + // and actually pick that segment uniformly at random w.p. 1 / numSegmentsOnServer + // so that the probability of picking a segment is 1 / totalSegments. + List segments = Lists.newArrayList(fromServerHolder.getServer().getSegments().values()); - while (segmentHoldersToMove.size() < maxSegmentsToMove && counter < 3 * maxSegmentsToMove) { - counter++; + DataSegment proposalSegment = segments.get(rand.nextInt(segments.size())); + ServerHolder toServer = findNewSegmentHome(proposalSegment, serverHolders); - // We want to sample from each server w.p. numSegmentsOnServer / totalSegments - ServerHolder fromServerHolder = sampleServer(); - - // and actually pick that segment uniformly at random w.p. 1 / numSegmentsOnServer - // so that the probability of picking a segment is 1 / totalSegments. - List segments = Lists.newArrayList(fromServerHolder.getServer().getSegments().values()); - - if (segments.isEmpty()) { - continue; - } - - DataSegment proposalSegment = segments.get(rand.nextInt(segments.size())); - if (movingSegments.contains(proposalSegment)) { - continue; - } - - BalancerCostComputer helper = new BalancerCostComputer( - serverHolderList, - proposalSegment, - fromServerHolder, - segmentHoldersToMove - ); - - Pair minPair = helper.getMinPair(); - - if (minPair.rhs != null && !minPair.rhs.equals(fromServerHolder)) { - movingSegments.add(proposalSegment); - segmentHoldersToMove.add( - new BalancerSegmentHolder( - fromServerHolder.getServer(), - minPair.rhs.getServer(), - proposalSegment - ) - ); - totalCostChange += helper.getCurrCost() - minPair.lhs; - } - } - - return segmentHoldersToMove; + return new BalancerSegmentHolder(fromServerHolder.getServer(), toServer.getServer(), proposalSegment); } - /* - * These could be anonymous in BalancerCostComputer - * Their purpose is to unify the balance/assignment code since a segment that has not yet been assigned - * does not have a source server. - */ - public static class NullServerHolder extends ServerHolder + // The assignment application requires us to supply a proposal segment. + public ServerHolder findNewSegmentHome(DataSegment proposalSegment, Iterable serverHolders) { - public NullServerHolder() - { - super(null, null); - } + final long proposalSegmentSize = proposalSegment.getSize(); + double minCost = Double.MAX_VALUE; + ServerHolder toServer = null; - @Override - public DruidServer getServer() - { - return new NullDruidServer(); - } - - public static class NullDruidServer extends DruidServer - { - public NullDruidServer() - { - super(null, null, 0, null, null); + for (ServerHolder server : serverHolders) { + // Only calculate costs if the server has enough space. + if (proposalSegmentSize > server.getAvailableSize()) { + break; } - @Override - public boolean equals(Object o) - { - return false; - } - } - } - - public class BalancerCostComputer - { - private final List serverHolderList; - private final DataSegment proposalSegment; - private final ServerHolder fromServerHolder; - private final Set segmentHoldersToMove; - private Pair minPair; - private double currCost; - - public BalancerCostComputer( - List serverHolderList, - DataSegment proposalSegment - ) - { - this(serverHolderList, proposalSegment, new NullServerHolder(), Sets.newHashSet()); - } - - public BalancerCostComputer( - List serverHolderList, - DataSegment proposalSegment, - ServerHolder fromServerHolder, - Set segmentHoldersToMove - ) - { - this.serverHolderList = serverHolderList; - this.proposalSegment = proposalSegment; - this.fromServerHolder = fromServerHolder; - this.segmentHoldersToMove = segmentHoldersToMove; - this.currCost = 0; - - computeAllCosts(); - } - - public Pair getMinPair() - { - return minPair; - } - - public double getCurrCost() - { - return currCost; - } - - public void computeAllCosts() - { - // Just need a regular priority queue for the min. element. - MinMaxPriorityQueue> costsServerHolderPairs = MinMaxPriorityQueue.orderedBy( - new Comparator>() - { - @Override - public int compare( - Pair o, - Pair o1 - ) - { - return Double.compare(o.lhs, o1.lhs); - } - } - ).create(); - - for (ServerHolder server : serverHolderList) { - // Only calculate costs if the server has enough space. - if (proposalSegment.getSize() > server.getAvailableSize()) { - break; - } - - // The contribution to the total cost of a given server by proposing to move the segment to that server is... - double cost = 0f; - // the sum of the costs of other (inclusive) segments on the server - for (DataSegment segment : server.getServer().getSegments().values()) { + // The contribution to the total cost of a given server by proposing to move the segment to that server is... + double cost = 0f; + // the sum of the costs of other (exclusive of the proposalSegment) segments on the server + for (DataSegment segment : server.getServer().getSegments().values()) { + if (!proposalSegment.equals(segment)) { cost += computeJointSegmentCosts(proposalSegment, segment); } - - // plus the self cost if the proposed new server is different - if (!fromServerHolder.getServer().equals(server.getServer())) { - cost += computeJointSegmentCosts(proposalSegment, proposalSegment); - } - - // plus the costs of segments that will be moved. - for (BalancerSegmentHolder segmentToMove : segmentHoldersToMove) { - if (server.getServer().equals(segmentToMove.getToServer())) { - cost += computeJointSegmentCosts(proposalSegment, segmentToMove.getSegment()); - } - if (server.getServer().equals(segmentToMove.getFromServer())) { - cost -= computeJointSegmentCosts(proposalSegment, segmentToMove.getSegment()); - } - } - - // currCost keeps track of the current cost for that server (so we can compute the cost change). - if (fromServerHolder.getServer().equals(server.getServer())) { - currCost = cost; - } - - costsServerHolderPairs.add(Pair.of(cost, server)); + } + // plus the costs of segments that will be loaded + for (DataSegment segment : server.getPeon().getSegmentsToLoad()) { + cost += computeJointSegmentCosts(proposalSegment, segment); } - minPair = costsServerHolderPairs.pollFirst(); + if (cost < minCost) { + minCost = cost; + toServer = server; + } } + return toServer; } + } diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java b/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java index ac1ae7283c4..d63b1226212 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java @@ -22,7 +22,6 @@ package com.metamx.druid.master; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.common.collect.MinMaxPriorityQueue; -import com.google.common.collect.Sets; import com.metamx.common.guava.Comparators; import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DruidServer; @@ -30,6 +29,7 @@ import com.metamx.emitter.EmittingLogger; import java.util.ArrayList; import java.util.Comparator; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -81,7 +81,9 @@ public class DruidMasterBalancer implements DruidMasterHelper @Override public DruidMasterRuntimeParams run(DruidMasterRuntimeParams params) { - MasterStats stats = new MasterStats(); + final MasterStats stats = new MasterStats(); + final BalancerCostAnalyzer analyzer = params.getBalancerCostAnalyzer(); + final int maxSegmentsToMove = params.getMaxSegmentsToMove(); for (Map.Entry> entry : params.getDruidCluster().getCluster().entrySet()) { @@ -101,28 +103,42 @@ public class DruidMasterBalancer implements DruidMasterHelper continue; } - List serverHolderList = new ArrayList(entry.getValue()); + final List serverHolderList = new ArrayList(entry.getValue()); + int numSegments = 0; + for (ServerHolder server : serverHolderList) { + numSegments += server.getServer().getSegments().size(); + } - BalancerCostAnalyzer analyzer = params.getBalancerCostAnalyzer(); - analyzer.init(serverHolderList, params); - moveSegments(analyzer.findSegmentsToMove(), params); + if (numSegments == 0) { + log.info("No segments found. Cannot balance."); + continue; + } - double initialTotalCost = analyzer.getInitialTotalCost(); - double normalization = analyzer.getNormalization(); - double normalizedInitialCost = analyzer.getNormalizedInitialCost(); - double costChange = analyzer.getTotalCostChange(); + final Set segmentsBeingMoved = new HashSet(); + int iter = 0; + + while (iter < maxSegmentsToMove) { + BalancerSegmentHolder holder = analyzer.findNewSegmentHome(serverHolderList, numSegments); + if (!segmentsBeingMoved.contains(holder.getSegment())) { + moveSegment(holder, params); + segmentsBeingMoved.add(holder.getSegment()); + } + iter++; + } + + final double initialTotalCost = analyzer.calculateInitialTotalCost(serverHolderList); + final double normalization = analyzer.calculateNormalization(serverHolderList); + final double normalizedInitialCost = initialTotalCost / normalization; stats.addToTieredStat("initialCost", tier, (long) initialTotalCost); stats.addToTieredStat("normalization", tier, (long) normalization); - stats.addToTieredStat("costChange", tier, (long) costChange); + stats.addToTieredStat("movedCount", tier, segmentsBeingMoved.size()); log.info( - "Initial Total Cost: [%f], Initial Normalized Cost: [%f], Cost Change: [%f], Normalized Cost Change: [%f], New Normalized Cost: [%f]", + "Initial Total Cost: [%f], Initial Normalized Cost: [%f], Segments Moved: [%d]", initialTotalCost, normalizedInitialCost, - costChange, - costChange / normalization, - (initialTotalCost - costChange) / normalization + segmentsBeingMoved.size() ); if (serverHolderList.size() <= 1) { @@ -133,7 +149,6 @@ public class DruidMasterBalancer implements DruidMasterHelper continue; } - stats.addToTieredStat("movedCount", tier, currentlyMovingSegments.get(tier).size()); } return params.buildFromExisting() @@ -141,55 +156,52 @@ public class DruidMasterBalancer implements DruidMasterHelper .build(); } - private void moveSegments( - final Set segments, + private void moveSegment( + final BalancerSegmentHolder segment, final DruidMasterRuntimeParams params ) { + final DruidServer toServer = segment.getToServer(); + final String toServerName = segment.getToServer().getName(); + final LoadQueuePeon toPeon = params.getLoadManagementPeons().get(toServerName); - for (final BalancerSegmentHolder segment : Sets.newHashSet(segments)) { - final DruidServer toServer = segment.getToServer(); - final String toServerName = segment.getToServer().getName(); - LoadQueuePeon toPeon = params.getLoadManagementPeons().get(toServerName); + final String fromServer = segment.getFromServer().getName(); + final DataSegment segmentToMove = segment.getSegment(); + final String segmentName = segmentToMove.getIdentifier(); - String fromServer = segment.getFromServer().getName(); - DataSegment segmentToMove = segment.getSegment(); - final String segmentName = segmentToMove.getIdentifier(); - - if (!toPeon.getSegmentsToLoad().contains(segmentToMove) && - (toServer.getSegment(segmentName) == null) && - new ServerHolder(toServer, toPeon).getAvailableSize() > segmentToMove.getSize()) { - log.info( - "Moving [%s] from [%s] to [%s]", - segmentName, + if (!toPeon.getSegmentsToLoad().contains(segmentToMove) && + (toServer.getSegment(segmentName) == null) && + new ServerHolder(toServer, toPeon).getAvailableSize() > segmentToMove.getSize()) { + log.info( + "Moving [%s] from [%s] to [%s]", + segmentName, + fromServer, + toServerName + ); + try { + master.moveSegment( fromServer, - toServerName - ); - try { - master.moveSegment( - fromServer, - toServerName, - segmentToMove.getIdentifier(), - new LoadPeonCallback() + toServerName, + segmentToMove.getIdentifier(), + new LoadPeonCallback() + { + @Override + protected void execute() { - @Override - protected void execute() - { - Map movingSegments = currentlyMovingSegments.get(toServer.getTier()); - if (movingSegments != null) { - movingSegments.remove(segmentName); - } + Map movingSegments = currentlyMovingSegments.get(toServer.getTier()); + if (movingSegments != null) { + movingSegments.remove(segmentName); } } - ); - currentlyMovingSegments.get(toServer.getTier()).put(segmentName, segment); - } - catch (Exception e) { - log.makeAlert(e, String.format("[%s] : Moving exception", segmentName)).emit(); - } - } else { - currentlyMovingSegments.get(toServer.getTier()).remove(segment); + } + ); + currentlyMovingSegments.get(toServer.getTier()).put(segmentName, segment); } + catch (Exception e) { + log.makeAlert(e, String.format("[%s] : Moving exception", segmentName)).emit(); + } + } else { + currentlyMovingSegments.get(toServer.getTier()).remove(segment); } } diff --git a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java index c18d7d8db87..c77621b4f14 100644 --- a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java +++ b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java @@ -87,14 +87,11 @@ public abstract class LoadRule implements Rule List assignedServers = Lists.newArrayList(); while (totalReplicants < expectedReplicants) { BalancerCostAnalyzer analyzer = params.getBalancerCostAnalyzer(); - BalancerCostAnalyzer.BalancerCostComputer helper = analyzer.new BalancerCostComputer(serverHolderList, segment); + ServerHolder holder = analyzer.findNewSegmentHome(segment, serverHolderList); - Pair minPair = helper.getMinPair(); - - ServerHolder holder = minPair.rhs; if (holder == null) { log.warn( - "Not enough %s servers[%d] to assign segment[%s]! Expected Replicants[%d]", + "Not enough %s servers[%d] or node capacity to assign segment[%s]! Expected Replicants[%d]", getTier(), assignedServers.size() + serverQueue.size() + 1, segment.getIdentifier(), @@ -107,27 +104,6 @@ public abstract class LoadRule implements Rule continue; } - if (holder.getAvailableSize() < segment.getSize()) { - log.warn( - "Not enough node capacity, closest is [%s] with %,d available, skipping segment[%s].", - holder.getServer(), - holder.getAvailableSize(), - segment - ); - log.makeAlert( - "Not enough node capacity", - ImmutableMap.builder() - .put("segmentSkipped", segment.toString()) - .put("closestNode", holder.getServer().toString()) - .put("availableSize", holder.getAvailableSize()) - .build() - ).emit(); - serverQueue.add(holder); - stats.addToTieredStat("unassignedCount", getTier(), 1); - stats.addToTieredStat("unassignedSize", getTier(), segment.getSize()); - break; - } - holder.getPeon().loadSegment( segment, new LoadPeonCallback() diff --git a/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTest.java b/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTest.java index 4e677acbae1..ad29249770a 100644 --- a/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTest.java +++ b/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTest.java @@ -229,7 +229,6 @@ public class DruidMasterBalancerTest params = new DruidMasterBalancer(master).run(params); Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() > 0); - Assert.assertTrue(params.getMasterStats().getPerTierStats().get("costChange").get("normal").get() > 0); } @Test @@ -379,6 +378,5 @@ public class DruidMasterBalancerTest params = new DruidMasterBalancer(master).run(params); Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() > 0); - Assert.assertTrue(params.getMasterStats().getPerTierStats().get("costChange").get("normal").get() > 0); } }