Merge pull request #198 from metamx/loadBalancer

Improve runtime of segment distribution algorithm
This commit is contained in:
cheddar 2013-08-13 18:47:43 -07:00
commit 4791d0466d
20 changed files with 983 additions and 236 deletions

View File

@ -0,0 +1,35 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.master;
import com.metamx.druid.client.DataSegment;
import java.util.List;
/**
 * Pluggable policy the master uses to decide where segments should live within a tier.
 *
 * Implementations are obtained from a {@link BalancerStrategyFactory}.
 */
public interface BalancerStrategy
{
  /**
   * Chooses a destination server for a segment that the balancer proposes to move.
   *
   * @param proposalSegment the segment being considered for a move
   * @param serverHolders   the candidate servers
   *
   * @return the chosen server, or null when the implementation has no destination to offer
   *         (the balancer skips the move in that case)
   */
  ServerHolder findNewSegmentHomeBalancer(DataSegment proposalSegment, List<ServerHolder> serverHolders);

  /**
   * Chooses a server on which to load an additional replica of a segment.
   *
   * @param proposalSegment the segment that needs another replica
   * @param serverHolders   the candidate servers
   *
   * @return the chosen server, or null when no suitable server was found
   */
  ServerHolder findNewSegmentHomeReplicator(DataSegment proposalSegment, List<ServerHolder> serverHolders);

  /**
   * Selects a segment, together with the server currently holding it, as a candidate for balancing.
   *
   * @param serverHolders the servers whose segments may be picked
   *
   * @return the picked segment holder; callers null-check the result, so implementations may
   *         return null when nothing can be picked
   */
  BalancerSegmentHolder pickSegmentToMove(List<ServerHolder> serverHolders);

  /**
   * Records strategy-specific statistics for a tier into the supplied stats object.
   *
   * @param tier             name of the tier the statistics belong to
   * @param stats            accumulator that receives the statistics
   * @param serverHolderList the servers of that tier
   */
  void emitStats(String tier, MasterStats stats, List<ServerHolder> serverHolderList);
}

View File

@ -0,0 +1,26 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.master;
import org.joda.time.DateTime;
/**
 * Factory for {@link BalancerStrategy} instances.
 */
public interface BalancerStrategyFactory
{
  /**
   * Creates a strategy instance.
   *
   * @param referenceTimestamp timestamp handed to the created strategy; implementations are
   *                           free to ignore it
   *
   * @return a balancer strategy
   */
  BalancerStrategy createBalancerStrategy(DateTime referenceTimestamp);
}

View File

