mirror of https://github.com/apache/druid.git
working balancercostanalyzer
This commit is contained in:
parent
366216f9f3
commit
3eda9dc5f2
|
@ -52,7 +52,7 @@ public class BalancerCostAnalyzer
|
||||||
private Random rand;
|
private Random rand;
|
||||||
|
|
||||||
public BalancerCostAnalyzer(){
|
public BalancerCostAnalyzer(){
|
||||||
rand = new Random();
|
rand = new Random(0);
|
||||||
totalCostChange = 0f;
|
totalCostChange = 0f;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -69,33 +69,8 @@ public class BalancerCostAnalyzer
|
||||||
return totalCostChange;
|
return totalCostChange;
|
||||||
}
|
}
|
||||||
|
|
||||||
public float calculateInitialTotalCost(List<ServerHolder> serverHolderList){
|
private float calculateInitialTotalCost(List<ServerHolder> serverHolderList){
|
||||||
int cost = 0;
|
float cost = 0;
|
||||||
for (ServerHolder server : serverHolderList) {
|
|
||||||
DataSegment[] segments = server.getServer().getSegments().values().toArray(new DataSegment[]{});
|
|
||||||
for (int i = 0; i < segments.length; ++i) {
|
|
||||||
for (int j = i; j < segments.length; ++j) {
|
|
||||||
cost += computeJointSegmentCosts(segments[i], segments[j]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return cost;
|
|
||||||
}
|
|
||||||
|
|
||||||
public float calculateTotalCostChange(List<ServerHolder> serverHolderList, Set<BalancerSegmentHolder2> segmentHoldersToMove){
|
|
||||||
int cost = 0;
|
|
||||||
Iterator it = segmentHoldersToMove.iterator();
|
|
||||||
while (it.hasNext()) {
|
|
||||||
BalancerSegmentHolder2 segmentHolder = (BalancerSegmentHolder2) it.next();
|
|
||||||
for (DataSegment fromSegment : segmentHolder.getFromServer().getSegments().values()) {
|
|
||||||
cost -= computeJointSegmentCosts(segmentHolder.getSegment(), fromSegment);
|
|
||||||
}
|
|
||||||
for (DataSegment toSegment : segmentHolder.getToServer().getSegments().values()) {
|
|
||||||
cost += computeJointSegmentCosts(segmentHolder.getSegment(), toSegment);
|
|
||||||
}
|
|
||||||
return cost;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (ServerHolder server : serverHolderList) {
|
for (ServerHolder server : serverHolderList) {
|
||||||
DataSegment[] segments = server.getServer().getSegments().values().toArray(new DataSegment[]{});
|
DataSegment[] segments = server.getServer().getSegments().values().toArray(new DataSegment[]{});
|
||||||
for (int i = 0; i < segments.length; ++i) {
|
for (int i = 0; i < segments.length; ++i) {
|
||||||
|
@ -130,8 +105,9 @@ public class BalancerCostAnalyzer
|
||||||
Set<DataSegment> movingSegments = Sets.newHashSet();
|
Set<DataSegment> movingSegments = Sets.newHashSet();
|
||||||
|
|
||||||
int counter = 0;
|
int counter = 0;
|
||||||
|
float currCost = 0f;
|
||||||
|
|
||||||
while (segmentHoldersToMove.size() < MAX_SEGMENTS_TO_MOVE && counter < 2 * MAX_SEGMENTS_TO_MOVE) {
|
while (segmentHoldersToMove.size() < MAX_SEGMENTS_TO_MOVE && counter < 3 * MAX_SEGMENTS_TO_MOVE) {
|
||||||
counter++;
|
counter++;
|
||||||
ServerHolder fromServerHolder = serverHolderList.get(rand.nextInt(serverHolderList.size()));
|
ServerHolder fromServerHolder = serverHolderList.get(rand.nextInt(serverHolderList.size()));
|
||||||
List<DataSegment> segments = Lists.newArrayList(fromServerHolder.getServer().getSegments().values());
|
List<DataSegment> segments = Lists.newArrayList(fromServerHolder.getServer().getSegments().values());
|
||||||
|
@ -139,7 +115,7 @@ public class BalancerCostAnalyzer
|
||||||
DataSegment proposalSegment = segments.get(rand.nextInt(segments.size()));
|
DataSegment proposalSegment = segments.get(rand.nextInt(segments.size()));
|
||||||
if (movingSegments.contains(proposalSegment)) continue;
|
if (movingSegments.contains(proposalSegment)) continue;
|
||||||
|
|
||||||
//Just need a regular priority queue for the min. element.
|
// Just need a regular priority queue for the min. element.
|
||||||
MinMaxPriorityQueue<Pair<Float, ServerHolder>> pQueue = MinMaxPriorityQueue.orderedBy(
|
MinMaxPriorityQueue<Pair<Float, ServerHolder>> pQueue = MinMaxPriorityQueue.orderedBy(
|
||||||
new Comparator<Pair<Float, ServerHolder>>()
|
new Comparator<Pair<Float, ServerHolder>>()
|
||||||
{
|
{
|
||||||
|
@ -160,7 +136,10 @@ public class BalancerCostAnalyzer
|
||||||
cost += computeJointSegmentCosts(proposalSegment, segment);
|
cost += computeJointSegmentCosts(proposalSegment, segment);
|
||||||
}
|
}
|
||||||
|
|
||||||
//Take into account costs of segments that will be moved.
|
// self cost
|
||||||
|
if (!server.getServer().equals(fromServerHolder.getServer())) cost += computeJointSegmentCosts(proposalSegment, proposalSegment);
|
||||||
|
|
||||||
|
// Take into account costs of segments that will be moved.
|
||||||
Iterator it = segmentHoldersToMove.iterator();
|
Iterator it = segmentHoldersToMove.iterator();
|
||||||
while (it.hasNext()) {
|
while (it.hasNext()) {
|
||||||
BalancerSegmentHolder2 segmentToMove = (BalancerSegmentHolder2) it.next();
|
BalancerSegmentHolder2 segmentToMove = (BalancerSegmentHolder2) it.next();
|
||||||
|
@ -172,6 +151,10 @@ public class BalancerCostAnalyzer
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (server.getServer().equals(fromServerHolder.getServer())){
|
||||||
|
currCost = cost;
|
||||||
|
}
|
||||||
|
|
||||||
pQueue.add(Pair.of(cost, server));
|
pQueue.add(Pair.of(cost, server));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -185,44 +168,12 @@ public class BalancerCostAnalyzer
|
||||||
proposalSegment
|
proposalSegment
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
totalCostChange += minPair.lhs;
|
totalCostChange += currCost - minPair.lhs;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
totalCostChange = calculateTotalCostChange(serverHolderList, segmentHoldersToMove);
|
|
||||||
return segmentHoldersToMove;
|
return segmentHoldersToMove;
|
||||||
|
|
||||||
/*
|
|
||||||
double currPercentDiff = getPercentDiff();
|
|
||||||
|
|
||||||
if (currPercentDiff < PERCENT_THRESHOLD) {
|
|
||||||
log.info("Cluster usage is balanced.");
|
|
||||||
return segmentsToMove;
|
|
||||||
}
|
|
||||||
|
|
||||||
List<DruidDataSource> dataSources = Lists.newArrayList(server.getDataSources());
|
|
||||||
Collections.shuffle(dataSources);
|
|
||||||
|
|
||||||
for (DruidDataSource dataSource : dataSources) {
|
|
||||||
List<DataSegment> segments = Lists.newArrayList(dataSource.getSegments());
|
|
||||||
Collections.shuffle(segments);
|
|
||||||
|
|
||||||
for (DataSegment segment : segments) {
|
|
||||||
if (segmentsToMove.size() >= MAX_SEGMENTS_TO_MOVE) {
|
|
||||||
return segmentsToMove;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (getLookaheadPercentDiff(highestSizeUsed - segment.getSize(), lowestSizeUsed + segment.getSize())
|
|
||||||
< currPercentDiff) {
|
|
||||||
segmentsToMove.add(new BalancerSegmentHolder(server, segment));
|
|
||||||
update(highestSizeUsed - segment.getSize(), lowestSizeUsed + segment.getSize());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return segmentsToMove;
|
|
||||||
*/
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -120,6 +120,13 @@ public class DruidMasterBalancerTest
|
||||||
).anyTimes();
|
).anyTimes();
|
||||||
EasyMock.replay(dataSource);
|
EasyMock.replay(dataSource);
|
||||||
|
|
||||||
|
/*
|
||||||
|
(2, 1, 2, 1
|
||||||
|
-, 2, 1, 2
|
||||||
|
-, -, 2, 1
|
||||||
|
-, -, -, 2)
|
||||||
|
*/
|
||||||
|
|
||||||
// Mock some segments of different sizes
|
// Mock some segments of different sizes
|
||||||
EasyMock.expect(segment1.getSize()).andReturn(11L).anyTimes();
|
EasyMock.expect(segment1.getSize()).andReturn(11L).anyTimes();
|
||||||
EasyMock.expect(segment1.getIdentifier()).andReturn("segment1").anyTimes();
|
EasyMock.expect(segment1.getIdentifier()).andReturn("segment1").anyTimes();
|
||||||
|
|
Loading…
Reference in New Issue