Huge simplification of balancing code

This commit is contained in:
Nelson Ray 2013-01-21 14:28:25 -08:00
parent e847faf02f
commit 2d7113b263
4 changed files with 107 additions and 301 deletions

View File

@@ -20,19 +20,13 @@
package com.metamx.druid.master; package com.metamx.druid.master;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Sets;
import com.metamx.common.Pair;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidServer;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Interval; import org.joda.time.Interval;
import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Random; import java.util.Random;
import java.util.Set;
/** /**
@@ -48,56 +42,18 @@ public class BalancerCostAnalyzer
private static final int THIRTY_DAYS_IN_MILLIS = 30 * DAY_IN_MILLIS; private static final int THIRTY_DAYS_IN_MILLIS = 30 * DAY_IN_MILLIS;
private final Random rand; private final Random rand;
private final DateTime referenceTimestamp; private final DateTime referenceTimestamp;
private int maxSegmentsToMove;
private List<ServerHolder> serverHolderList;
private double initialTotalCost;
private double normalization;
private double totalCostChange;
private int totalSegments;
public BalancerCostAnalyzer(DateTime referenceTimestamp) public BalancerCostAnalyzer(DateTime referenceTimestamp)
{ {
this.referenceTimestamp = referenceTimestamp; this.referenceTimestamp = referenceTimestamp;
rand = new Random(0); rand = new Random(0);
totalCostChange = 0;
}
// The assignment usage doesn't require initialization.
public void init(List<ServerHolder> serverHolderList, DruidMasterRuntimeParams params)
{
this.serverHolderList = serverHolderList;
this.initialTotalCost = calculateInitialTotalCost(serverHolderList);
this.normalization = calculateNormalization(serverHolderList);
this.totalSegments = params.getAvailableSegments().size();
this.maxSegmentsToMove = params.getMaxSegmentsToMove();
}
public double getInitialTotalCost()
{
return initialTotalCost;
}
public double getNormalization()
{
return normalization;
}
public double getNormalizedInitialCost()
{
return initialTotalCost / normalization;
}
public double getTotalCostChange()
{
return totalCostChange;
} }
/* /*
* Calculates the cost normalization. This is such that the normalized cost is lower bounded * Calculates the cost normalization. This is such that the normalized cost is lower bounded
* by 1 (e.g. when each segment gets its own compute node). * by 1 (e.g. when each segment gets its own compute node).
*/ */
private double calculateNormalization(List<ServerHolder> serverHolderList) public double calculateNormalization(List<ServerHolder> serverHolderList)
{ {
double cost = 0; double cost = 0;
for (ServerHolder server : serverHolderList) { for (ServerHolder server : serverHolderList) {
@@ -109,7 +65,7 @@ public class BalancerCostAnalyzer
} }
// Calculates the initial cost of the Druid segment configuration. // Calculates the initial cost of the Druid segment configuration.
private double calculateInitialTotalCost(List<ServerHolder> serverHolderList) public double calculateInitialTotalCost(List<ServerHolder> serverHolderList)
{ {
double cost = 0; double cost = 0;
for (ServerHolder server : serverHolderList) { for (ServerHolder server : serverHolderList) {
@@ -170,9 +126,9 @@ public class BalancerCostAnalyzer
/* /*
* Sample from each server with probability proportional to the number of segments on that server. * Sample from each server with probability proportional to the number of segments on that server.
*/ */
private ServerHolder sampleServer() private ServerHolder sampleServer(List<ServerHolder> serverHolderList, int numSegments)
{ {
final int num = rand.nextInt(totalSegments); final int num = rand.nextInt(numSegments);
int cumulativeSegments = 0; int cumulativeSegments = 0;
int numToStopAt = 0; int numToStopAt = 0;
@@ -184,193 +140,57 @@
return serverHolderList.get(numToStopAt - 1); return serverHolderList.get(numToStopAt - 1);
} }
public Set<BalancerSegmentHolder> findSegmentsToMove() // The balancing application requires us to pick a proposal segment.
public BalancerSegmentHolder findNewSegmentHome(List<ServerHolder> serverHolders, int numSegments)
{ {
final Set<BalancerSegmentHolder> segmentHoldersToMove = Sets.newHashSet(); // We want to sample from each server w.p. numSegmentsOnServer / totalSegments
final Set<DataSegment> movingSegments = Sets.newHashSet(); ServerHolder fromServerHolder = sampleServer(serverHolders, numSegments);
if (serverHolderList.isEmpty()) {
return segmentHoldersToMove;
}
int counter = 0; // and actually pick that segment uniformly at random w.p. 1 / numSegmentsOnServer
// so that the probability of picking a segment is 1 / totalSegments.
List<DataSegment> segments = Lists.newArrayList(fromServerHolder.getServer().getSegments().values());
while (segmentHoldersToMove.size() < maxSegmentsToMove && counter < 3 * maxSegmentsToMove) { DataSegment proposalSegment = segments.get(rand.nextInt(segments.size()));
counter++; ServerHolder toServer = findNewSegmentHome(proposalSegment, serverHolders);
// We want to sample from each server w.p. numSegmentsOnServer / totalSegments return new BalancerSegmentHolder(fromServerHolder.getServer(), toServer.getServer(), proposalSegment);
ServerHolder fromServerHolder = sampleServer();
// and actually pick that segment uniformly at random w.p. 1 / numSegmentsOnServer
// so that the probability of picking a segment is 1 / totalSegments.
List<DataSegment> segments = Lists.newArrayList(fromServerHolder.getServer().getSegments().values());
if (segments.isEmpty()) {
continue;
}
DataSegment proposalSegment = segments.get(rand.nextInt(segments.size()));
if (movingSegments.contains(proposalSegment)) {
continue;
}
BalancerCostComputer helper = new BalancerCostComputer(
serverHolderList,
proposalSegment,
fromServerHolder,
segmentHoldersToMove
);
Pair<Double, ServerHolder> minPair = helper.getMinPair();
if (minPair.rhs != null && !minPair.rhs.equals(fromServerHolder)) {
movingSegments.add(proposalSegment);
segmentHoldersToMove.add(
new BalancerSegmentHolder(
fromServerHolder.getServer(),
minPair.rhs.getServer(),
proposalSegment
)
);
totalCostChange += helper.getCurrCost() - minPair.lhs;
}
}
return segmentHoldersToMove;
} }
/* // The assignment application requires us to supply a proposal segment.
* These could be anonymous in BalancerCostComputer public ServerHolder findNewSegmentHome(DataSegment proposalSegment, Iterable<ServerHolder> serverHolders)
* Their purpose is to unify the balance/assignment code since a segment that has not yet been assigned
* does not have a source server.
*/
public static class NullServerHolder extends ServerHolder
{ {
public NullServerHolder() final long proposalSegmentSize = proposalSegment.getSize();
{ double minCost = Double.MAX_VALUE;
super(null, null); ServerHolder toServer = null;
}
@Override for (ServerHolder server : serverHolders) {
public DruidServer getServer() // Only calculate costs if the server has enough space.
{ if (proposalSegmentSize > server.getAvailableSize()) {
return new NullDruidServer(); break;
}
public static class NullDruidServer extends DruidServer
{
public NullDruidServer()
{
super(null, null, 0, null, null);
} }
@Override // The contribution to the total cost of a given server by proposing to move the segment to that server is...
public boolean equals(Object o) double cost = 0f;
{ // the sum of the costs of other (exclusive of the proposalSegment) segments on the server
return false; for (DataSegment segment : server.getServer().getSegments().values()) {
} if (!proposalSegment.equals(segment)) {
}
}
public class BalancerCostComputer
{
private final List<ServerHolder> serverHolderList;
private final DataSegment proposalSegment;
private final ServerHolder fromServerHolder;
private final Set<BalancerSegmentHolder> segmentHoldersToMove;
private Pair<Double, ServerHolder> minPair;
private double currCost;
public BalancerCostComputer(
List<ServerHolder> serverHolderList,
DataSegment proposalSegment
)
{
this(serverHolderList, proposalSegment, new NullServerHolder(), Sets.<BalancerSegmentHolder>newHashSet());
}
public BalancerCostComputer(
List<ServerHolder> serverHolderList,
DataSegment proposalSegment,
ServerHolder fromServerHolder,
Set<BalancerSegmentHolder> segmentHoldersToMove
)
{
this.serverHolderList = serverHolderList;
this.proposalSegment = proposalSegment;
this.fromServerHolder = fromServerHolder;
this.segmentHoldersToMove = segmentHoldersToMove;
this.currCost = 0;
computeAllCosts();
}
public Pair<Double, ServerHolder> getMinPair()
{
return minPair;
}
public double getCurrCost()
{
return currCost;
}
public void computeAllCosts()
{
// Just need a regular priority queue for the min. element.
MinMaxPriorityQueue<Pair<Double, ServerHolder>> costsServerHolderPairs = MinMaxPriorityQueue.orderedBy(
new Comparator<Pair<Double, ServerHolder>>()
{
@Override
public int compare(
Pair<Double, ServerHolder> o,
Pair<Double, ServerHolder> o1
)
{
return Double.compare(o.lhs, o1.lhs);
}
}
).create();
for (ServerHolder server : serverHolderList) {
// Only calculate costs if the server has enough space.
if (proposalSegment.getSize() > server.getAvailableSize()) {
break;
}
// The contribution to the total cost of a given server by proposing to move the segment to that server is...
double cost = 0f;
// the sum of the costs of other (inclusive) segments on the server
for (DataSegment segment : server.getServer().getSegments().values()) {
cost += computeJointSegmentCosts(proposalSegment, segment); cost += computeJointSegmentCosts(proposalSegment, segment);
} }
}
// plus the self cost if the proposed new server is different // plus the costs of segments that will be loaded
if (!fromServerHolder.getServer().equals(server.getServer())) { for (DataSegment segment : server.getPeon().getSegmentsToLoad()) {
cost += computeJointSegmentCosts(proposalSegment, proposalSegment); cost += computeJointSegmentCosts(proposalSegment, segment);
}
// plus the costs of segments that will be moved.
for (BalancerSegmentHolder segmentToMove : segmentHoldersToMove) {
if (server.getServer().equals(segmentToMove.getToServer())) {
cost += computeJointSegmentCosts(proposalSegment, segmentToMove.getSegment());
}
if (server.getServer().equals(segmentToMove.getFromServer())) {
cost -= computeJointSegmentCosts(proposalSegment, segmentToMove.getSegment());
}
}
// currCost keeps track of the current cost for that server (so we can compute the cost change).
if (fromServerHolder.getServer().equals(server.getServer())) {
currCost = cost;
}
costsServerHolderPairs.add(Pair.of(cost, server));
} }
minPair = costsServerHolderPairs.pollFirst(); if (cost < minCost) {
minCost = cost;
toServer = server;
}
} }
return toServer;
} }
} }

View File

@@ -22,7 +22,6 @@ package com.metamx.druid.master;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue; import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Sets;
import com.metamx.common.guava.Comparators; import com.metamx.common.guava.Comparators;
import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidServer; import com.metamx.druid.client.DruidServer;
@@ -30,6 +29,7 @@ import com.metamx.emitter.EmittingLogger;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@@ -81,7 +81,9 @@ public class DruidMasterBalancer implements DruidMasterHelper
@Override @Override
public DruidMasterRuntimeParams run(DruidMasterRuntimeParams params) public DruidMasterRuntimeParams run(DruidMasterRuntimeParams params)
{ {
MasterStats stats = new MasterStats(); final MasterStats stats = new MasterStats();
final BalancerCostAnalyzer analyzer = params.getBalancerCostAnalyzer();
final int maxSegmentsToMove = params.getMaxSegmentsToMove();
for (Map.Entry<String, MinMaxPriorityQueue<ServerHolder>> entry : for (Map.Entry<String, MinMaxPriorityQueue<ServerHolder>> entry :
params.getDruidCluster().getCluster().entrySet()) { params.getDruidCluster().getCluster().entrySet()) {
@@ -101,28 +103,42 @@ public class DruidMasterBalancer implements DruidMasterHelper
continue; continue;
} }
List<ServerHolder> serverHolderList = new ArrayList<ServerHolder>(entry.getValue()); final List<ServerHolder> serverHolderList = new ArrayList<ServerHolder>(entry.getValue());
int numSegments = 0;
for (ServerHolder server : serverHolderList) {
numSegments += server.getServer().getSegments().size();
}
BalancerCostAnalyzer analyzer = params.getBalancerCostAnalyzer(); if (numSegments == 0) {
analyzer.init(serverHolderList, params); log.info("No segments found. Cannot balance.");
moveSegments(analyzer.findSegmentsToMove(), params); continue;
}
double initialTotalCost = analyzer.getInitialTotalCost(); final Set<DataSegment> segmentsBeingMoved = new HashSet<DataSegment>();
double normalization = analyzer.getNormalization(); int iter = 0;
double normalizedInitialCost = analyzer.getNormalizedInitialCost();
double costChange = analyzer.getTotalCostChange(); while (iter < maxSegmentsToMove) {
BalancerSegmentHolder holder = analyzer.findNewSegmentHome(serverHolderList, numSegments);
if (!segmentsBeingMoved.contains(holder.getSegment())) {
moveSegment(holder, params);
segmentsBeingMoved.add(holder.getSegment());
}
iter++;
}
final double initialTotalCost = analyzer.calculateInitialTotalCost(serverHolderList);
final double normalization = analyzer.calculateNormalization(serverHolderList);
final double normalizedInitialCost = initialTotalCost / normalization;
stats.addToTieredStat("initialCost", tier, (long) initialTotalCost); stats.addToTieredStat("initialCost", tier, (long) initialTotalCost);
stats.addToTieredStat("normalization", tier, (long) normalization); stats.addToTieredStat("normalization", tier, (long) normalization);
stats.addToTieredStat("costChange", tier, (long) costChange); stats.addToTieredStat("movedCount", tier, segmentsBeingMoved.size());
log.info( log.info(
"Initial Total Cost: [%f], Initial Normalized Cost: [%f], Cost Change: [%f], Normalized Cost Change: [%f], New Normalized Cost: [%f]", "Initial Total Cost: [%f], Initial Normalized Cost: [%f], Segments Moved: [%d]",
initialTotalCost, initialTotalCost,
normalizedInitialCost, normalizedInitialCost,
costChange, segmentsBeingMoved.size()
costChange / normalization,
(initialTotalCost - costChange) / normalization
); );
if (serverHolderList.size() <= 1) { if (serverHolderList.size() <= 1) {
@@ -133,7 +149,6 @@ public class DruidMasterBalancer implements DruidMasterHelper
continue; continue;
} }
stats.addToTieredStat("movedCount", tier, currentlyMovingSegments.get(tier).size());
} }
return params.buildFromExisting() return params.buildFromExisting()
@@ -141,55 +156,52 @@
.build(); .build();
} }
private void moveSegments( private void moveSegment(
final Set<BalancerSegmentHolder> segments, final BalancerSegmentHolder segment,
final DruidMasterRuntimeParams params final DruidMasterRuntimeParams params
) )
{ {
final DruidServer toServer = segment.getToServer();
final String toServerName = segment.getToServer().getName();
final LoadQueuePeon toPeon = params.getLoadManagementPeons().get(toServerName);
for (final BalancerSegmentHolder segment : Sets.newHashSet(segments)) { final String fromServer = segment.getFromServer().getName();
final DruidServer toServer = segment.getToServer(); final DataSegment segmentToMove = segment.getSegment();
final String toServerName = segment.getToServer().getName(); final String segmentName = segmentToMove.getIdentifier();
LoadQueuePeon toPeon = params.getLoadManagementPeons().get(toServerName);
String fromServer = segment.getFromServer().getName(); if (!toPeon.getSegmentsToLoad().contains(segmentToMove) &&
DataSegment segmentToMove = segment.getSegment(); (toServer.getSegment(segmentName) == null) &&
final String segmentName = segmentToMove.getIdentifier(); new ServerHolder(toServer, toPeon).getAvailableSize() > segmentToMove.getSize()) {
log.info(
if (!toPeon.getSegmentsToLoad().contains(segmentToMove) && "Moving [%s] from [%s] to [%s]",
(toServer.getSegment(segmentName) == null) && segmentName,
new ServerHolder(toServer, toPeon).getAvailableSize() > segmentToMove.getSize()) { fromServer,
log.info( toServerName
"Moving [%s] from [%s] to [%s]", );
segmentName, try {
master.moveSegment(
fromServer, fromServer,
toServerName toServerName,
); segmentToMove.getIdentifier(),
try { new LoadPeonCallback()
master.moveSegment( {
fromServer, @Override
toServerName, protected void execute()
segmentToMove.getIdentifier(),
new LoadPeonCallback()
{ {
@Override Map<String, BalancerSegmentHolder> movingSegments = currentlyMovingSegments.get(toServer.getTier());
protected void execute() if (movingSegments != null) {
{ movingSegments.remove(segmentName);
Map<String, BalancerSegmentHolder> movingSegments = currentlyMovingSegments.get(toServer.getTier());
if (movingSegments != null) {
movingSegments.remove(segmentName);
}
} }
} }
); }
currentlyMovingSegments.get(toServer.getTier()).put(segmentName, segment); );
} currentlyMovingSegments.get(toServer.getTier()).put(segmentName, segment);
catch (Exception e) {
log.makeAlert(e, String.format("[%s] : Moving exception", segmentName)).emit();
}
} else {
currentlyMovingSegments.get(toServer.getTier()).remove(segment);
} }
catch (Exception e) {
log.makeAlert(e, String.format("[%s] : Moving exception", segmentName)).emit();
}
} else {
currentlyMovingSegments.get(toServer.getTier()).remove(segment);
} }
} }

View File

@@ -87,14 +87,11 @@ public abstract class LoadRule implements Rule
List<ServerHolder> assignedServers = Lists.newArrayList(); List<ServerHolder> assignedServers = Lists.newArrayList();
while (totalReplicants < expectedReplicants) { while (totalReplicants < expectedReplicants) {
BalancerCostAnalyzer analyzer = params.getBalancerCostAnalyzer(); BalancerCostAnalyzer analyzer = params.getBalancerCostAnalyzer();
BalancerCostAnalyzer.BalancerCostComputer helper = analyzer.new BalancerCostComputer(serverHolderList, segment); ServerHolder holder = analyzer.findNewSegmentHome(segment, serverHolderList);
Pair<Double, ServerHolder> minPair = helper.getMinPair();
ServerHolder holder = minPair.rhs;
if (holder == null) { if (holder == null) {
log.warn( log.warn(
"Not enough %s servers[%d] to assign segment[%s]! Expected Replicants[%d]", "Not enough %s servers[%d] or node capacity to assign segment[%s]! Expected Replicants[%d]",
getTier(), getTier(),
assignedServers.size() + serverQueue.size() + 1, assignedServers.size() + serverQueue.size() + 1,
segment.getIdentifier(), segment.getIdentifier(),
@@ -107,27 +104,6 @@
continue; continue;
} }
if (holder.getAvailableSize() < segment.getSize()) {
log.warn(
"Not enough node capacity, closest is [%s] with %,d available, skipping segment[%s].",
holder.getServer(),
holder.getAvailableSize(),
segment
);
log.makeAlert(
"Not enough node capacity",
ImmutableMap.<String, Object>builder()
.put("segmentSkipped", segment.toString())
.put("closestNode", holder.getServer().toString())
.put("availableSize", holder.getAvailableSize())
.build()
).emit();
serverQueue.add(holder);
stats.addToTieredStat("unassignedCount", getTier(), 1);
stats.addToTieredStat("unassignedSize", getTier(), segment.getSize());
break;
}
holder.getPeon().loadSegment( holder.getPeon().loadSegment(
segment, segment,
new LoadPeonCallback() new LoadPeonCallback()

View File

@@ -229,7 +229,6 @@ public class DruidMasterBalancerTest
params = new DruidMasterBalancer(master).run(params); params = new DruidMasterBalancer(master).run(params);
Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() > 0); Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() > 0);
Assert.assertTrue(params.getMasterStats().getPerTierStats().get("costChange").get("normal").get() > 0);
} }
@Test @Test
@@ -379,6 +378,5 @@
params = new DruidMasterBalancer(master).run(params); params = new DruidMasterBalancer(master).run(params);
Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() > 0); Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() > 0);
Assert.assertTrue(params.getMasterStats().getPerTierStats().get("costChange").get("normal").get() > 0);
} }
} }