@ -1,95 +1,132 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.master;
import com.google.common.collect.MinMaxPriorityQueue;
import com.metamx.common.Pair;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.emitter.EmittingLogger;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.util.Comparator;
import java.util.List;
import java.util.Random;
/**
* The BalancerCostAnalyzer will compute the total initial cost of the cluster, with costs defined in
* computeJointSegmentCosts. It will then propose to move (pseudo-)randomly chosen segments from their
* respective initial servers to other servers, chosen greedily to minimize the cost of the cluster.
*/
public class BalancerCostAnalyzer
public class CostBalancerStrategy implements BalancerStrategy
{
private static final Logger log = new Logger(BalancerCostAnalyzer.class);
private static final EmittingLogger log = new EmittingLogger(CostBalancerStrategy.class);
private static final int DAY_IN_MILLIS = 1000 * 60 * 60 * 24;
private static final int SEVEN_DAYS_IN_MILLIS = 7 * DAY_IN_MILLIS;
private static final int THIRTY_DAYS_IN_MILLIS = 30 * DAY_IN_MILLIS;
private final Random rand;
private final DateTime referenceTimestamp;
public BalancerCostAnalyzer(DateTime referenceTimestamp)
public CostBalancerStrategy(DateTime referenceTimestamp)
{
this.referenceTimestamp = referenceTimestamp;
rand = new Random(0);
}
/**
* Calculates the cost normalization. This is such that the normalized cost is lower bounded
* by 1 (e.g. when each segment gets its own compute node).
*
* @param serverHolders A list of ServerHolders for a particular tier.
*
* @return The normalization value (the sum of the diagonal entries in the
* pairwise cost matrix). This is the cost of a cluster if each
* segment were to get its own compute node.
*/
public double calculateNormalization(final List<ServerHolder> serverHolders)
@Override
public ServerHolder findNewSegmentHomeReplicator(
DataSegment proposalSegment, List<ServerHolder> serverHolders
)
{
double cost = 0;
for (ServerHolder server : serverHolders) {
for (DataSegment segment : server.getServer().getSegments().values()) {
cost += computeJointSegmentCosts(segment, segment);
}
ServerHolder holder= chooseBestServer(proposalSegment, serverHolders, false).rhs;
if (holder!=null && !holder.isServingSegment(proposalSegment))
{
return holder;
}
return cost;
return null;
}
/**
* Calculates the initial cost of the Druid segment configuration.
*
* @param serverHolders A list of ServerHolders for a particular tier.
*
* @return The initial cost of the Druid tier.
*/
public double calculateInitialTotalCost(final List<ServerHolder> serverHolders)
@Override
public ServerHolder findNewSegmentHomeBalancer(
DataSegment proposalSegment, List<ServerHolder> serverHolders
)
{
double cost = 0;
return chooseBestServer(proposalSegment, serverHolders, true).rhs;
}
/**
* For assignment, we want to move to the lowest cost server that isn't already serving the segment.
*
* @param proposalSegment A DataSegment that we are proposing to move.
* @param serverHolders An iterable of ServerHolders for a particular tier.
*
* @return A ServerHolder with the new home for a segment.
*/
private Pair<Double, ServerHolder> chooseBestServer(
final DataSegment proposalSegment,
final Iterable<ServerHolder> serverHolders,
boolean includeCurrentServer
)
{
Pair<Double, ServerHolder> bestServer = Pair.of(Double.POSITIVE_INFINITY, null);
MinMaxPriorityQueue<Pair<Double, ServerHolder>> costsAndServers = MinMaxPriorityQueue.orderedBy(
new Comparator<Pair<Double, ServerHolder>>()
{
@Override
public int compare(
Pair<Double, ServerHolder> o,
Pair<Double, ServerHolder> o1
)
{
return Double.compare(o.lhs, o1.lhs);
}
}
).create();
final long proposalSegmentSize = proposalSegment.getSize();
for (ServerHolder server : serverHolders) {
DataSegment[] segments = server.getServer().getSegments().values().toArray(new DataSegment[]{});
for (int i = 0; i < segments.length; ++i) {
for (int j = i; j < segments.length; ++j) {
cost += computeJointSegmentCosts(segments[i], segments[j]);
if (includeCurrentServer || !server.isServingSegment(proposalSegment))
{
/** Don't calculate cost if the server doesn't have enough space or is loading the segment */
if (proposalSegmentSize > server.getAvailableSize() || server.isLoadingSegment(proposalSegment)) {
continue;
}
/** The contribution to the total cost of a given server by proposing to move the segment to that server is... */
double cost = 0f;
/** the sum of the costs of other (exclusive of the proposalSegment) segments on the server */
for (DataSegment segment : server.getServer().getSegments().values()) {
if (!proposalSegment.equals(segment)) {
cost += computeJointSegmentCosts(proposalSegment, segment);
}
}
/** plus the costs of segments that will be loaded */
for (DataSegment segment : server.getPeon().getSegmentsToLoad()) {
cost += computeJointSegmentCosts(proposalSegment, segment);
}
if (cost < bestServer.lhs) {
bestServer = Pair.of(cost, server);
}
}
}
return cost;
return bestServer;
}
/**
@ -122,8 +159,10 @@ public class BalancerCostAnalyzer
referenceTimestamp.getMillis() - segment1.getInterval().getEndMillis(),
referenceTimestamp.getMillis() - segment2.getInterval().getEndMillis()
);
if (maxDiff < SEVEN_DAYS_IN_MILLIS) {
recencyPenalty = 2 - maxDiff / SEVEN_DAYS_IN_MILLIS;
double segment1diff=referenceTimestamp.getMillis()-segment1.getInterval().getEndMillis();
double segment2diff=referenceTimestamp.getMillis()-segment2.getInterval().getEndMillis();
if (segment1diff < SEVEN_DAYS_IN_MILLIS && segment2diff <SEVEN_DAYS_IN_MILLIS) {
recencyPenalty = (2 - segment1diff / SEVEN_DAYS_IN_MILLIS)*(2-segment2diff /SEVEN_DAYS_IN_MILLIS);
}
/** gap is null if the two segment intervals overlap or if they're adjacent */
@ -141,129 +180,77 @@ public class BalancerCostAnalyzer
return cost;
}
public BalancerSegmentHolder pickSegmentToMove(final List<ServerHolder> serverHolders)
{
ReservoirSegmentSampler sampler = new ReservoirSegmentSampler();
return sampler.getRandomBalancerSegmentHolder(serverHolders);
}
/**
* The balancing application requires us to pick a proposal segment uniformly at random from the set of
* all servers. We use reservoir sampling to do this.
* Calculates the initial cost of the Druid segment configuration.
*
* @param serverHolders A list of ServerHolders for a particular tier.
*
* @return A BalancerSegmentHolder sampled uniformly at random.
* @return The initial cost of the Druid tier.
*/
public BalancerSegmentHolder pickSegmentToMove(final List<ServerHolder> serverHolders)
public double calculateInitialTotalCost(final List<ServerHolder> serverHolders)
{
ServerHolder fromServerHolder = null;
DataSegment proposalSegment = null;
int numSoFar = 0;
double cost = 0;
for (ServerHolder server : serverHolders) {
for (DataSegment segment : server.getServer().getSegments().values()) {
int randNum = rand.nextInt(numSoFar + 1);
// w.p. 1 / (numSoFar + 1), swap out the server and segment
if (randNum == numSoFar) {
fromServerHolder = server;
proposalSegment = segment;
numSoFar++;
DataSegment[] segments = server.getServer().getSegments().values().toArray(new DataSegment[]{});
for (int i = 0; i < segments.length; ++i) {
for (int j = i; j < segments.length; ++j) {
cost += computeJointSegmentCosts(segments[i], segments[j]);
}
}
}
return new BalancerSegmentHolder(fromServerHolder.getServer(), proposalSegment);
return cost;
}
/**
* For balancing, we want to only make a move if the minimum cost server is not already serving the segment.
* Calculates the cost normalization. This is such that the normalized cost is lower bounded
* by 1 (e.g. when each segment gets its own compute node).
*
* @param proposalSegment A DataSegment that we are proposing to move.
* @param serverHolders An iterable of ServerHolders for a particular tier.
* @param serverHolders A list of ServerHolders for a particular tier.
*
* @return A ServerHolder with the new home for a segment.
* @return The normalization value (the sum of the diagonal entries in the
* pairwise cost matrix). This is the cost of a cluster if each
* segment were to get its own compute node.
*/
public ServerHolder findNewSegmentHomeBalance(
final DataSegment proposalSegment,
final Iterable<ServerHolder> serverHolders
)
public double calculateNormalization(final List<ServerHolder> serverHolders)
{
MinMaxPriorityQueue<Pair<Double, ServerHolder>> costsAndServers = computeCosts(proposalSegment, serverHolders);
if (costsAndServers.isEmpty()) {
return null;
}
ServerHolder toServer = costsAndServers.pollFirst().rhs;
if (!toServer.isServingSegment(proposalSegment)) {
return toServer;
}
return null;
}
/**
* For assignment, we want to move to the lowest cost server that isn't already serving the segment.
*
* @param proposalSegment A DataSegment that we are proposing to move.
* @param serverHolders An iterable of ServerHolders for a particular tier.
*
* @return A ServerHolder with the new home for a segment.
*/
public ServerHolder findNewSegmentHomeAssign(
final DataSegment proposalSegment,
final Iterable<ServerHolder> serverHolders
)
{
MinMaxPriorityQueue<Pair<Double, ServerHolder>> costsAndServers = computeCosts(proposalSegment, serverHolders);
while (!costsAndServers.isEmpty()) {
ServerHolder toServer = costsAndServers.pollFirst().rhs;
if (!toServer.isServingSegment(proposalSegment)) {
return toServer;
}
}
return null;
}
private MinMaxPriorityQueue<Pair<Double, ServerHolder>> computeCosts(
final DataSegment proposalSegment,
final Iterable<ServerHolder> serverHolders
)
{
MinMaxPriorityQueue<Pair<Double, ServerHolder>> costsAndServers = MinMaxPriorityQueue.orderedBy(
new Comparator<Pair<Double, ServerHolder>>()
{
@Override
public int compare(
Pair<Double, ServerHolder> o,
Pair<Double, ServerHolder> o1
)
{
return Double.compare(o.lhs, o1.lhs);
}
}
).create();
final long proposalSegmentSize = proposalSegment.getSize();
double cost = 0;
for (ServerHolder server : serverHolders) {
/** Don't calculate cost if the server doesn't have enough space or is loading the segment */
if (proposalSegmentSize > server.getAvailableSize() || server.isLoadingSegment(proposalSegment)) {
continue;
}
/** The contribution to the total cost of a given server by proposing to move the segment to that server is... */
double cost = 0f;
/** the sum of the costs of other (exclusive of the proposalSegment) segments on the server */
for (DataSegment segment : server.getServer().getSegments().values()) {
if (!proposalSegment.equals(segment)) {
cost += computeJointSegmentCosts(proposalSegment, segment);
}
cost += computeJointSegmentCosts(segment, segment);
}
/** plus the costs of segments that will be loaded */
for (DataSegment segment : server.getPeon().getSegmentsToLoad()) {
cost += computeJointSegmentCosts(proposalSegment, segment);
}
costsAndServers.add(Pair.of(cost, server));
}
return costsAndServers;
return cost;
}
}
@Override
public void emitStats(
String tier,
MasterStats stats, List<ServerHolder> serverHolderList
)
{
final double initialTotalCost = calculateInitialTotalCost(serverHolderList);
final double normalization = calculateNormalization(serverHolderList);
final double normalizedInitialCost = initialTotalCost / normalization;
stats.addToTieredStat("initialCost", tier, (long) initialTotalCost);
stats.addToTieredStat("normalization", tier, (long) normalization);
stats.addToTieredStat("normalizedInitialCostTimesOneThousand", tier, (long) (normalizedInitialCost * 1000));
log.info(
"[%s]: Initial Total Cost: [%f], Normalization: [%f], Initial Normalized Cost: [%f]",
tier,
initialTotalCost,
normalization,
normalizedInitialCost
);
}
}

View File

@ -0,0 +1,31 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.master;
import org.joda.time.DateTime;
/**
 * {@link BalancerStrategyFactory} that produces {@link CostBalancerStrategy} instances.
 */
public class CostBalancerStrategyFactory implements BalancerStrategyFactory
{
  /**
   * Builds a new cost-based strategy around the given reference timestamp.
   *
   * @param referenceTimestamp timestamp passed straight to the {@link CostBalancerStrategy} constructor
   *
   * @return a freshly constructed {@link CostBalancerStrategy}
   */
  @Override
  public BalancerStrategy createBalancerStrategy(final DateTime referenceTimestamp)
  {
    return new CostBalancerStrategy(referenceTimestamp);
  }
}

View File

@ -92,7 +92,6 @@ public class DruidMaster
private final IndexingServiceClient indexingServiceClient;
private final ScheduledExecutorService exec;
private final LoadQueueTaskMaster taskMaster;
private final Map<String, LoadQueuePeon> loadManagementPeons;
private final AtomicReference<LeaderLatch> leaderLatch;
private volatile AtomicReference<MasterSegmentSettings> segmentSettingsAtomicReference;

View File

@ -78,7 +78,7 @@ public class DruidMasterBalancer implements DruidMasterHelper
{
final MasterStats stats = new MasterStats();
final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp();
final BalancerCostAnalyzer analyzer = params.getBalancerCostAnalyzer(referenceTimestamp);
final BalancerStrategy strategy = params.getBalancerStrategyFactory().createBalancerStrategy(referenceTimestamp);
final int maxSegmentsToMove = params.getMasterSegmentSettings().getMaxSegmentsToMove();
for (Map.Entry<String, MinMaxPriorityQueue<ServerHolder>> entry :
@ -113,34 +113,25 @@ public class DruidMasterBalancer implements DruidMasterHelper
}
for (int iter = 0; iter < maxSegmentsToMove; iter++) {
final BalancerSegmentHolder segmentToMove = analyzer.pickSegmentToMove(serverHolderList);
final BalancerSegmentHolder segmentToMove = strategy.pickSegmentToMove(serverHolderList);
if (params.getAvailableSegments().contains(segmentToMove.getSegment())) {
final ServerHolder holder = analyzer.findNewSegmentHomeBalance(segmentToMove.getSegment(), serverHolderList);
if (segmentToMove != null && params.getAvailableSegments().contains(segmentToMove.getSegment())) {
final ServerHolder holder = strategy.findNewSegmentHomeBalancer(segmentToMove.getSegment(), serverHolderList);
if (holder != null) {
moveSegment(segmentToMove, holder.getServer(), params);
}
}
}
final double initialTotalCost = analyzer.calculateInitialTotalCost(serverHolderList);
final double normalization = analyzer.calculateNormalization(serverHolderList);
final double normalizedInitialCost = initialTotalCost / normalization;
stats.addToTieredStat("initialCost", tier, (long) initialTotalCost);
stats.addToTieredStat("normalization", tier, (long) normalization);
stats.addToTieredStat("normalizedInitialCostTimesOneThousand", tier, (long) (normalizedInitialCost * 1000));
stats.addToTieredStat("movedCount", tier, currentlyMovingSegments.get(tier).size());
if (params.getMasterSegmentSettings().isEmitBalancingStats()) {
strategy.emitStats(tier, stats, serverHolderList);
}
log.info(
"[%s]: Initial Total Cost: [%f], Normalization: [%f], Initial Normalized Cost: [%f], Segments Moved: [%d]",
tier,
initialTotalCost,
normalization,
normalizedInitialCost,
currentlyMovingSegments.get(tier).size()
"[%s]: Segments Moved: [%d]", tier, currentlyMovingSegments.get(tier).size()
);
}
return params.buildFromExisting()

View File

@ -42,12 +42,6 @@ public abstract class DruidMasterConfig
@Default("PT1800s")
public abstract Duration getMasterSegmentMergerPeriod();
@Config("druid.master.millisToWaitBeforeDeleting")
public long getMillisToWaitBeforeDeleting()
{
return 15 * 60 * 1000L;
}
@Config("druid.master.merger.on")
public boolean isMergeSegments()
{
@ -66,22 +60,6 @@ public abstract class DruidMasterConfig
return null;
}
@Config("druid.master.merge.threshold")
public long getMergeBytesLimit()
{
return 100000000L;
}
@Config("druid.master.merge.maxSegments")
public int getMergeSegmentsLimit()
{
return Integer.MAX_VALUE;
}
@Config("druid.master.balancer.maxSegmentsToMove")
@Default("5")
public abstract int getMaxSegmentsToMove();
@Config("druid.master.replicant.lifetime")
@Default("15")
public abstract int getReplicantLifetime();

View File

@ -68,7 +68,6 @@ public class DruidMasterRuleRunner implements DruidMasterHelper
DatabaseRuleManager databaseRuleManager = paramsWithReplicationManager.getDatabaseRuleManager();
for (DataSegment segment : paramsWithReplicationManager.getAvailableSegments()) {
List<Rule> rules = databaseRuleManager.getRulesWithDefault(segment.getDataSource());
boolean foundMatchingRule = false;
for (Rule rule : rules) {
if (rule.appliesTo(segment, now)) {

View File

@ -49,6 +49,7 @@ public class DruidMasterRuntimeParams
private final MasterSegmentSettings masterSegmentSettings;
private final MasterStats stats;
private final DateTime balancerReferenceTimestamp;
private final BalancerStrategyFactory strategyFactory;
public DruidMasterRuntimeParams(
long startTime,
@ -62,7 +63,8 @@ public class DruidMasterRuntimeParams
ServiceEmitter emitter,
MasterSegmentSettings masterSegmentSettings,
MasterStats stats,
DateTime balancerReferenceTimestamp
DateTime balancerReferenceTimestamp,
BalancerStrategyFactory strategyFactory
)
{
this.startTime = startTime;
@ -77,6 +79,7 @@ public class DruidMasterRuntimeParams
this.masterSegmentSettings = masterSegmentSettings;
this.stats = stats;
this.balancerReferenceTimestamp = balancerReferenceTimestamp;
this.strategyFactory = strategyFactory;
}
public long getStartTime()
@ -139,9 +142,9 @@ public class DruidMasterRuntimeParams
return balancerReferenceTimestamp;
}
public BalancerCostAnalyzer getBalancerCostAnalyzer(DateTime referenceTimestamp)
public BalancerStrategyFactory getBalancerStrategyFactory()
{
return new BalancerCostAnalyzer(referenceTimestamp);
return strategyFactory;
}
public boolean hasDeletionWaitTimeElapsed()
@ -168,7 +171,8 @@ public class DruidMasterRuntimeParams
emitter,
masterSegmentSettings,
stats,
balancerReferenceTimestamp
balancerReferenceTimestamp,
strategyFactory
);
}
@ -186,6 +190,7 @@ public class DruidMasterRuntimeParams
private MasterSegmentSettings masterSegmentSettings;
private MasterStats stats;
private DateTime balancerReferenceTimestamp;
private BalancerStrategyFactory strategyFactory;
Builder()
{
@ -201,6 +206,7 @@ public class DruidMasterRuntimeParams
this.stats = new MasterStats();
this.masterSegmentSettings = new MasterSegmentSettings.Builder().build();
this.balancerReferenceTimestamp = null;
this.strategyFactory = new CostBalancerStrategyFactory();
}
Builder(
@ -215,7 +221,8 @@ public class DruidMasterRuntimeParams
ServiceEmitter emitter,
MasterSegmentSettings masterSegmentSettings,
MasterStats stats,
DateTime balancerReferenceTimestamp
DateTime balancerReferenceTimestamp,
BalancerStrategyFactory strategyFactory
)
{
this.startTime = startTime;
@ -230,6 +237,7 @@ public class DruidMasterRuntimeParams
this.masterSegmentSettings = masterSegmentSettings;
this.stats = stats;
this.balancerReferenceTimestamp = balancerReferenceTimestamp;
this.strategyFactory=strategyFactory;
}
public DruidMasterRuntimeParams build()
@ -246,7 +254,8 @@ public class DruidMasterRuntimeParams
emitter,
masterSegmentSettings,
stats,
balancerReferenceTimestamp
balancerReferenceTimestamp,
strategyFactory
);
}
@ -321,5 +330,11 @@ public class DruidMasterRuntimeParams
this.balancerReferenceTimestamp = balancerReferenceTimestamp;
return this;
}
public Builder withBalancerStrategyFactory(BalancerStrategyFactory strategyFactory)
{
this.strategyFactory=strategyFactory;
return this;
}
}
}

