This commit is contained in:
Nelson Ray 2013-01-03 10:48:05 -08:00
parent 5441662cb8
commit bb31fa7b68
3 changed files with 26 additions and 22 deletions

View File

@ -49,12 +49,12 @@ public class BalancerCostAnalyzer
private List<ServerHolder> serverHolderList; private List<ServerHolder> serverHolderList;
private Random rand; private Random rand;
private float initialTotalCost; private double initialTotalCost;
private float totalCostChange; private double totalCostChange;
public BalancerCostAnalyzer(){ public BalancerCostAnalyzer(){
rand = new Random(0); rand = new Random(0);
totalCostChange = 0f; totalCostChange = 0;
} }
public void init(List<ServerHolder> serverHolderList){ public void init(List<ServerHolder> serverHolderList){
@ -62,16 +62,16 @@ public class BalancerCostAnalyzer
this.serverHolderList = serverHolderList; this.serverHolderList = serverHolderList;
} }
public float getInitialTotalCost() { public double getInitialTotalCost() {
return initialTotalCost; return initialTotalCost;
} }
public float getTotalCostChange() { public double getTotalCostChange() {
return totalCostChange; return totalCostChange;
} }
private float calculateInitialTotalCost(List<ServerHolder> serverHolderList){ private double calculateInitialTotalCost(List<ServerHolder> serverHolderList){
float cost = 0; double cost = 0;
for (ServerHolder server : serverHolderList) { for (ServerHolder server : serverHolderList) {
DataSegment[] segments = server.getServer().getSegments().values().toArray(new DataSegment[]{}); DataSegment[] segments = server.getServer().getSegments().values().toArray(new DataSegment[]{});
for (int i = 0; i < segments.length; ++i) { for (int i = 0; i < segments.length; ++i) {
@ -83,8 +83,8 @@ public class BalancerCostAnalyzer
return cost; return cost;
} }
public float computeJointSegmentCosts(DataSegment segment1, DataSegment segment2){ public double computeJointSegmentCosts(DataSegment segment1, DataSegment segment2){
float cost = 0f; double cost = 0;
Interval gap = segment1.getInterval().gap(segment2.getInterval()); Interval gap = segment1.getInterval().gap(segment2.getInterval());
// gap is null if the two segment intervals overlap or if they're adjacent // gap is null if the two segment intervals overlap or if they're adjacent
@ -106,7 +106,7 @@ public class BalancerCostAnalyzer
Set<DataSegment> movingSegments = Sets.newHashSet(); Set<DataSegment> movingSegments = Sets.newHashSet();
int counter = 0; int counter = 0;
float currCost = 0f; double currCost = 0;
while (segmentHoldersToMove.size() < MAX_SEGMENTS_TO_MOVE && counter < 3 * MAX_SEGMENTS_TO_MOVE) { while (segmentHoldersToMove.size() < MAX_SEGMENTS_TO_MOVE && counter < 3 * MAX_SEGMENTS_TO_MOVE) {
counter++; counter++;
@ -117,22 +117,22 @@ public class BalancerCostAnalyzer
if (movingSegments.contains(proposalSegment)) continue; if (movingSegments.contains(proposalSegment)) continue;
// Just need a regular priority queue for the min. element. // Just need a regular priority queue for the min. element.
MinMaxPriorityQueue<Pair<Float, ServerHolder>> pQueue = MinMaxPriorityQueue.orderedBy( MinMaxPriorityQueue<Pair<Double, ServerHolder>> pQueue = MinMaxPriorityQueue.orderedBy(
new Comparator<Pair<Float, ServerHolder>>() new Comparator<Pair<Double, ServerHolder>>()
{ {
@Override @Override
public int compare( public int compare(
Pair<Float, ServerHolder> o, Pair<Double, ServerHolder> o,
Pair<Float, ServerHolder> o1 Pair<Double, ServerHolder> o1
) )
{ {
return Float.compare(o.lhs, o1.lhs); return Double.compare(o.lhs, o1.lhs);
} }
} }
).create(); ).create();
for (ServerHolder server : serverHolderList) { for (ServerHolder server : serverHolderList) {
float cost = 0f; double cost = 0f;
for (DataSegment segment : server.getServer().getSegments().values()) { for (DataSegment segment : server.getServer().getSegments().values()) {
cost += computeJointSegmentCosts(proposalSegment, segment); cost += computeJointSegmentCosts(proposalSegment, segment);
} }
@ -159,7 +159,7 @@ public class BalancerCostAnalyzer
pQueue.add(Pair.of(cost, server)); pQueue.add(Pair.of(cost, server));
} }
Pair<Float, ServerHolder> minPair = pQueue.peekFirst(); Pair<Double, ServerHolder> minPair = pQueue.peekFirst();
if (!minPair.rhs.equals(fromServerHolder)) { if (!minPair.rhs.equals(fromServerHolder)) {
movingSegments.add(proposalSegment); movingSegments.add(proposalSegment);
segmentHoldersToMove.add( segmentHoldersToMove.add(

View File

@ -110,8 +110,8 @@ public class DruidMasterBalancer implements DruidMasterHelper
continue; continue;
} }
TreeSet<ServerHolder> serversByPercentUsed = Sets.newTreeSet(percentUsedComparator); //TreeSet<ServerHolder> serversByPercentUsed = Sets.newTreeSet(percentUsedComparator);
serversByPercentUsed.addAll(entry.getValue()); //serversByPercentUsed.addAll(entry.getValue());
List<ServerHolder> serverHolderList = new ArrayList<ServerHolder>(entry.getValue()); List<ServerHolder> serverHolderList = new ArrayList<ServerHolder>(entry.getValue());
@ -128,18 +128,19 @@ public class DruidMasterBalancer implements DruidMasterHelper
analyzer.getTotalCostChange() analyzer.getTotalCostChange()
); );
if (serversByPercentUsed.size() <= 1) { if (serverHolderList.size() <= 1) {
log.info( log.info(
"[%s]: No unique values found for highest and lowest percent used servers: nothing to balance", "[%s]: One or fewer servers found. Cannot balance.",
tier tier
); );
continue; continue;
} }
/*
ServerHolder highestPercentUsedServer = serversByPercentUsed.first(); ServerHolder highestPercentUsedServer = serversByPercentUsed.first();
ServerHolder lowestPercentUsedServer = serversByPercentUsed.last(); ServerHolder lowestPercentUsedServer = serversByPercentUsed.last();
/*
analyzer.init(highestPercentUsedServer, lowestPercentUsedServer); analyzer.init(highestPercentUsedServer, lowestPercentUsedServer);
log.info( log.info(

View File

@ -30,6 +30,7 @@ import com.metamx.druid.master.MasterStats;
import com.metamx.druid.master.ServerHolder; import com.metamx.druid.master.ServerHolder;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -70,6 +71,8 @@ public abstract class LoadRule implements Rule
{ {
MasterStats stats = new MasterStats(); MasterStats stats = new MasterStats();
List<ServerHolder> serverHolderList = new ArrayList<ServerHolder>(serverQueue);
List<ServerHolder> assignedServers = Lists.newArrayList(); List<ServerHolder> assignedServers = Lists.newArrayList();
while (totalReplicants < expectedReplicants) { while (totalReplicants < expectedReplicants) {
ServerHolder holder = serverQueue.pollFirst(); ServerHolder holder = serverQueue.pollFirst();