Merge branch 'master' of github.com:metamx/druid

This commit is contained in:
Eric Tschetter 2013-03-07 18:47:05 -06:00
commit 53c8ba6e75
4 changed files with 90 additions and 38 deletions

View File

@ -20,11 +20,14 @@
package com.metamx.druid.master; package com.metamx.druid.master;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.MinMaxPriorityQueue;
import com.metamx.common.Pair;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DataSegment;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Interval; import org.joda.time.Interval;
import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Random; import java.util.Random;
@ -185,27 +188,79 @@ public class BalancerCostAnalyzer
} }
/** /**
* The assignment application requires us to supply a proposal segment. * For balancing, we want to only make a move if the minimum cost server is not already serving the segment.
* *
* @param proposalSegment A DataSegment that we are proposing to move. * @param proposalSegment A DataSegment that we are proposing to move.
* @param serverHolders An iterable of ServerHolders for a particular tier. * @param serverHolders An iterable of ServerHolders for a particular tier.
* *
* @return A ServerHolder with the new home for a segment. * @return A ServerHolder with the new home for a segment.
*/ */
public ServerHolder findNewSegmentHome( public ServerHolder findNewSegmentHomeBalance(
final DataSegment proposalSegment, final DataSegment proposalSegment,
final Iterable<ServerHolder> serverHolders final Iterable<ServerHolder> serverHolders
) )
{ {
MinMaxPriorityQueue<Pair<Double, ServerHolder>> costsAndServers = computeCosts(proposalSegment, serverHolders);
if (costsAndServers.isEmpty()) {
return null;
}
ServerHolder toServer = costsAndServers.pollFirst().rhs;
if (!toServer.isServingSegment(proposalSegment)) {
return toServer;
}
return null;
}
/**
* For assigment, we want to move to the lowest cost server that isn't already serving the segment.
*
* @param proposalSegment A DataSegment that we are proposing to move.
* @param serverHolders An iterable of ServerHolders for a particular tier.
*
* @return A ServerHolder with the new home for a segment.
*/
public ServerHolder findNewSegmentHomeAssign(
final DataSegment proposalSegment,
final Iterable<ServerHolder> serverHolders
)
{
MinMaxPriorityQueue<Pair<Double, ServerHolder>> costsAndServers = computeCosts(proposalSegment, serverHolders);
while (!costsAndServers.isEmpty()) {
ServerHolder toServer = costsAndServers.pollFirst().rhs;
if (!toServer.isServingSegment(proposalSegment)) {
return toServer;
}
}
return null;
}
private MinMaxPriorityQueue<Pair<Double, ServerHolder>> computeCosts(
final DataSegment proposalSegment,
final Iterable<ServerHolder> serverHolders
)
{
MinMaxPriorityQueue<Pair<Double, ServerHolder>> costsAndServers = MinMaxPriorityQueue.orderedBy(
new Comparator<Pair<Double, ServerHolder>>()
{
@Override
public int compare(
Pair<Double, ServerHolder> o,
Pair<Double, ServerHolder> o1
)
{
return Double.compare(o.lhs, o1.lhs);
}
}
).create();
final long proposalSegmentSize = proposalSegment.getSize(); final long proposalSegmentSize = proposalSegment.getSize();
double minCost = Double.MAX_VALUE;
ServerHolder toServer = null;
for (ServerHolder server : serverHolders) { for (ServerHolder server : serverHolders) {
/** Don't calculate cost if the server doesn't have enough space or is loading the segment */ /** Don't calculate cost if the server doesn't have enough space or is loading the segment */
if (proposalSegmentSize > server.getAvailableSize() || if (proposalSegmentSize > server.getAvailableSize() || server.isLoadingSegment(proposalSegment)) {
server.isLoadingSegment(proposalSegment) ||
server.isServingSegment(proposalSegment)) {
continue; continue;
} }
@ -222,13 +277,10 @@ public class BalancerCostAnalyzer
cost += computeJointSegmentCosts(proposalSegment, segment); cost += computeJointSegmentCosts(proposalSegment, segment);
} }
if (cost < minCost) { costsAndServers.add(Pair.of(cost, server));
minCost = cost;
toServer = server;
}
} }
return toServer; return costsAndServers;
} }
} }