View File

@ -28,19 +28,22 @@ public class MasterSegmentSettings
private long mergeBytesLimit= 100000000L;
private int mergeSegmentsLimit = Integer.MAX_VALUE;
private int maxSegmentsToMove = 5;
private boolean emitBalancingStats = false;
@JsonCreator
public MasterSegmentSettings(
@JsonProperty("millisToWaitBeforeDeleting") Long millisToWaitBeforeDeleting,
@JsonProperty("mergeBytesLimit") Long mergeBytesLimit,
@JsonProperty("mergeSegmentsLimit") Integer mergeSegmentsLimit,
@JsonProperty("maxSegmentsToMove") Integer maxSegmentsToMove
@JsonProperty("maxSegmentsToMove") Integer maxSegmentsToMove,
@JsonProperty("emitBalancingStats") Boolean emitBalancingStats
)
{
this.maxSegmentsToMove=maxSegmentsToMove;
this.millisToWaitBeforeDeleting=millisToWaitBeforeDeleting;
this.mergeSegmentsLimit=mergeSegmentsLimit;
this.mergeBytesLimit=mergeBytesLimit;
this.emitBalancingStats = emitBalancingStats;
}
public static String getConfigKey()
@ -60,6 +63,12 @@ public class MasterSegmentSettings
return mergeBytesLimit;
}
@JsonProperty
public boolean isEmitBalancingStats()
{
return emitBalancingStats;
}
@JsonProperty
public int getMergeSegmentsLimit()
{
@ -80,6 +89,7 @@ public class MasterSegmentSettings
private long mergeBytesLimit;
private int mergeSegmentsLimit;
private int maxSegmentsToMove;
private boolean emitBalancingStats;
public Builder()
{
@ -87,14 +97,16 @@ public class MasterSegmentSettings
this.mergeBytesLimit= 100000000L;
this.mergeSegmentsLimit= Integer.MAX_VALUE;
this.maxSegmentsToMove = 5;
this.emitBalancingStats = false;
}
public Builder(long millisToWaitBeforeDeleting, long mergeBytesLimit, int mergeSegmentsLimit, int maxSegmentsToMove)
public Builder(long millisToWaitBeforeDeleting, long mergeBytesLimit, int mergeSegmentsLimit, int maxSegmentsToMove, boolean emitBalancingStats)
{
this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting;
this.mergeBytesLimit = mergeBytesLimit;
this.mergeSegmentsLimit = mergeSegmentsLimit;
this.maxSegmentsToMove = maxSegmentsToMove;
this.emitBalancingStats = emitBalancingStats;
}
public Builder withMillisToWaitBeforeDeleting(long millisToWaitBeforeDeleting)
@ -123,7 +135,7 @@ public class MasterSegmentSettings
public MasterSegmentSettings build()
{
return new MasterSegmentSettings(millisToWaitBeforeDeleting,mergeBytesLimit,mergeSegmentsLimit,maxSegmentsToMove);
return new MasterSegmentSettings(millisToWaitBeforeDeleting,mergeBytesLimit,mergeSegmentsLimit,maxSegmentsToMove, emitBalancingStats);
}
}
}

