revert change to findNewSegmentHome: now return single ServerHolder

This commit is contained in:
Nelson Ray 2013-02-12 10:47:30 -08:00
parent 695b78b19c
commit fcbac96193
4 changed files with 39 additions and 42 deletions

View File

@ -20,14 +20,11 @@
package com.metamx.druid.master;
import com.google.common.collect.Lists;
import com.google.common.collect.MinMaxPriorityQueue;
import com.metamx.common.Pair;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.util.Comparator;
import java.util.List;
import java.util.Random;
@ -61,7 +58,7 @@ public class BalancerCostAnalyzer
* pairwise cost matrix). This is the cost of a cluster if each
* segment were to get its own compute node.
*/
public double calculateNormalization(List<ServerHolder> serverHolders)
public double calculateNormalization(final List<ServerHolder> serverHolders)
{
double cost = 0;
for (ServerHolder server : serverHolders) {
@ -78,7 +75,7 @@ public class BalancerCostAnalyzer
* A list of ServerHolders for a particular tier.
* @return The initial cost of the Druid tier.
*/
public double calculateInitialTotalCost(List<ServerHolder> serverHolders)
public double calculateInitialTotalCost(final List<ServerHolder> serverHolders)
{
double cost = 0;
for (ServerHolder server : serverHolders) {
@ -105,7 +102,7 @@ public class BalancerCostAnalyzer
* The second DataSegment.
* @return The joint cost of placing the two DataSegments together on one node.
*/
public double computeJointSegmentCosts(DataSegment segment1, DataSegment segment2)
public double computeJointSegmentCosts(final DataSegment segment1, final DataSegment segment2)
{
final Interval gap = segment1.getInterval().gap(segment2.getInterval());
@ -150,7 +147,7 @@ public class BalancerCostAnalyzer
* @return A ServerHolder sampled with probability proportional to the
* number of segments on that server
*/
private ServerHolder sampleServer(List<ServerHolder> serverHolders, int numSegments)
private ServerHolder sampleServer(final List<ServerHolder> serverHolders, final int numSegments)
{
final int num = rand.nextInt(numSegments);
int cumulativeSegments = 0;
@ -172,7 +169,7 @@ public class BalancerCostAnalyzer
* The total number of segments on a particular tier.
* @return A BalancerSegmentHolder sampled uniformly at random.
*/
public BalancerSegmentHolder pickSegmentToMove(List<ServerHolder> serverHolders, int numSegments)
public BalancerSegmentHolder pickSegmentToMove(final List<ServerHolder> serverHolders, final int numSegments)
{
/** We want to sample from each server w.p. numSegmentsOnServer / totalSegments */
ServerHolder fromServerHolder = sampleServer(serverHolders, numSegments);
@ -191,30 +188,19 @@ public class BalancerCostAnalyzer
* A DataSegment that we are proposing to move.
* @param serverHolders
* An iterable of ServerHolders for a particular tier.
* @return A MinMaxPriorityQueue of costs of putting the proposalSegment on the server and ServerHolders.
* @return A ServerHolder with the new home for a segment.
*/
public MinMaxPriorityQueue<Pair<Double, ServerHolder>> findNewSegmentHome(DataSegment proposalSegment, Iterable<ServerHolder> serverHolders)
public ServerHolder findNewSegmentHome(final DataSegment proposalSegment, final Iterable<ServerHolder> serverHolders)
{
// Just need a regular priority queue for the min. element.
final MinMaxPriorityQueue<Pair<Double, ServerHolder>> costServerPairs = MinMaxPriorityQueue.orderedBy(
new Comparator<Pair<Double, ServerHolder>>()
{
@Override
public int compare(
Pair<Double, ServerHolder> o,
Pair<Double, ServerHolder> o1
)
{
return Double.compare(o.lhs, o1.lhs);
}
}
).create();
final long proposalSegmentSize = proposalSegment.getSize();
double minCost = Double.MAX_VALUE;
ServerHolder toServer = null;
for (ServerHolder server : serverHolders) {
/** Only calculate costs if the server has enough space. */
if (proposalSegmentSize > server.getAvailableSize()) {
/** Don't calculate cost if the server doesn't have enough space or is serving/loading the segment. */
if (proposalSegmentSize > server.getAvailableSize()
|| server.isServingSegment(proposalSegment)
|| server.isLoadingSegment(proposalSegment)) {
break;
}
@ -231,10 +217,13 @@ public class BalancerCostAnalyzer
cost += computeJointSegmentCosts(proposalSegment, segment);
}
costServerPairs.add(Pair.of(cost, server));
if (cost < minCost) {
minCost = cost;
toServer = server;
}
}
return costServerPairs;
return toServer;
}
}

View File

@ -109,12 +109,17 @@ public class DruidMasterBalancer implements DruidMasterHelper
int iter = 0;
while (iter < maxSegmentsToMove) {
BalancerSegmentHolder segmentToMove = analyzer.pickSegmentToMove(serverHolderList, numSegments);
DruidServer toServer = analyzer.findNewSegmentHome(segmentToMove.getSegment(), serverHolderList).pollFirst().rhs.getServer();
iter++;
final BalancerSegmentHolder segmentToMove = analyzer.pickSegmentToMove(serverHolderList, numSegments);
final ServerHolder holder = analyzer.findNewSegmentHome(segmentToMove.getSegment(), serverHolderList);
if (holder == null) {
continue;
}
final DruidServer toServer = holder.getServer();
if (!currentlyMovingSegments.get(tier).containsKey(segmentToMove.getSegment())) {
moveSegment(segmentToMove, toServer, params);
}
iter++;
}
final double initialTotalCost = analyzer.calculateInitialTotalCost(serverHolderList);

View File

@ -62,14 +62,14 @@ public abstract class LoadRule implements Rule
final List<ServerHolder> serverHolderList = new ArrayList<ServerHolder>(serverQueue);
final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp();
final BalancerCostAnalyzer analyzer = params.getBalancerCostAnalyzer(referenceTimestamp);
MinMaxPriorityQueue<Pair<Double, ServerHolder>> serverCostQueue = analyzer.findNewSegmentHome(segment, serverHolderList);
stats.accumulate(
assign(
params.getReplicationManager(),
expectedReplicants,
totalReplicants,
serverCostQueue,
analyzer,
serverHolderList,
segment
)
);
@ -81,16 +81,17 @@ public abstract class LoadRule implements Rule
private MasterStats assign(
final ReplicationThrottler replicationManager,
int expectedReplicants,
final int expectedReplicants,
int totalReplicants,
MinMaxPriorityQueue<Pair<Double, ServerHolder>> serverQueue,
final BalancerCostAnalyzer analyzer,
final List<ServerHolder> serverHolderList,
final DataSegment segment
)
{
MasterStats stats = new MasterStats();
final MasterStats stats = new MasterStats();
while (totalReplicants < expectedReplicants) {
ServerHolder holder = serverQueue.pollFirst().rhs;
ServerHolder holder = analyzer.findNewSegmentHome(segment, serverHolderList);
if (holder == null) {
log.warn(
@ -102,10 +103,6 @@ public abstract class LoadRule implements Rule
break;
}
if (holder.isServingSegment(segment) || holder.isLoadingSegment(segment)) {
continue;
}
if (totalReplicants > 0) { // don't throttle if there's only 1 copy of this segment in the cluster
if (!replicationManager.canAddReplicant(getTier()) ||
!replicationManager.registerReplicantCreation(getTier(), segment.getIdentifier())) {

View File

@ -143,6 +143,7 @@ public class DruidMasterBalancerTest
EasyMock.expect(druidServer1.getCurrSize()).andReturn(30L).atLeastOnce();
EasyMock.expect(druidServer1.getMaxSize()).andReturn(100L).atLeastOnce();
EasyMock.expect(druidServer1.getSegments()).andReturn(segments).anyTimes();
EasyMock.expect(druidServer1.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
EasyMock.replay(druidServer1);
EasyMock.expect(druidServer2.getName()).andReturn("to").atLeastOnce();
@ -150,6 +151,7 @@ public class DruidMasterBalancerTest
EasyMock.expect(druidServer2.getCurrSize()).andReturn(0L).atLeastOnce();
EasyMock.expect(druidServer2.getMaxSize()).andReturn(100L).atLeastOnce();
EasyMock.expect(druidServer2.getSegments()).andReturn(new HashMap<String, DataSegment>()).anyTimes();
EasyMock.expect(druidServer2.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
EasyMock.replay(druidServer2);
EasyMock.replay(druidServer3);
@ -202,6 +204,7 @@ public class DruidMasterBalancerTest
EasyMock.expect(druidServer1.getCurrSize()).andReturn(30L).atLeastOnce();
EasyMock.expect(druidServer1.getMaxSize()).andReturn(100L).atLeastOnce();
EasyMock.expect(druidServer1.getSegments()).andReturn(segments).anyTimes();
EasyMock.expect(druidServer1.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
EasyMock.replay(druidServer1);
EasyMock.expect(druidServer2.getName()).andReturn("2").atLeastOnce();
@ -209,6 +212,7 @@ public class DruidMasterBalancerTest
EasyMock.expect(druidServer2.getCurrSize()).andReturn(0L).atLeastOnce();
EasyMock.expect(druidServer2.getMaxSize()).andReturn(100L).atLeastOnce();
EasyMock.expect(druidServer2.getSegments()).andReturn(new HashMap<String, DataSegment>()).anyTimes();
EasyMock.expect(druidServer2.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
EasyMock.replay(druidServer2);
EasyMock.expect(druidServer3.getName()).andReturn("3").atLeastOnce();
@ -216,6 +220,7 @@ public class DruidMasterBalancerTest
EasyMock.expect(druidServer3.getCurrSize()).andReturn(0L).atLeastOnce();
EasyMock.expect(druidServer3.getMaxSize()).andReturn(100L).atLeastOnce();
EasyMock.expect(druidServer3.getSegments()).andReturn(new HashMap<String, DataSegment>()).anyTimes();
EasyMock.expect(druidServer3.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
EasyMock.replay(druidServer3);
EasyMock.expect(druidServer4.getName()).andReturn("4").atLeastOnce();
@ -223,6 +228,7 @@ public class DruidMasterBalancerTest
EasyMock.expect(druidServer4.getCurrSize()).andReturn(0L).atLeastOnce();
EasyMock.expect(druidServer4.getMaxSize()).andReturn(100L).atLeastOnce();
EasyMock.expect(druidServer4.getSegments()).andReturn(new HashMap<String, DataSegment>()).anyTimes();
EasyMock.expect(druidServer4.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
EasyMock.replay(druidServer4);
// Mock stuff that the master needs