fix bugs in balancing logic and in removing auto-scaled nodes

Fangjin Yang 2013-03-04 13:03:15 -08:00 committed by Eric Tschetter
parent a9f7094ef3
commit 848d49ca87
5 changed files with 54 additions and 64 deletions

View File

@@ -457,8 +457,6 @@ public class RemoteTaskRunner implements TaskRunner
         if (runningTasks.containsKey(taskId)) {
           log.info("Task %s just disappeared!", taskId);
           retryTask(runningTasks.get(taskId), worker.getHost());
-        } else {
-          log.info("A task disappeared I didn't know about: %s", taskId);
         }
       }
     }

View File

@@ -167,7 +167,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
         )
     );

-    if (thoseLazyWorkers.isEmpty()) {
+    if (thoseLazyWorkers.size() <= minNumWorkers) {
       return false;
     }
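
Note: the old condition only gave up when no idle workers existed at all, so the auto-scaler could terminate every node in the pool. The new condition keeps the pool at or above its configured minimum. A minimal sketch of the guard, with hypothetical names rather than the actual Druid classes:

import java.util.List;

// Hypothetical sketch of the scale-down guard this hunk introduces.
class ScaleDownGuard
{
  private final int minNumWorkers; // configured pool floor

  ScaleDownGuard(int minNumWorkers)
  {
    this.minNumWorkers = minNumWorkers;
  }

  boolean canTerminate(List<String> idleWorkers)
  {
    // Old behavior: bail out only when idleWorkers.isEmpty(), which could
    // shrink an auto-scaled pool all the way to zero.
    // New behavior: never shrink below minNumWorkers.
    return idleWorkers.size() > minNumWorkers;
  }
}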

View File

@@ -52,11 +52,12 @@ public class BalancerCostAnalyzer
   /**
    * Calculates the cost normalization. This is such that the normalized cost is lower bounded
    * by 1 (e.g. when each segment gets its own compute node).
-   * @param serverHolders
-   *          A list of ServerHolders for a particular tier.
-   * @return The normalization value (the sum of the diagonal entries in the
-   *         pairwise cost matrix). This is the cost of a cluster if each
-   *         segment were to get its own compute node.
+   *
+   * @param serverHolders A list of ServerHolders for a particular tier.
+   *
+   * @return The normalization value (the sum of the diagonal entries in the
+   *         pairwise cost matrix). This is the cost of a cluster if each
+   *         segment were to get its own compute node.
    */
   public double calculateNormalization(final List<ServerHolder> serverHolders)
   {
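
The normalization described by this javadoc is the sum of the diagonal of the pairwise cost matrix: each segment paired with itself, as if it were alone on a node. A minimal sketch of that idea, with a generic cost function standing in for computeJointSegmentCosts:

import java.util.List;
import java.util.function.BiFunction;

// Illustrative only: sum cost(s, s) over every segment s in the tier.
static <S> double normalization(List<S> segments, BiFunction<S, S, Double> cost)
{
  double total = 0;
  for (S segment : segments) {
    total += cost.apply(segment, segment); // one diagonal entry per segment
  }
  return total;
}
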
@@ -71,9 +72,10 @@ public class BalancerCostAnalyzer
   /**
    * Calculates the initial cost of the Druid segment configuration.
-   * @param serverHolders
-   *          A list of ServerHolders for a particular tier.
-   * @return The initial cost of the Druid tier.
+   *
+   * @param serverHolders A list of ServerHolders for a particular tier.
+   *
+   * @return The initial cost of the Druid tier.
    */
   public double calculateInitialTotalCost(final List<ServerHolder> serverHolders)
   {
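
A worked example of why the normalized cost is lower bounded by 1, assuming the initial cost sums the joint cost over all pairs of co-located segments, including each segment with itself: with segments a and b on one server and c alone, the initial cost is cost(a,a) + cost(a,b) + cost(b,b) + cost(c,c), while the normalization is cost(a,a) + cost(b,b) + cost(c,c). The ratio exceeds 1 by exactly the cross term cost(a,b), and equals 1 only when every segment has its own compute node.
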
@@ -96,11 +98,11 @@ public class BalancerCostAnalyzer
    * dataSourcePenalty: if two segments belong to the same data source, they are more likely to be involved
    * in the same queries
    * gapPenalty: it is more likely that segments close together in time will be queried together
-   * @param segment1
-   *          The first DataSegment.
-   * @param segment2
-   *          The second DataSegment.
+   *
+   * @param segment1 The first DataSegment.
+   * @param segment2 The second DataSegment.
+   *
    * @return The joint cost of placing the two DataSegments together on one node.
    */
   public double computeJointSegmentCosts(final DataSegment segment1, final DataSegment segment2)
   {
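
A hedged sketch of the multiplicative shape those penalties suggest; the constants and decay terms here are invented for illustration and are not the actual BalancerCostAnalyzer implementation:

// Illustration only: joint cost grows when two segments share a data source,
// sit close together in time, or are both recent. Constants are made up.
static double jointCostSketch(
    long bytes1,
    long bytes2,
    boolean sameDataSource,
    double gapDays,        // time gap between the two segment intervals
    double ageDays         // age of the more recent of the two segments
)
{
  final double baseCost = Math.min(bytes1, bytes2);
  final double dataSourcePenalty = sameDataSource ? 2.0 : 1.0;
  final double gapPenalty = Math.max(1.0, 2.0 - gapDays / 30.0);    // close in time
  final double recencyPenalty = Math.max(1.0, 2.0 - ageDays / 7.0); // recent data
  return baseCost * dataSourcePenalty * gapPenalty * recencyPenalty;
}
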
@@ -140,12 +142,12 @@ public class BalancerCostAnalyzer
   /**
    * Sample from each server with probability proportional to the number of segments on that server.
-   * @param serverHolders
-   *          A list of ServerHolders for a particular tier.
+   *
+   * @param serverHolders A list of ServerHolders for a particular tier.
    * @param numSegments
+   *
    * @return A ServerHolder sampled with probability proportional to the
    *         number of segments on that server
    */
   private ServerHolder sampleServer(final List<ServerHolder> serverHolders, final int numSegments)
   {
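
Sampling proportional to segment count can be done with a single uniform draw over the total segment count and a cumulative walk; a minimal self-contained sketch of the technique:

import java.util.List;
import java.util.Random;

// Pick index i with probability segmentCounts.get(i) / totalSegments.
static int sampleServerIndex(List<Integer> segmentCounts, int totalSegments, Random rand)
{
  int pick = rand.nextInt(totalSegments); // uniform over all segments
  int seen = 0;
  for (int i = 0; i < segmentCounts.size(); i++) {
    seen += segmentCounts.get(i);
    if (pick < seen) {
      return i; // the server owning the pick-th segment
    }
  }
  throw new IllegalStateException("totalSegments does not match the counts");
}
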
@@ -163,11 +165,11 @@ public class BalancerCostAnalyzer
   /**
    * The balancing application requires us to pick a proposal segment.
-   * @param serverHolders
-   *          A list of ServerHolders for a particular tier.
-   * @param numSegments
-   *          The total number of segments on a particular tier.
+   *
+   * @param serverHolders A list of ServerHolders for a particular tier.
+   * @param numSegments The total number of segments on a particular tier.
+   *
    * @return A BalancerSegmentHolder sampled uniformly at random.
    */
   public BalancerSegmentHolder pickSegmentToMove(final List<ServerHolder> serverHolders, final int numSegments)
   {
@@ -175,7 +177,7 @@ public class BalancerCostAnalyzer
     ServerHolder fromServerHolder = sampleServer(serverHolders, numSegments);

     /** and actually pick that segment uniformly at random w.p. 1 / numSegmentsOnServer
         so that the probability of picking a segment is 1 / totalSegments. */
     List<DataSegment> segments = Lists.newArrayList(fromServerHolder.getServer().getSegments().values());
     DataSegment proposalSegment = segments.get(rand.nextInt(segments.size()));
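
The probability claim in that comment checks out: a server holding n of the tier's N segments is sampled with probability n / N, and a segment on it is then chosen with probability 1 / n, so every segment is picked with probability (n / N) * (1 / n) = 1 / N. For example, with two servers holding 8 and 2 segments, any single segment on either server is chosen with probability 1/10.
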
@@ -184,18 +186,15 @@ public class BalancerCostAnalyzer
   /**
    * The assignment application requires us to supply a proposal segment.
-   * @param proposalSegment
-   *          A DataSegment that we are proposing to move.
-   * @param serverHolders
-   *          An iterable of ServerHolders for a particular tier.
-   * @param assign
-   *          A boolean that is true if used in assignment else false in balancing.
-   * @return A ServerHolder with the new home for a segment.
+   *
+   * @param proposalSegment A DataSegment that we are proposing to move.
+   * @param serverHolders An iterable of ServerHolders for a particular tier.
+   *
+   * @return A ServerHolder with the new home for a segment.
    */
   public ServerHolder findNewSegmentHome(
       final DataSegment proposalSegment,
-      final Iterable<ServerHolder> serverHolders,
-      final boolean assign
+      final Iterable<ServerHolder> serverHolders
   )
   {
     final long proposalSegmentSize = proposalSegment.getSize();
@@ -204,10 +203,9 @@ public class BalancerCostAnalyzer
     for (ServerHolder server : serverHolders) {
       /** Don't calculate cost if the server doesn't have enough space or is loading the segment */
-      if (proposalSegmentSize > server.getAvailableSize()
-          || server.isLoadingSegment(proposalSegment)
-          /** or if the ask is assignment and the server is serving the segment. */
-          || (assign && server.isServingSegment(proposalSegment)) ) {
+      if (proposalSegmentSize > server.getAvailableSize() ||
+          server.isLoadingSegment(proposalSegment) ||
+          server.isServingSegment(proposalSegment)) {
         continue;
       }
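
Note: with the assign flag gone, the serving-segment check now applies during balancing as well, so findNewSegmentHome can no longer propose moving a segment onto a server that already serves it, a no-op move the old balancing path allowed. This is also why the call sites in DruidMasterBalancer and LoadRule below lose their trailing boolean argument.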

View File

@@ -19,6 +19,7 @@
 package com.metamx.druid.master;

+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.MinMaxPriorityQueue;
 import com.metamx.common.guava.Comparators;
@@ -66,9 +67,9 @@ public class DruidMasterBalancer implements DruidMasterHelper
         holder.reduceLifetime();
         if (holder.getLifetime() <= 0) {
           log.makeAlert("[%s]: Balancer move segments queue has a segment stuck", tier)
              .addData("segment", holder.getSegment().getIdentifier())
              .addData("server", holder.getFromServer().getStringProps())
              .emit();
         }
       }
     }
@@ -95,7 +96,13 @@ public class DruidMasterBalancer implements DruidMasterHelper
         continue;
       }

-      final List<ServerHolder> serverHolderList = new ArrayList<ServerHolder>(entry.getValue());
+      final List<ServerHolder> serverHolderList = Lists.newArrayList(entry.getValue());
+
+      if (serverHolderList.size() <= 1) {
+        log.info("[%s]: One or fewer servers found. Cannot balance.", tier);
+        continue;
+      }
+
       int numSegments = 0;
       for (ServerHolder server : serverHolderList) {
         numSegments += server.getServer().getSegments().size();
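
Note: hoisting this check to the top of the loop body matters. With one server (or none) there is nothing to balance, and the old code only noticed after the move loop had already run (see the hunk removing the late check further down). Bailing out early also avoids sampling from a degenerate server list.
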
@@ -107,19 +114,14 @@ public class DruidMasterBalancer implements DruidMasterHelper
       }

       int iter = 0;

       while (iter < maxSegmentsToMove) {
         iter++;
         final BalancerSegmentHolder segmentToMove = analyzer.pickSegmentToMove(serverHolderList, numSegments);
-        final ServerHolder holder = analyzer.findNewSegmentHome(segmentToMove.getSegment(), serverHolderList, false);
+        final ServerHolder holder = analyzer.findNewSegmentHome(segmentToMove.getSegment(), serverHolderList);

         if (holder == null) {
           continue;
         }
-        final DruidServer toServer = holder.getServer();
-        if (!currentlyMovingSegments.get(tier).containsKey(segmentToMove.getSegment())) {
-          moveSegment(segmentToMove, toServer, params);
-        }
+        moveSegment(segmentToMove, holder.getServer(), params);
       }

       final double initialTotalCost = analyzer.calculateInitialTotalCost(serverHolderList);
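
Note: the removed containsKey guard compared a DataSegment object against a map that, judging by the remove(segmentName) fix in moveSegment below, is keyed by segment name, so the guard could never match and was effectively dead code.
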
@@ -132,21 +134,13 @@ public class DruidMasterBalancer implements DruidMasterHelper
       stats.addToTieredStat("movedCount", tier, currentlyMovingSegments.get(tier).size());

       log.info(
-          "Initial Total Cost: [%f], Normalization: [%f], Initial Normalized Cost: [%f], Segments Moved: [%d]",
+          "[%s]: Initial Total Cost: [%f], Normalization: [%f], Initial Normalized Cost: [%f], Segments Moved: [%d]",
+          tier,
           initialTotalCost,
           normalization,
           normalizedInitialCost,
           currentlyMovingSegments.get(tier).size()
       );
-
-      if (serverHolderList.size() <= 1) {
-        log.info(
-            "[%s]: One or fewer servers found. Cannot balance.",
-            tier
-        );
-        continue;
-      }
     }

     return params.buildFromExisting()
@@ -194,7 +188,7 @@ public class DruidMasterBalancer implements DruidMasterHelper
         log.makeAlert(e, String.format("[%s] : Moving exception", segmentName)).emit();
       }
     } else {
-      currentlyMovingSegments.get(toServer.getTier()).remove(segment);
+      currentlyMovingSegments.get(toServer.getTier()).remove(segmentName);
     }
   }
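
A minimal illustration of the remove-by-key bug fixed here, assuming (as the fix implies) that the inner map is keyed by segment name; the map contents and name are stand-ins:

import java.util.concurrent.ConcurrentHashMap;

public class RemoveByNameSketch
{
  public static void main(String[] args)
  {
    // Stand-in for currentlyMovingSegments.get(tier): keyed by segment name.
    ConcurrentHashMap<String, Object> moving = new ConcurrentHashMap<String, Object>();
    Object segment = new Object();
    moving.put("someSegmentName", segment);

    moving.remove(segment);             // wrong key type: removes nothing
    System.out.println(moving.size());  // 1 -- the entry leaks until the
                                        // lifetime expires and the alert fires

    moving.remove("someSegmentName");   // the fix: remove by name
    System.out.println(moving.size());  // 0
  }
}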

View File

@@ -91,7 +91,7 @@ public abstract class LoadRule implements Rule
     final MasterStats stats = new MasterStats();

     while (totalReplicants < expectedReplicants) {
-      final ServerHolder holder = analyzer.findNewSegmentHome(segment, serverHolderList, true);
+      final ServerHolder holder = analyzer.findNewSegmentHome(segment, serverHolderList);

       if (holder == null) {
         log.warn(