fixed problem of balancer ignoring home server

This commit is contained in:
Dhruv Parthasarathy 2013-08-01 17:28:32 -07:00
parent cd3b62e123
commit 6f28eac995
5 changed files with 61 additions and 25 deletions

View File

@ -25,7 +25,9 @@ import java.util.List;
public interface BalancerStrategy public interface BalancerStrategy
{ {
public ServerHolder findNewSegmentHome(final DataSegment proposalSegment, final List<ServerHolder> serverHolders); public ServerHolder findNewSegmentHomeBalancer(final DataSegment proposalSegment, final List<ServerHolder> serverHolders);
public ServerHolder findNewSegmentHomeReplicator(final DataSegment proposalSegment, final List<ServerHolder> serverHolders);
public BalancerSegmentHolder pickSegmentToMove(final List<ServerHolder> serverHolders); public BalancerSegmentHolder pickSegmentToMove(final List<ServerHolder> serverHolders);

View File

@ -43,14 +43,24 @@ public class CostBalancerStrategy implements BalancerStrategy
} }
@Override @Override
public ServerHolder findNewSegmentHome( public ServerHolder findNewSegmentHomeReplicator(
DataSegment proposalSegment, List<ServerHolder> serverHolders DataSegment proposalSegment, List<ServerHolder> serverHolders
) )
{ {
return computeCosts(proposalSegment, serverHolders).rhs; return chooseBestServer(proposalSegment, serverHolders, false).rhs;
} }
@Override
public ServerHolder findNewSegmentHomeBalancer(
DataSegment proposalSegment, List<ServerHolder> serverHolders
)
{
return chooseBestServer(proposalSegment, serverHolders, true).rhs;
}
/** /**
* For assignment, we want to move to the lowest cost server that isn't already serving the segment. * For assignment, we want to move to the lowest cost server that isn't already serving the segment.
* *
@ -60,9 +70,10 @@ public class CostBalancerStrategy implements BalancerStrategy
* @return A ServerHolder with the new home for a segment. * @return A ServerHolder with the new home for a segment.
*/ */
private Pair<Double, ServerHolder> computeCosts( private Pair<Double, ServerHolder> chooseBestServer(
final DataSegment proposalSegment, final DataSegment proposalSegment,
final Iterable<ServerHolder> serverHolders final Iterable<ServerHolder> serverHolders,
boolean includeCurrentServer
) )
{ {
@ -84,26 +95,29 @@ public class CostBalancerStrategy implements BalancerStrategy
final long proposalSegmentSize = proposalSegment.getSize(); final long proposalSegmentSize = proposalSegment.getSize();
for (ServerHolder server : serverHolders) { for (ServerHolder server : serverHolders) {
/** Don't calculate cost if the server doesn't have enough space or is loading the segment */ if (includeCurrentServer || !server.isServingSegment(proposalSegment))
if (proposalSegmentSize > server.getAvailableSize() || server.isLoadingSegment(proposalSegment)) { {
continue; /** Don't calculate cost if the server doesn't have enough space or is loading the segment */
} if (proposalSegmentSize > server.getAvailableSize() || server.isLoadingSegment(proposalSegment)) {
continue;
}
/** The contribution to the total cost of a given server by proposing to move the segment to that server is... */ /** The contribution to the total cost of a given server by proposing to move the segment to that server is... */
double cost = 0f; double cost = 0f;
/** the sum of the costs of other (exclusive of the proposalSegment) segments on the server */ /** the sum of the costs of other (exclusive of the proposalSegment) segments on the server */
for (DataSegment segment : server.getServer().getSegments().values()) { for (DataSegment segment : server.getServer().getSegments().values()) {
if (!proposalSegment.equals(segment)) { if (!proposalSegment.equals(segment)) {
cost += computeJointSegmentCosts(proposalSegment, segment);
}
}
/** plus the costs of segments that will be loaded */
for (DataSegment segment : server.getPeon().getSegmentsToLoad()) {
cost += computeJointSegmentCosts(proposalSegment, segment); cost += computeJointSegmentCosts(proposalSegment, segment);
} }
}
/** plus the costs of segments that will be loaded */
for (DataSegment segment : server.getPeon().getSegmentsToLoad()) {
cost += computeJointSegmentCosts(proposalSegment, segment);
}
if (cost < bestServer.lhs && !server.isServingSegment(proposalSegment)) { if (cost < bestServer.lhs) {
bestServer = Pair.of(cost, server); bestServer = Pair.of(cost, server);
}
} }
} }

View File

@ -116,7 +116,7 @@ public class DruidMasterBalancer implements DruidMasterHelper
final BalancerSegmentHolder segmentToMove = strategy.pickSegmentToMove(serverHolderList); final BalancerSegmentHolder segmentToMove = strategy.pickSegmentToMove(serverHolderList);
if (segmentToMove != null && params.getAvailableSegments().contains(segmentToMove.getSegment())) { if (segmentToMove != null && params.getAvailableSegments().contains(segmentToMove.getSegment())) {
final ServerHolder holder = strategy.findNewSegmentHome(segmentToMove.getSegment(), serverHolderList); final ServerHolder holder = strategy.findNewSegmentHomeBalancer(segmentToMove.getSegment(), serverHolderList);
if (holder != null) { if (holder != null) {
moveSegment(segmentToMove, holder.getServer(), params); moveSegment(segmentToMove, holder.getServer(), params);

View File

@ -29,11 +29,31 @@ public class RandomBalancerStrategy implements BalancerStrategy
private final ReservoirSegmentSampler sampler = new ReservoirSegmentSampler(); private final ReservoirSegmentSampler sampler = new ReservoirSegmentSampler();
@Override @Override
public ServerHolder findNewSegmentHome( public ServerHolder findNewSegmentHomeReplicator(
DataSegment proposalSegment, List<ServerHolder> serverHolders DataSegment proposalSegment, List<ServerHolder> serverHolders
) )
{ {
return serverHolders.get(new Random().nextInt(serverHolders.size())); if (serverHolders.size()==1)
{
return null;
}
else
{
ServerHolder holder = serverHolders.get(new Random().nextInt(serverHolders.size()));
while (holder.isServingSegment(proposalSegment))
{
holder = serverHolders.get(new Random().nextInt(serverHolders.size()));
}
return holder;
}
}
@Override
public ServerHolder findNewSegmentHomeBalancer(
DataSegment proposalSegment, List<ServerHolder> serverHolders
)
{
return null; //To change body of implemented methods use File | Settings | File Templates.
} }
@Override @Override

View File

@ -97,7 +97,7 @@ public abstract class LoadRule implements Rule
break; break;
} }
final ServerHolder holder = strategy.findNewSegmentHome(segment, serverHolderList); final ServerHolder holder = strategy.findNewSegmentHomeReplicator(segment, serverHolderList);
if (holder == null) { if (holder == null) {
log.warn( log.warn(