add some finals, reorganize

This commit is contained in:
Nelson Ray 2013-01-08 09:07:43 -08:00
parent 2b4dbdde03
commit 4549c3a8c3
2 changed files with 104 additions and 103 deletions

View File

@ -30,9 +30,7 @@ import org.joda.time.DateTime;
import org.joda.time.Interval; import org.joda.time.Interval;
import java.util.Comparator; import java.util.Comparator;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
@ -45,15 +43,13 @@ import java.util.Set;
public class BalancerCostAnalyzer public class BalancerCostAnalyzer
{ {
private static final Logger log = new Logger(BalancerCostAnalyzer.class); private static final Logger log = new Logger(BalancerCostAnalyzer.class);
private int MAX_SEGMENTS_TO_MOVE;
private static final int DAY_IN_MILLIS = 1000 * 60 * 60 * 24; private static final int DAY_IN_MILLIS = 1000 * 60 * 60 * 24;
private static final int SEVEN_DAYS_IN_MILLIS = 7 * DAY_IN_MILLIS; private static final int SEVEN_DAYS_IN_MILLIS = 7 * DAY_IN_MILLIS;
private static final int THIRTY_DAYS_IN_MILLIS = 30 * DAY_IN_MILLIS; private static final int THIRTY_DAYS_IN_MILLIS = 30 * DAY_IN_MILLIS;
private final Random rand;
private final DateTime referenceTimestamp;
private int MAX_SEGMENTS_TO_MOVE;
private List<ServerHolder> serverHolderList; private List<ServerHolder> serverHolderList;
private Random rand;
private DateTime referenceTimestamp;
private double initialTotalCost; private double initialTotalCost;
private double normalization; private double normalization;
private double totalCostChange; private double totalCostChange;
@ -67,6 +63,7 @@ public class BalancerCostAnalyzer
totalCostChange = 0; totalCostChange = 0;
} }
// The assignment usage doesn't require initialization.
public void init(List<ServerHolder> serverHolderList, DruidMasterRuntimeParams params) public void init(List<ServerHolder> serverHolderList, DruidMasterRuntimeParams params)
{ {
this.serverHolderList = serverHolderList; this.serverHolderList = serverHolderList;
@ -136,10 +133,9 @@ public class BalancerCostAnalyzer
*/ */
public double computeJointSegmentCosts(DataSegment segment1, DataSegment segment2) public double computeJointSegmentCosts(DataSegment segment1, DataSegment segment2)
{ {
double cost = 0; final Interval gap = segment1.getInterval().gap(segment2.getInterval());
Interval gap = segment1.getInterval().gap(segment2.getInterval());
double baseCost = Math.min(segment1.getSize(), segment2.getSize()); final double baseCost = Math.min(segment1.getSize(), segment2.getSize());
double recencyPenalty = 1; double recencyPenalty = 1;
double dataSourcePenalty = 1; double dataSourcePenalty = 1;
double gapPenalty = 1; double gapPenalty = 1;
@ -166,13 +162,86 @@ public class BalancerCostAnalyzer
} }
} }
cost = baseCost * recencyPenalty * dataSourcePenalty * gapPenalty; final double cost = baseCost * recencyPenalty * dataSourcePenalty * gapPenalty;
return cost; return cost;
} }
/* /*
* These could be anonymous in BalancerCostAnalyzerHelper * Sample from each server with probability proportional to the number of segments on that server.
*/
private ServerHolder sampleServer()
{
final int num = rand.nextInt(totalSegments);
int cumulativeSegments = 0;
int numToStopAt = 0;
while (cumulativeSegments <= num) {
cumulativeSegments += serverHolderList.get(numToStopAt).getServer().getSegments().size();
numToStopAt++;
}
return serverHolderList.get(numToStopAt - 1);
}
public Set<BalancerSegmentHolder> findSegmentsToMove()
{
final Set<BalancerSegmentHolder> segmentHoldersToMove = Sets.newHashSet();
final Set<DataSegment> movingSegments = Sets.newHashSet();
int numServers = serverHolderList.size();
int counter = 0;
while (segmentHoldersToMove.size() < MAX_SEGMENTS_TO_MOVE && counter < 3 * MAX_SEGMENTS_TO_MOVE) {
counter++;
if (numServers == 0) {
break;
}
// We want to sample from each server w.p. numSegmentsOnServer / totalSegments
ServerHolder fromServerHolder = sampleServer();
// and actually pick that segment uniformly at random w.p. 1 / numSegmentsOnServer
// so that the probability of picking a segment is 1 / totalSegments.
List<DataSegment> segments = Lists.newArrayList(fromServerHolder.getServer().getSegments().values());
if (segments.isEmpty()) {
continue;
}
DataSegment proposalSegment = segments.get(rand.nextInt(segments.size()));
if (movingSegments.contains(proposalSegment)) {
continue;
}
BalancerCostComputer helper = new BalancerCostComputer(
serverHolderList,
proposalSegment,
fromServerHolder,
segmentHoldersToMove
);
Pair<Double, ServerHolder> minPair = helper.getMinPair();
if (minPair.rhs != null && !minPair.rhs.equals(fromServerHolder)) {
movingSegments.add(proposalSegment);
segmentHoldersToMove.add(
new BalancerSegmentHolder(
fromServerHolder.getServer(),
minPair.rhs.getServer(),
proposalSegment
)
);
totalCostChange += helper.getCurrCost() - minPair.lhs;
}
}
return segmentHoldersToMove;
}
/*
* These could be anonymous in BalancerCostComputer
* Their purpose is to unify the balance/assignment code since a segment that has not yet been assigned * Their purpose is to unify the balance/assignment code since a segment that has not yet been assigned
* does not have a source server. * does not have a source server.
*/ */
@ -183,6 +252,12 @@ public class BalancerCostAnalyzer
super(null, null); super(null, null);
} }
@Override
public DruidServer getServer()
{
return new NullDruidServer();
}
public static class NullDruidServer extends DruidServer public static class NullDruidServer extends DruidServer
{ {
public NullDruidServer() public NullDruidServer()
@ -196,35 +271,18 @@ public class BalancerCostAnalyzer
return false; return false;
} }
} }
@Override
public DruidServer getServer()
{
return new NullDruidServer();
}
} }
public class BalancerCostAnalyzerHelper public class BalancerCostComputer
{ {
private final List<ServerHolder> serverHolderList; private final List<ServerHolder> serverHolderList;
private final DataSegment proposalSegment; private final DataSegment proposalSegment;
private final ServerHolder fromServerHolder; private final ServerHolder fromServerHolder;
private final Set<BalancerSegmentHolder> segmentHoldersToMove; private final Set<BalancerSegmentHolder> segmentHoldersToMove;
private Pair<Double, ServerHolder> minPair; private Pair<Double, ServerHolder> minPair;
private double currCost; private double currCost;
public Pair<Double, ServerHolder> getMinPair() public BalancerCostComputer(
{
return minPair;
}
public double getCurrCost()
{
return currCost;
}
public BalancerCostAnalyzerHelper(
List<ServerHolder> serverHolderList, List<ServerHolder> serverHolderList,
DataSegment proposalSegment DataSegment proposalSegment
) )
@ -232,7 +290,7 @@ public class BalancerCostAnalyzer
this(serverHolderList, proposalSegment, new NullServerHolder(), Sets.<BalancerSegmentHolder>newHashSet()); this(serverHolderList, proposalSegment, new NullServerHolder(), Sets.<BalancerSegmentHolder>newHashSet());
} }
public BalancerCostAnalyzerHelper( public BalancerCostComputer(
List<ServerHolder> serverHolderList, List<ServerHolder> serverHolderList,
DataSegment proposalSegment, DataSegment proposalSegment,
ServerHolder fromServerHolder, ServerHolder fromServerHolder,
@ -248,6 +306,16 @@ public class BalancerCostAnalyzer
computeAllCosts(); computeAllCosts();
} }
public Pair<Double, ServerHolder> getMinPair()
{
return minPair;
}
public double getCurrCost()
{
return currCost;
}
public void computeAllCosts() public void computeAllCosts()
{ {
// Just need a regular priority queue for the min. element. // Just need a regular priority queue for the min. element.
@ -267,7 +335,9 @@ public class BalancerCostAnalyzer
for (ServerHolder server : serverHolderList) { for (ServerHolder server : serverHolderList) {
// Only calculate costs if the server has enough space. // Only calculate costs if the server has enough space.
if (proposalSegment.getSize() > server.getAvailableSize()) break; if (proposalSegment.getSize() > server.getAvailableSize()) {
break;
}
// The contribution to the total cost of a given server by proposing to move the segment to that server is... // The contribution to the total cost of a given server by proposing to move the segment to that server is...
double cost = 0f; double cost = 0f;
@ -303,75 +373,6 @@ public class BalancerCostAnalyzer
} }
} }
/*
* Sample from each server with probability proportional to the number of segments on that server.
*/
private ServerHolder sampleServer()
{
int num = rand.nextInt(totalSegments);
int cumulativeSegments = 0;
int numToStopAt = 0;
while (cumulativeSegments <= num) {
cumulativeSegments += serverHolderList.get(numToStopAt).getServer().getSegments().size();
numToStopAt++;
}
return serverHolderList.get(numToStopAt - 1);
}
public Set<BalancerSegmentHolder> findSegmentsToMove()
{
Set<BalancerSegmentHolder> segmentHoldersToMove = Sets.newHashSet();
Set<DataSegment> movingSegments = Sets.newHashSet();
int numServers = serverHolderList.size();
int counter = 0;
while (segmentHoldersToMove.size() < MAX_SEGMENTS_TO_MOVE && counter < 3 * MAX_SEGMENTS_TO_MOVE) {
counter++;
if (numServers == 0) break;
// We want to sample from each server w.p. numSegmentsOnServer / totalSegments
ServerHolder fromServerHolder = sampleServer();
// and actually pick that segment uniformly at random w.p. 1 / numSegmentsOnServer
// so that the probability of picking a segment is 1 / totalSegments.
List<DataSegment> segments = Lists.newArrayList(fromServerHolder.getServer().getSegments().values());
if (segments.isEmpty()) continue;
DataSegment proposalSegment = segments.get(rand.nextInt(segments.size()));
if (movingSegments.contains(proposalSegment)) {
continue;
}
BalancerCostAnalyzerHelper helper = new BalancerCostAnalyzerHelper(
serverHolderList,
proposalSegment,
fromServerHolder,
segmentHoldersToMove
);
Pair<Double, ServerHolder> minPair = helper.getMinPair();
if (minPair.rhs != null && !minPair.rhs.equals(fromServerHolder)) {
movingSegments.add(proposalSegment);
segmentHoldersToMove.add(
new BalancerSegmentHolder(
fromServerHolder.getServer(),
minPair.rhs.getServer(),
proposalSegment
)
);
totalCostChange += helper.getCurrCost() - minPair.lhs;
}
}
return segmentHoldersToMove;
}
} }

View File

@ -88,7 +88,7 @@ public abstract class LoadRule implements Rule
List<ServerHolder> assignedServers = Lists.newArrayList(); List<ServerHolder> assignedServers = Lists.newArrayList();
while (totalReplicants < expectedReplicants) { while (totalReplicants < expectedReplicants) {
BalancerCostAnalyzer analyzer = new BalancerCostAnalyzer(DateTime.now()); BalancerCostAnalyzer analyzer = new BalancerCostAnalyzer(DateTime.now());
BalancerCostAnalyzer.BalancerCostAnalyzerHelper helper = analyzer.new BalancerCostAnalyzerHelper(serverHolderList, segment); BalancerCostAnalyzer.BalancerCostComputer helper = analyzer.new BalancerCostComputer(serverHolderList, segment);
Pair<Double, ServerHolder> minPair = helper.getMinPair(); Pair<Double, ServerHolder> minPair = helper.getMinPair();