View File

@ -0,0 +1,71 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.master;
import com.metamx.druid.client.DataSegment;
import java.util.List;
import java.util.Random;
/**
 * A {@link BalancerStrategy} that places replicas on randomly chosen servers and never proposes
 * balancing moves of its own.
 */
public class RandomBalancerStrategy implements BalancerStrategy
{
  private final ReservoirSegmentSampler sampler = new ReservoirSegmentSampler();

  /** Single shared generator, so a new one is not allocated for every draw. */
  private final Random random = new Random();

  /**
   * Picks, uniformly at random, a server that is not already serving the segment.
   *
   * The selection is a single pass over the list: the k-th eligible server replaces the current
   * choice with probability 1/k, which yields a uniform pick over all eligible servers. Unlike a
   * retry-until-success loop this always terminates, even when every server already serves the
   * segment.
   *
   * @param proposalSegment the segment that needs another replica
   * @param serverHolders   the candidate servers
   *
   * @return a randomly chosen server not serving the segment, or null when the list holds fewer
   *         than two servers or every server already serves the segment
   */
  @Override
  public ServerHolder findNewSegmentHomeReplicator(
      DataSegment proposalSegment, List<ServerHolder> serverHolders
  )
  {
    // A single-server list has always produced "no target"; an empty list now does too
    // instead of failing inside Random.nextInt(0).
    if (serverHolders.size() <= 1) {
      return null;
    }

    ServerHolder chosen = null;
    int eligibleSoFar = 0;
    for (ServerHolder candidate : serverHolders) {
      if (candidate.isServingSegment(proposalSegment)) {
        continue;
      }
      eligibleSoFar++;
      // w.p. 1 / eligibleSoFar, swap in this candidate
      if (random.nextInt(eligibleSoFar) == 0) {
        chosen = candidate;
      }
    }
    return chosen;
  }

