lots of changes

This commit is contained in:
Nelson Ray 2013-01-07 17:28:53 -08:00
parent 6fda5330fd
commit 2b4dbdde03
2 changed files with 38 additions and 56 deletions

View File

@ -32,6 +32,7 @@ import org.joda.time.Interval;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
@ -103,9 +104,8 @@ public class BalancerCostAnalyzer
{
double cost = 0;
for (ServerHolder server : serverHolderList) {
DataSegment[] segments = server.getServer().getSegments().values().toArray(new DataSegment[]{});
for (int i = 0; i < segments.length; ++i) {
cost += computeJointSegmentCosts(segments[i], segments[i]);
for (DataSegment segment : server.getServer().getSegments().values()) {
cost += computeJointSegmentCosts(segment, segment);
}
}
return cost;
@ -176,14 +176,14 @@ public class BalancerCostAnalyzer
* Their purpose is to unify the balance/assignment code since a segment that has not yet been assigned
* does not have a source server.
*/
public class NullServerHolder extends ServerHolder
public static class NullServerHolder extends ServerHolder
{
public NullServerHolder()
{
super(null, null);
}
public class NullDruidServer extends DruidServer
public static class NullDruidServer extends DruidServer
{
public NullDruidServer()
{
@ -206,36 +206,17 @@ public class BalancerCostAnalyzer
public class BalancerCostAnalyzerHelper
{
private MinMaxPriorityQueue<Pair<Double, ServerHolder>> costsServerHolderPairs;
private List<ServerHolder> serverHolderList;
private DataSegment proposalSegment;
private ServerHolder fromServerHolder;
private Set<BalancerSegmentHolder> segmentHoldersToMove;
private final List<ServerHolder> serverHolderList;
private final DataSegment proposalSegment;
private final ServerHolder fromServerHolder;
private final Set<BalancerSegmentHolder> segmentHoldersToMove;
private Pair<Double, ServerHolder> minPair;
private double currCost;
public MinMaxPriorityQueue<Pair<Double, ServerHolder>> getCostsServerHolderPairs()
public Pair<Double, ServerHolder> getMinPair()
{
return costsServerHolderPairs;
}
public List<ServerHolder> getServerHolderList()
{
return serverHolderList;
}
public DataSegment getProposalSegment()
{
return proposalSegment;
}
public ServerHolder getFromServerHolder()
{
return fromServerHolder;
}
public Set<BalancerSegmentHolder> getSegmentHoldersToMove()
{
return segmentHoldersToMove;
return minPair;
}
public double getCurrCost()
@ -257,9 +238,20 @@ public class BalancerCostAnalyzer
ServerHolder fromServerHolder,
Set<BalancerSegmentHolder> segmentHoldersToMove
)
{
this.serverHolderList = serverHolderList;
this.proposalSegment = proposalSegment;
this.fromServerHolder = fromServerHolder;
this.segmentHoldersToMove = segmentHoldersToMove;
this.currCost = 0;
computeAllCosts();
}
public void computeAllCosts()
{
// Just need a regular priority queue for the min. element.
this.costsServerHolderPairs = MinMaxPriorityQueue.orderedBy(
MinMaxPriorityQueue<Pair<Double, ServerHolder>> costsServerHolderPairs = MinMaxPriorityQueue.orderedBy(
new Comparator<Pair<Double, ServerHolder>>()
{
@Override
@ -272,17 +264,12 @@ public class BalancerCostAnalyzer
}
}
).create();
this.serverHolderList = serverHolderList;
this.proposalSegment = proposalSegment;
this.fromServerHolder = fromServerHolder;
this.segmentHoldersToMove = segmentHoldersToMove;
this.currCost = 0;
}
public void computeAllCosts()
{
// The contribution to the total cost of a given server by proposing to move the segment to that server is...
for (ServerHolder server : serverHolderList) {
// Only calculate costs if the server has enough space.
if (proposalSegment.getSize() > server.getAvailableSize()) break;
// The contribution to the total cost of a given server by proposing to move the segment to that server is...
double cost = 0f;
// the sum of the costs of other (inclusive) segments on the server
for (DataSegment segment : server.getServer().getSegments().values()) {
@ -295,9 +282,7 @@ public class BalancerCostAnalyzer
}
// plus the costs of segments that will be moved.
Iterator it = segmentHoldersToMove.iterator();
while (it.hasNext()) {
BalancerSegmentHolder segmentToMove = (BalancerSegmentHolder) it.next();
for (BalancerSegmentHolder segmentToMove : segmentHoldersToMove) {
if (server.getServer().equals(segmentToMove.getToServer())) {
cost += computeJointSegmentCosts(proposalSegment, segmentToMove.getSegment());
}
@ -311,12 +296,10 @@ public class BalancerCostAnalyzer
currCost = cost;
}
// Only enter the queue if the server has enough size.
if (proposalSegment.getSize() < server.getAvailableSize()) {
costsServerHolderPairs.add(Pair.of(cost, server));
}
costsServerHolderPairs.add(Pair.of(cost, server));
}
minPair = costsServerHolderPairs.pollFirst();
}
}
@ -342,13 +325,13 @@ public class BalancerCostAnalyzer
{
Set<BalancerSegmentHolder> segmentHoldersToMove = Sets.newHashSet();
Set<DataSegment> movingSegments = Sets.newHashSet();
int numServers = serverHolderList.size();
int counter = 0;
while (segmentHoldersToMove.size() < MAX_SEGMENTS_TO_MOVE && counter < 3 * MAX_SEGMENTS_TO_MOVE) {
counter++;
int numServers = serverHolderList.size();
if (numServers == 0) break;
// We want to sample from each server w.p. numSegmentsOnServer / totalSegments
@ -358,7 +341,7 @@ public class BalancerCostAnalyzer
// so that the probability of picking a segment is 1 / totalSegments.
List<DataSegment> segments = Lists.newArrayList(fromServerHolder.getServer().getSegments().values());
if (segments.size() == 0) continue;
if (segments.isEmpty()) continue;
DataSegment proposalSegment = segments.get(rand.nextInt(segments.size()));
if (movingSegments.contains(proposalSegment)) {
@ -371,9 +354,8 @@ public class BalancerCostAnalyzer
fromServerHolder,
segmentHoldersToMove
);
helper.computeAllCosts();
Pair<Double, ServerHolder> minPair = helper.getCostsServerHolderPairs().pollFirst();
Pair<Double, ServerHolder> minPair = helper.getMinPair();
if (minPair.rhs != null && !minPair.rhs.equals(fromServerHolder)) {
movingSegments.add(proposalSegment);

View File

@ -89,8 +89,8 @@ public abstract class LoadRule implements Rule
while (totalReplicants < expectedReplicants) {
BalancerCostAnalyzer analyzer = new BalancerCostAnalyzer(DateTime.now());
BalancerCostAnalyzer.BalancerCostAnalyzerHelper helper = analyzer.new BalancerCostAnalyzerHelper(serverHolderList, segment);
helper.computeAllCosts();
Pair<Double, ServerHolder> minPair = helper.getCostsServerHolderPairs().pollFirst();
Pair<Double, ServerHolder> minPair = helper.getMinPair();
ServerHolder holder = minPair.rhs;
if (holder == null) {