make findNewSegmentHome return a priority queue so assignment can assign replicants to other servers

This commit is contained in:
Nelson Ray 2013-02-11 15:28:17 -08:00
parent 1b9764fffb
commit 695b78b19c
4 changed files with 40 additions and 28 deletions

View File

@ -20,11 +20,14 @@
package com.metamx.druid.master;
import com.google.common.collect.Lists;
import com.google.common.collect.MinMaxPriorityQueue;
import com.metamx.common.Pair;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.util.Comparator;
import java.util.List;
import java.util.Random;
@ -188,13 +191,26 @@ public class BalancerCostAnalyzer
* A DataSegment that we are proposing to move.
* @param serverHolders
* An iterable of ServerHolders for a particular tier.
* @return A ServerHolder with the new home for a segment.
* @return A MinMaxPriorityQueue of costs of putting the proposalSegment on the server and ServerHolders.
*/
public ServerHolder findNewSegmentHome(DataSegment proposalSegment, Iterable<ServerHolder> serverHolders)
public MinMaxPriorityQueue<Pair<Double, ServerHolder>> findNewSegmentHome(DataSegment proposalSegment, Iterable<ServerHolder> serverHolders)
{
// Just need a regular priority queue for the min. element.
final MinMaxPriorityQueue<Pair<Double, ServerHolder>> costServerPairs = MinMaxPriorityQueue.orderedBy(
new Comparator<Pair<Double, ServerHolder>>()
{
@Override
public int compare(
Pair<Double, ServerHolder> o,
Pair<Double, ServerHolder> o1
)
{
return Double.compare(o.lhs, o1.lhs);
}
}
).create();
final long proposalSegmentSize = proposalSegment.getSize();
double minCost = Double.MAX_VALUE;
ServerHolder toServer = null;
for (ServerHolder server : serverHolders) {
/** Only calculate costs if the server has enough space. */
@ -215,13 +231,10 @@ public class BalancerCostAnalyzer
cost += computeJointSegmentCosts(proposalSegment, segment);
}
if (cost < minCost) {
minCost = cost;
toServer = server;
}
costServerPairs.add(Pair.of(cost, server));
}
return toServer;
return costServerPairs;
}
}

View File

@ -110,7 +110,7 @@ public class DruidMasterBalancer implements DruidMasterHelper
while (iter < maxSegmentsToMove) {
BalancerSegmentHolder segmentToMove = analyzer.pickSegmentToMove(serverHolderList, numSegments);
DruidServer toServer = analyzer.findNewSegmentHome(segmentToMove.getSegment(), serverHolderList).getServer();
DruidServer toServer = analyzer.findNewSegmentHome(segmentToMove.getSegment(), serverHolderList).pollFirst().rhs.getServer();
if (!currentlyMovingSegments.get(tier).containsKey(segmentToMove.getSegment())) {
moveSegment(segmentToMove, toServer, params);
}

View File

@ -21,6 +21,7 @@ package com.metamx.druid.master.rules;
import com.google.common.collect.Lists;
import com.google.common.collect.MinMaxPriorityQueue;
import com.metamx.common.Pair;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.master.BalancerCostAnalyzer;
import com.metamx.druid.master.DruidMaster;
@ -58,14 +59,18 @@ public abstract class LoadRule implements Rule
return stats;
}
final List<ServerHolder> serverHolderList = new ArrayList<ServerHolder>(serverQueue);
final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp();
final BalancerCostAnalyzer analyzer = params.getBalancerCostAnalyzer(referenceTimestamp);
MinMaxPriorityQueue<Pair<Double, ServerHolder>> serverCostQueue = analyzer.findNewSegmentHome(segment, serverHolderList);
stats.accumulate(
assign(
params.getReplicationManager(),
expectedReplicants,
totalReplicants,
serverQueue,
segment,
params
serverCostQueue,
segment
)
);
@ -78,40 +83,32 @@ public abstract class LoadRule implements Rule
final ReplicationThrottler replicationManager,
int expectedReplicants,
int totalReplicants,
MinMaxPriorityQueue<ServerHolder> serverQueue,
final DataSegment segment,
final DruidMasterRuntimeParams params
MinMaxPriorityQueue<Pair<Double, ServerHolder>> serverQueue,
final DataSegment segment
)
{
MasterStats stats = new MasterStats();
List<ServerHolder> serverHolderList = new ArrayList<ServerHolder>(serverQueue);
List<ServerHolder> assignedServers = Lists.newArrayList();
while (totalReplicants < expectedReplicants) {
final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp();
final BalancerCostAnalyzer analyzer = params.getBalancerCostAnalyzer(referenceTimestamp);
ServerHolder holder = analyzer.findNewSegmentHome(segment, serverHolderList);
ServerHolder holder = serverQueue.pollFirst().rhs;
if (holder == null) {
log.warn(
"Not enough %s servers[%d] or node capacity to assign segment[%s]! Expected Replicants[%d]",
"Not enough %s servers or node capacity to assign segment[%s]! Expected Replicants[%d]",
getTier(),
assignedServers.size() + serverQueue.size() + 1,
segment.getIdentifier(),
expectedReplicants
);
break;
}
if (holder.isServingSegment(segment) || holder.isLoadingSegment(segment)) {
assignedServers.add(holder);
continue;
}
if (totalReplicants > 0) { // don't throttle if there's only 1 copy of this segment in the cluster
if (!replicationManager.canAddReplicant(getTier()) ||
!replicationManager.registerReplicantCreation(getTier(), segment.getIdentifier())) {
serverQueue.add(holder);
break;
}
}
@ -127,12 +124,10 @@ public abstract class LoadRule implements Rule
}
}
);
assignedServers.add(holder);
stats.addToTieredStat("assignedCount", getTier(), 1);
++totalReplicants;
}
serverQueue.addAll(assignedServers);
return stats;
}

View File

@ -528,6 +528,7 @@ public class DruidMasterRuleRunnerTest
.withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.build();
DruidMasterRuntimeParams afterParams = ruleRunner.run(params);
@ -601,6 +602,7 @@ public class DruidMasterRuleRunnerTest
.withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.build();
DruidMasterRuntimeParams afterParams = ruleRunner.run(params);
@ -847,6 +849,7 @@ public class DruidMasterRuleRunnerTest
.withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.build();
DruidMasterRuntimeParams afterParams = ruleRunner.run(params);
@ -1032,6 +1035,7 @@ public class DruidMasterRuleRunnerTest
.withAvailableSegments(longerAvailableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.build();
DruidMasterRuntimeParams afterParams = ruleRunner.run(params);