  /**
   * This strategy does not relocate segments during balancing.
   *
   * @param proposalSegment ignored
   * @param serverHolders   ignored
   *
   * @return always null
   */
  @Override
  public ServerHolder findNewSegmentHomeBalancer(
      DataSegment proposalSegment, List<ServerHolder> serverHolders
  )
  {
    return null;
  }

  /**
   * Delegates to {@link ReservoirSegmentSampler#getRandomBalancerSegmentHolder(List)}.
   *
   * @param serverHolders the servers whose segments may be picked
   *
   * @return whatever the sampler returns for the given servers
   */
  @Override
  public BalancerSegmentHolder pickSegmentToMove(List<ServerHolder> serverHolders)
  {
    return sampler.getRandomBalancerSegmentHolder(serverHolders);
  }

  /**
   * Intentionally a no-op: this strategy records no statistics.
   */
  @Override
  public void emitStats(
      String tier, MasterStats stats, List<ServerHolder> serverHolderList
  )
  {
  }
}

View File

@ -0,0 +1,30 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.master;
import org.joda.time.DateTime;
/**
 * {@link BalancerStrategyFactory} that produces {@link RandomBalancerStrategy} instances.
 */
public class RandomBalancerStrategyFactory implements BalancerStrategyFactory
{
  /**
   * Builds a new random strategy.
   *
   * @param referenceTimestamp not used by this factory
   *
   * @return a freshly constructed {@link RandomBalancerStrategy}
   */
  @Override
  public BalancerStrategy createBalancerStrategy(final DateTime referenceTimestamp)
  {
    return new RandomBalancerStrategy();
  }
}

View File

@ -0,0 +1,54 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.master;
import com.metamx.druid.client.DataSegment;
import java.util.List;
import java.util.Random;
/**
 * Draws one segment at random from all segments held by a list of servers, using a single pass
 * with a reservoir of size one.
 */
public class ReservoirSegmentSampler
{
  /**
   * Walks every segment of every server once, keeping a single current pick.
   *
   * @param serverHolders the servers whose segments are sampled
   *
   * @return a holder pairing the picked segment with the server it was found on, or null when
   *         no segment was encountered
   */
  public BalancerSegmentHolder getRandomBalancerSegmentHolder(final List<ServerHolder> serverHolders)
  {
    final Random random = new Random();
    ServerHolder pickedHolder = null;
    DataSegment pickedSegment = null;
    int seen = 0;

    for (ServerHolder candidate : serverHolders) {
      for (DataSegment segment : candidate.getServer().getSegments().values()) {
        // The draw equals `seen` with probability 1 / (seen + 1); only then is the pick replaced.
        final boolean replacePick = random.nextInt(seen + 1) == seen;
        seen++;
        if (replacePick) {
          pickedHolder = candidate;
          pickedSegment = segment;
        }
      }
    }

    if (pickedHolder == null) {
      return null;
    }
    return new BalancerSegmentHolder(pickedHolder.getServer(), pickedSegment);
  }
}

View File