View File

@ -117,7 +117,7 @@ public class DruidMasterBalancer implements DruidMasterHelper
while (iter < maxSegmentsToMove) { while (iter < maxSegmentsToMove) {
iter++; iter++;
final BalancerSegmentHolder segmentToMove = analyzer.pickSegmentToMove(serverHolderList, numSegments); final BalancerSegmentHolder segmentToMove = analyzer.pickSegmentToMove(serverHolderList, numSegments);
final ServerHolder holder = analyzer.findNewSegmentHome(segmentToMove.getSegment(), serverHolderList); final ServerHolder holder = analyzer.findNewSegmentHomeBalance(segmentToMove.getSegment(), serverHolderList);
if (holder == null) { if (holder == null) {
continue; continue;
} }

View File

@ -55,30 +55,6 @@ public class DruidMasterLogger implements DruidMasterHelper
} }
} }
Map<String, AtomicLong> unassigned = stats.getPerTierStats().get("unassignedCount");
if (unassigned != null) {
for (Map.Entry<String, AtomicLong> entry : unassigned.entrySet()) {
emitter.emit(
new ServiceMetricEvent.Builder().build(
String.format("master/%s/unassigned/count", entry.getKey()),
entry.getValue().get()
)
);
}
}
Map<String, AtomicLong> sizes = stats.getPerTierStats().get("unassignedSize");
if (sizes != null) {
for (Map.Entry<String, AtomicLong> entry : sizes.entrySet()) {
emitter.emit(
new ServiceMetricEvent.Builder().build(
String.format("master/%s/unassigned/size", entry.getKey()),
entry.getValue().get()
)
);
}
}
Map<String, AtomicLong> dropped = stats.getPerTierStats().get("droppedCount"); Map<String, AtomicLong> dropped = stats.getPerTierStats().get("droppedCount");
if (dropped != null) { if (dropped != null) {
for (Map.Entry<String, AtomicLong> entry : dropped.entrySet()) { for (Map.Entry<String, AtomicLong> entry : dropped.entrySet()) {
@ -89,6 +65,30 @@ public class DruidMasterLogger implements DruidMasterHelper
} }
} }
emitter.emit(
new ServiceMetricEvent.Builder().build(
"master/cost/raw", stats.getGlobalStats().get("initialCost")
)
);
emitter.emit(
new ServiceMetricEvent.Builder().build(
"master/cost/normalization", stats.getGlobalStats().get("normalization")
)
);
emitter.emit(
new ServiceMetricEvent.Builder().build(
"master/cost/normalized", stats.getGlobalStats().get("normalizedInitialCostTimesOneThousand").doubleValue() / 1000d
)
);
emitter.emit(
new ServiceMetricEvent.Builder().build(
"master/moved/count", stats.getGlobalStats().get("movedCount")
)
);
emitter.emit( emitter.emit(
new ServiceMetricEvent.Builder().build( new ServiceMetricEvent.Builder().build(
"master/deleted/count", stats.getGlobalStats().get("deletedCount") "master/deleted/count", stats.getGlobalStats().get("deletedCount")

View File

@ -91,7 +91,7 @@ public abstract class LoadRule implements Rule
final MasterStats stats = new MasterStats(); final MasterStats stats = new MasterStats();
while (totalReplicants < expectedReplicants) { while (totalReplicants < expectedReplicants) {
final ServerHolder holder = analyzer.findNewSegmentHome(segment, serverHolderList); final ServerHolder holder = analyzer.findNewSegmentHomeAssign(segment, serverHolderList);
if (holder == null) { if (holder == null) {
log.warn( log.warn(