@ -22,7 +22,7 @@ package com.metamx.druid.master.rules;
import com.google.common.collect.Lists;
import com.google.common.collect.MinMaxPriorityQueue;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.master.BalancerCostAnalyzer;
import com.metamx.druid.master.BalancerStrategy;
import com.metamx.druid.master.DruidMaster;
import com.metamx.druid.master.DruidMasterRuntimeParams;
import com.metamx.druid.master.LoadPeonCallback;
@ -60,15 +60,14 @@ public abstract class LoadRule implements Rule
final List<ServerHolder> serverHolderList = new ArrayList<ServerHolder>(serverQueue);
final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp();
final BalancerCostAnalyzer analyzer = params.getBalancerCostAnalyzer(referenceTimestamp);
final BalancerStrategy strategy = params.getBalancerStrategyFactory().createBalancerStrategy(referenceTimestamp);
if (params.getAvailableSegments().contains(segment)) {
stats.accumulate(
assign(
params.getReplicationManager(),
expectedReplicants,
totalReplicants,
analyzer,
strategy,
serverHolderList,
segment
)
@ -84,7 +83,7 @@ public abstract class LoadRule implements Rule
final ReplicationThrottler replicationManager,
final int expectedReplicants,
int totalReplicants,
final BalancerCostAnalyzer analyzer,
final BalancerStrategy strategy,
final List<ServerHolder> serverHolderList,
final DataSegment segment
)
@ -98,7 +97,7 @@ public abstract class LoadRule implements Rule
break;
}
final ServerHolder holder = analyzer.findNewSegmentHomeAssign(segment, serverHolderList);
final ServerHolder holder = strategy.findNewSegmentHomeReplicator(segment, serverHolderList);
if (holder == null) {
log.warn(

View File

@ -4,6 +4,7 @@ $(function () {
document.getElementById("mergeBytes").value = data["mergeBytesLimit"];
document.getElementById("mergeSegments").value = data["mergeSegmentsLimit"];
document.getElementById("maxSegments").value = data["maxSegmentsToMove"];
document.getElementById("emitBalancingStats").value = data["emitBalancingStats"];
});
$("#submit").click( function ()

View File

@ -13,6 +13,7 @@
<br>
maxSegmentsToMove: <input type= "text" name ="maxSegmentsToMove" id ="maxSegments">
<br>
emitBalancingStats: <input type= "text" name ="emitBalancingStats" id="emitBalancingStats">
<button type="button" id="submit"> Submit </button>
</form>
</body>

View File

@ -132,6 +132,78 @@ public class DruidMasterBalancerTest
EasyMock.verify(druidServer4);
}
/**
 * Runs the balancer on a single "normal" tier made of two servers: "from", which reports
 * 30 of 100 bytes used and serves the shared {@code segments} map, and "to", which reports
 * 0 of 100 bytes used and serves an empty segment map.
 *
 * Passes when the "movedCount" statistic for the "normal" tier is greater than zero and
 * smaller than {@code segments.size()}.
 */
@Test
public void testMoveToEmptyServerBalancer()
{
// Source server: partially used and serving the shared segments map
EasyMock.expect(druidServer1.getName()).andReturn("from").atLeastOnce();
EasyMock.expect(druidServer1.getCurrSize()).andReturn(30L).atLeastOnce();
EasyMock.expect(druidServer1.getMaxSize()).andReturn(100L).atLeastOnce();
EasyMock.expect(druidServer1.getSegments()).andReturn(segments).anyTimes();
EasyMock.expect(druidServer1.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
EasyMock.replay(druidServer1);
// Destination server: reports zero usage and serves no segments
EasyMock.expect(druidServer2.getName()).andReturn("to").atLeastOnce();
EasyMock.expect(druidServer2.getTier()).andReturn("normal").anyTimes();
EasyMock.expect(druidServer2.getCurrSize()).andReturn(0L).atLeastOnce();
EasyMock.expect(druidServer2.getMaxSize()).andReturn(100L).atLeastOnce();
EasyMock.expect(druidServer2.getSegments()).andReturn(new HashMap<String, DataSegment>()).anyTimes();
EasyMock.expect(druidServer2.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
EasyMock.replay(druidServer2);
// Servers 3 and 4 are not part of the cluster built below; no expectations are recorded on them in this method
EasyMock.replay(druidServer3);
EasyMock.replay(druidServer4);
// Mock stuff that the master needs
// moveSegment may be called any number of times with any arguments
master.moveSegment(
EasyMock.<String>anyObject(),
EasyMock.<String>anyObject(),
EasyMock.<String>anyObject(),
EasyMock.<LoadPeonCallback>anyObject()
);
EasyMock.expectLastCall().anyTimes();
EasyMock.replay(master);
LoadQueuePeonTester fromPeon = new LoadQueuePeonTester();
LoadQueuePeonTester toPeon = new LoadQueuePeonTester();
// Single-tier cluster holding only the two servers configured above; the peon map keys match the server names
DruidMasterRuntimeParams params =
DruidMasterRuntimeParams.newBuilder()
.withDruidCluster(
new DruidCluster(
ImmutableMap.<String, MinMaxPriorityQueue<ServerHolder>>of(
"normal",
MinMaxPriorityQueue.orderedBy(DruidMasterBalancerTester.percentUsedComparator)
.create(
Arrays.asList(
new ServerHolder(druidServer1, fromPeon),
new ServerHolder(druidServer2, toPeon)
)
)
)
)
)
.withLoadManagementPeons(
ImmutableMap.<String, LoadQueuePeon>of(
"from",
fromPeon,
"to",
toPeon
)
)
.withAvailableSegments(segments.values())
.withMasterSegmentSettings(new MasterSegmentSettings.Builder().withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE).build())
.withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.build();
params = new DruidMasterBalancerTester(master).run(params);
// At least one segment must have moved, but not every segment in the shared map
Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() > 0);
Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() < segments.size());
}
@Test
public void testRun1()
{
@ -203,7 +275,8 @@ public class DruidMasterBalancerTest
Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() > 0);
}
@Test
@Test
public void testRun2()
{
// Mock some servers of different usages
@ -295,4 +368,5 @@ public class DruidMasterBalancerTest
params = new DruidMasterBalancerTester(master).run(params);
Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() > 0);
}
}

View File

@ -36,7 +36,6 @@ import org.junit.Before;
import org.junit.Test;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
/**
*/
@ -95,24 +94,12 @@ public class DruidMasterTest
return null;
}
@Override
public long getMillisToWaitBeforeDeleting()
{
return super.getMillisToWaitBeforeDeleting();
}
@Override
public String getMergerServiceName()
{
return "";
}
@Override
public int getMaxSegmentsToMove()
{
return 0;
}
@Override
public int getReplicantLifetime()
{

View File

@ -0,0 +1,208 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.master;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidServer;
import com.metamx.druid.shard.NoneShardSpec;
import junit.framework.Assert;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
import java.util.Map;
/**
 * Tests {@link ReservoirSegmentSampler} against four mocked servers that each serve
 * exactly one real {@link DataSegment}.
 */
public class ReservoirSegmentSamplerTest
{
  private DruidServer druidServer1;
  private DruidServer druidServer2;
  private DruidServer druidServer3;
  private DruidServer druidServer4;

  private ServerHolder holder1;
  private ServerHolder holder2;
  private ServerHolder holder3;
  private ServerHolder holder4;

  private DataSegment segment1;
  private DataSegment segment2;
  private DataSegment segment3;
  private DataSegment segment4;

  Map<String, DataSegment> segmentsMap1;
  Map<String, DataSegment> segmentsMap2;
  Map<String, DataSegment> segmentsMap3;
  Map<String, DataSegment> segmentsMap4;
  List<DataSegment> segments;

  /**
   * Creates fresh server and holder mocks plus four real segments, and builds one
   * single-entry segment map per server.
   */
  @Before
  public void setUp() throws Exception
  {
    druidServer1 = EasyMock.createMock(DruidServer.class);
    druidServer2 = EasyMock.createMock(DruidServer.class);
    druidServer3 = EasyMock.createMock(DruidServer.class);
    druidServer4 = EasyMock.createMock(DruidServer.class);

    holder1 = EasyMock.createMock(ServerHolder.class);
    holder2 = EasyMock.createMock(ServerHolder.class);
    holder3 = EasyMock.createMock(ServerHolder.class);
    holder4 = EasyMock.createMock(ServerHolder.class);

    DateTime start1 = new DateTime("2012-01-01");
    DateTime start2 = new DateTime("2012-02-01");
    DateTime version = new DateTime("2012-03-01");

    // Real segments are used (not mocks) so that they work as HashMap keys in the test below
    segment1 = createSegment("datasource1", start1, version, 11L);
    segment2 = createSegment("datasource1", start2, version, 7L);
    segment3 = createSegment("datasource2", start1, version, 4L);
    segment4 = createSegment("datasource2", start2, version, 8L);

    segments = Lists.newArrayList(segment1, segment2, segment3, segment4);

    segmentsMap1 = ImmutableMap.<String, DataSegment>of(
        "datasource1_2012-01-01T00:00:00.000Z_2012-01-01T01:00:00.000Z_2012-03-01T00:00:00.000Z",
        segment1
    );
    segmentsMap2 = ImmutableMap.<String, DataSegment>of(
        "datasource1_2012-02-01T00:00:00.000Z_2012-02-01T01:00:00.000Z_2012-03-01T00:00:00.000Z",
        segment2
    );
    segmentsMap3 = ImmutableMap.<String, DataSegment>of(
        "datasource2_2012-01-01T00:00:00.000Z_2012-01-01T01:00:00.000Z_2012-03-01T00:00:00.000Z",
        segment3
    );
    segmentsMap4 = ImmutableMap.<String, DataSegment>of(
        "datasource2_2012-02-01T00:00:00.000Z_2012-02-01T01:00:00.000Z_2012-03-01T00:00:00.000Z",
        segment4
    );
  }

  /**
   * Checks that every segment is selected at least once out of 5000 trials.
   */
  @Test
  public void getRandomBalancerSegmentHolderTest()
  {
    expectServer(druidServer1, "1", segmentsMap1);
    expectServer(druidServer2, "2", segmentsMap2);
    expectServer(druidServer3, "3", segmentsMap3);
    expectServer(druidServer4, "4", segmentsMap4);

    expectHolder(holder1, druidServer1);
    expectHolder(holder2, druidServer2);
    expectHolder(holder3, druidServer3);
    expectHolder(holder4, druidServer4);

    List<ServerHolder> holderList = Lists.newArrayList();
    holderList.add(holder1);
    holderList.add(holder2);
    holderList.add(holder3);
    holderList.add(holder4);

    // Records which segments were returned at least once; the value is only a marker
    Map<DataSegment, Integer> segmentCountMap = Maps.newHashMap();
    ReservoirSegmentSampler sampler = new ReservoirSegmentSampler();
    for (int i = 0; i < 5000; i++) {
      segmentCountMap.put(sampler.getRandomBalancerSegmentHolder(holderList).getSegment(), 1);
    }

    for (DataSegment segment : segments) {
      // expected value first, so a never-sampled segment reports "expected:<1> but was:<null>"
      Assert.assertEquals(Integer.valueOf(1), segmentCountMap.get(segment));
    }
  }

  /**
   * Builds a one-hour segment starting at {@code start} with empty load spec, dimensions and metrics.
   */
  private static DataSegment createSegment(String dataSource, DateTime start, DateTime version, long size)
  {
    return new DataSegment(
        dataSource,
        new Interval(start, start.plusHours(1)),
        version.toString(),
        Maps.<String, Object>newHashMap(),
        Lists.<String>newArrayList(),
        Lists.<String>newArrayList(),
        new NoneShardSpec(),
        0,
        size
    );
  }

  /**
   * Records the expectations for one server mock serving {@code serverSegments} and switches it to replay mode.
   * The mocks are never verified in this class, so the expectations only define which calls are allowed.
   */
  private static void expectServer(DruidServer server, String name, Map<String, DataSegment> serverSegments)
  {
    EasyMock.expect(server.getName()).andReturn(name).atLeastOnce();
    EasyMock.expect(server.getTier()).andReturn("normal").anyTimes();
    EasyMock.expect(server.getCurrSize()).andReturn(30L).atLeastOnce();
    EasyMock.expect(server.getMaxSize()).andReturn(100L).atLeastOnce();
    EasyMock.expect(server.getSegments()).andReturn(serverSegments).anyTimes();
    EasyMock.expect(server.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
    EasyMock.replay(server);
  }

  /**
   * Makes the holder mock return {@code server} from {@code getServer()} and switches it to replay mode.
   */
  private static void expectHolder(ServerHolder holder, DruidServer server)
  {
    EasyMock.expect(holder.getServer()).andReturn(server).anyTimes();
    EasyMock.replay(holder);
  }
}

View File

@ -0,0 +1,249 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.utils;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidServer;
import com.metamx.druid.db.DatabaseRuleManager;
import com.metamx.druid.master.DruidCluster;
import com.metamx.druid.master.DruidMaster;
import com.metamx.druid.master.DruidMasterBalancerTester;
import com.metamx.druid.master.DruidMasterRuleRunner;
import com.metamx.druid.master.DruidMasterRuntimeParams;
import com.metamx.druid.master.LoadPeonCallback;
import com.metamx.druid.master.LoadQueuePeon;
import com.metamx.druid.master.LoadQueuePeonTester;
import com.metamx.druid.master.MasterSegmentSettings;
import com.metamx.druid.master.ReplicationThrottler;
import com.metamx.druid.master.SegmentReplicantLookup;
import com.metamx.druid.master.ServerHolder;
import com.metamx.druid.master.rules.PeriodLoadRule;
import com.metamx.druid.master.rules.Rule;
import com.metamx.druid.shard.NoneShardSpec;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Before;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Manual timing harness for the master's balancing and rule-running code paths.
 *
 * Neither profiling method carries {@code @Test}; each one prints the elapsed time
 * measured by a {@link Stopwatch} to standard output. Both rely on the mocks created
 * in {@link #setUp()}.
 */
public class DruidMasterBalancerProfiler
{
  private static final int MAX_SEGMENTS_TO_MOVE = 5;
  private DruidMaster master;
  private DruidServer druidServer1;
  private DruidServer druidServer2;
  Map<String, DataSegment> segments = Maps.newHashMap();
  ServiceEmitter emitter;
  DatabaseRuleManager manager;
  PeriodLoadRule loadRule = new PeriodLoadRule(new Period("P5000Y"),3,"normal");
  List<Rule> rules = ImmutableList.<Rule>of(loadRule);

  /**
   * Creates the mocks shared by both profiling methods and registers the mock emitter
   * with {@link EmittingLogger}.
   */
  @Before
  public void setUp() throws Exception
  {
    master = EasyMock.createMock(DruidMaster.class);
    druidServer1 = EasyMock.createMock(DruidServer.class);
    druidServer2 = EasyMock.createMock(DruidServer.class);
    emitter = EasyMock.createMock(ServiceEmitter.class);
    EmittingLogger.registerEmitter(emitter);
    manager = EasyMock.createMock(DatabaseRuleManager.class);
  }

  /**
   * Times one balancer run followed by one rule-runner run on a cluster of 50 servers
   * where server "0" serves all 55000 segments and every other server serves none.
   */
  public void bigProfiler()
  {
    Stopwatch watch = new Stopwatch();
    int numSegments = 55000;
    int numServers = 50;

    EasyMock.expect(manager.getAllRules()).andReturn(ImmutableMap.<String, List<Rule>>of("test", rules)).anyTimes();
    EasyMock.expect(manager.getRules(EasyMock.<String>anyObject())).andReturn(rules).anyTimes();
    EasyMock.expect(manager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(rules).anyTimes();
    EasyMock.replay(manager);

    allowAnySegmentMoves();

    Map<String, LoadQueuePeon> peonMap = Maps.newHashMap();
    List<ServerHolder> serverHolderList = Lists.newArrayList();
    Map<String, DataSegment> segmentMap = Maps.newHashMap();

    // Every segment shares the same interval and version, so build them once instead of per iteration
    final DateTime segmentStart = new DateTime("2012-01-01");
    final Interval segmentInterval = new Interval(segmentStart, segmentStart.plusHours(1));
    final String segmentVersion = (new DateTime("2012-03-01")).toString();
    for (int i = 0; i < numSegments; i++) {
      segmentMap.put(
          "segment" + i,
          new DataSegment(
              "datasource" + i,
              segmentInterval,
              segmentVersion,
              Maps.<String, Object>newHashMap(),
              Lists.<String>newArrayList(),
              Lists.<String>newArrayList(),
              new NoneShardSpec(),
              0,
              4L
          )
      );
    }

    for (int i = 0; i < numServers; i++) {
      DruidServer server = EasyMock.createMock(DruidServer.class);
      EasyMock.expect(server.getMetadata()).andReturn(null).anyTimes();
      EasyMock.expect(server.getCurrSize()).andReturn(30L).atLeastOnce();
      EasyMock.expect(server.getMaxSize()).andReturn(100L).atLeastOnce();
      EasyMock.expect(server.getTier()).andReturn("normal").anyTimes();
      EasyMock.expect(server.getName()).andReturn(Integer.toString(i)).atLeastOnce();
      EasyMock.expect(server.getHost()).andReturn(Integer.toString(i)).anyTimes();
      if (i == 0) {
        // the first server serves every segment
        EasyMock.expect(server.getSegments()).andReturn(segmentMap).anyTimes();
      } else {
        EasyMock.expect(server.getSegments()).andReturn(new HashMap<String, DataSegment>()).anyTimes();
      }
      EasyMock.expect(server.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
      EasyMock.replay(server);

      LoadQueuePeon peon = new LoadQueuePeonTester();
      peonMap.put(Integer.toString(i), peon);
      serverHolderList.add(new ServerHolder(server, peon));
    }

    // The cluster handed to the params and the one handed to the replicant lookup are separate instances
    DruidMasterRuntimeParams params =
        DruidMasterRuntimeParams.newBuilder()
                                .withDruidCluster(singleTierCluster(serverHolderList))
                                .withLoadManagementPeons(peonMap)
                                .withAvailableSegments(segmentMap.values())
                                .withMasterSegmentSettings(new MasterSegmentSettings.Builder().withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE).build())
                                .withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
                                .withEmitter(emitter)
                                .withDatabaseRuleManager(manager)
                                .withReplicationManager(new ReplicationThrottler(2, 500))
                                .withSegmentReplicantLookup(
                                    SegmentReplicantLookup.make(singleTierCluster(serverHolderList))
                                )
                                .build();

    DruidMasterBalancerTester tester = new DruidMasterBalancerTester(master);
    DruidMasterRuleRunner runner = new DruidMasterRuleRunner(master,500,5);
    // Only the two runs are timed; all of the fixture construction above is excluded
    watch.start();
    tester.run(params);
    runner.run(params);
    System.out.println(watch.stop());
  }

  /**
   * Times a single balancer run on a two-server cluster: "from" serves the
   * {@code segments} field and "to" serves an empty segment map.
   */
  public void profileRun()
  {
    Stopwatch watch = new Stopwatch();
    LoadQueuePeonTester fromPeon = new LoadQueuePeonTester();
    LoadQueuePeonTester toPeon = new LoadQueuePeonTester();

    EasyMock.expect(druidServer1.getName()).andReturn("from").atLeastOnce();
    EasyMock.expect(druidServer1.getCurrSize()).andReturn(30L).atLeastOnce();
    EasyMock.expect(druidServer1.getMaxSize()).andReturn(100L).atLeastOnce();
    EasyMock.expect(druidServer1.getSegments()).andReturn(segments).anyTimes();
    EasyMock.expect(druidServer1.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
    EasyMock.replay(druidServer1);

    EasyMock.expect(druidServer2.getName()).andReturn("to").atLeastOnce();
    EasyMock.expect(druidServer2.getTier()).andReturn("normal").anyTimes();
    EasyMock.expect(druidServer2.getCurrSize()).andReturn(0L).atLeastOnce();
    EasyMock.expect(druidServer2.getMaxSize()).andReturn(100L).atLeastOnce();
    EasyMock.expect(druidServer2.getSegments()).andReturn(new HashMap<String, DataSegment>()).anyTimes();
    EasyMock.expect(druidServer2.getSegment(EasyMock.<String>anyObject())).andReturn(null).anyTimes();
    EasyMock.replay(druidServer2);

    allowAnySegmentMoves();

    DruidMasterRuntimeParams params =
        DruidMasterRuntimeParams.newBuilder()
                                .withDruidCluster(
                                    singleTierCluster(
                                        Arrays.asList(
                                            new ServerHolder(druidServer1, fromPeon),
                                            new ServerHolder(druidServer2, toPeon)
                                        )
                                    )
                                )
                                .withLoadManagementPeons(ImmutableMap.<String, LoadQueuePeon>of("from", fromPeon, "to", toPeon))
                                .withAvailableSegments(segments.values())
                                .withMasterSegmentSettings(new MasterSegmentSettings.Builder().withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE).build())
                                .withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
                                .build();

    DruidMasterBalancerTester tester = new DruidMasterBalancerTester(master);
    watch.start();
    tester.run(params);
    System.out.println(watch.stop());
  }

  /**
   * Lets the mocked master accept any number of {@code moveSegment} calls with any
   * arguments, then switches it to replay mode.
   */
  private void allowAnySegmentMoves()
  {
    master.moveSegment(
        EasyMock.<String>anyObject(),
        EasyMock.<String>anyObject(),
        EasyMock.<String>anyObject(),
        EasyMock.<LoadPeonCallback>anyObject()
    );
    EasyMock.expectLastCall().anyTimes();
    EasyMock.replay(master);
  }

  /**
   * Builds a new cluster whose only tier, "normal", holds {@code holders} in a queue
   * ordered by {@code DruidMasterBalancerTester.percentUsedComparator}.
   */
  private static DruidCluster singleTierCluster(List<ServerHolder> holders)
  {
    return new DruidCluster(
        ImmutableMap.<String, MinMaxPriorityQueue<ServerHolder>>of(
            "normal",
            MinMaxPriorityQueue.orderedBy(DruidMasterBalancerTester.percentUsedComparator)
                               .create(holders)
        )
    );
  }
}