diff --git a/server/src/main/java/com/metamx/druid/master/BalancerStrategy.java b/server/src/main/java/com/metamx/druid/master/BalancerStrategy.java new file mode 100644 index 00000000000..84944114b49 --- /dev/null +++ b/server/src/main/java/com/metamx/druid/master/BalancerStrategy.java @@ -0,0 +1,35 @@ +/* +* Druid - a distributed column store. +* Copyright (C) 2012 Metamarkets Group Inc. +* +* This program is free software; you can redistribute it and/or +* modify it under the terms of the GNU General Public License +* as published by the Free Software Foundation; either version 2 +* of the License, or (at your option) any later version. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with this program; if not, write to the Free Software +* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 
+*/
+
+package com.metamx.druid.master;
+
+import com.metamx.druid.client.DataSegment;
+
+import java.util.List;
+
+/**
+ * Pluggable policy the master uses to choose which segment to move and which server should
+ * receive a segment. Instances are obtained from a {@link BalancerStrategyFactory}.
+ */
+public interface BalancerStrategy
+{
+  /**
+   * Chooses the server a segment should be moved to during balancing.
+   *
+   * @param proposalSegment the segment proposed for a move
+   * @param serverHolders   the candidate servers
+   *
+   * @return the destination server, or null when no move should be made (callers check for null)
+   */
+  public ServerHolder findNewSegmentHomeBalancer(final DataSegment proposalSegment, final List<ServerHolder> serverHolders);
+
+  /**
+   * Chooses the server that should load a new copy of a segment.
+   *
+   * @param proposalSegment the segment to place
+   * @param serverHolders   the candidate servers
+   *
+   * @return a server to load the segment on, or null when none is suitable
+   */
+  public ServerHolder findNewSegmentHomeReplicator(final DataSegment proposalSegment, final List<ServerHolder> serverHolders);
+
+  /**
+   * Picks the segment (and the server currently holding it) to consider moving.
+   *
+   * @param serverHolders the servers whose segments may be picked
+   *
+   * @return the picked segment, or null when nothing can be picked (callers check for null)
+   */
+  public BalancerSegmentHolder pickSegmentToMove(final List<ServerHolder> serverHolders);
+
+  /**
+   * Records strategy-specific statistics for one tier; implementations may do nothing.
+   *
+   * @param tier             name of the tier the statistics belong to
+   * @param stats            accumulator the statistics are added to
+   * @param serverHolderList the servers of that tier
+   */
+  public void emitStats(String tier, MasterStats stats, List<ServerHolder> serverHolderList);
+}
diff --git a/server/src/main/java/com/metamx/druid/master/BalancerStrategyFactory.java b/server/src/main/java/com/metamx/druid/master/BalancerStrategyFactory.java new file mode 100644 index 00000000000..c215c1b39a9 --- /dev/null +++ b/server/src/main/java/com/metamx/druid/master/BalancerStrategyFactory.java @@ -0,0 +1,26 @@ +/* +* Druid - a distributed column store. +* Copyright (C) 2012 Metamarkets Group Inc. +* +* This program is free software; you can redistribute it and/or +* modify it under the terms of the GNU General Public License +* as published by the Free Software Foundation; either version 2 +* of the License, or (at your option) any later version. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with this program; if not, write to the Free Software +* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+*/
+package com.metamx.druid.master;
+
+import org.joda.time.DateTime;
+
+/**
+ * Creates the {@link BalancerStrategy} used for one balancing or rule run.
+ */
+public interface BalancerStrategyFactory
+{
+  /**
+   * Builds a strategy instance.
+   *
+   * @param referenceTimestamp timestamp handed to the strategy; an implementation may ignore it
+   *
+   * @return the strategy to use
+   */
+  public BalancerStrategy createBalancerStrategy(DateTime referenceTimestamp);
+}
diff --git a/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java b/server/src/main/java/com/metamx/druid/master/CostBalancerStrategy.java similarity index 51% rename from server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java rename to server/src/main/java/com/metamx/druid/master/CostBalancerStrategy.java index 9a22daf2c13..47f7fb19436 100644 --- a/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java +++ b/server/src/main/java/com/metamx/druid/master/CostBalancerStrategy.java @@ -1,95 +1,132 @@ /* - * Druid - a distributed column store. - * Copyright (C) 2012 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ +* Druid - a distributed column store. +* Copyright (C) 2012 Metamarkets Group Inc. +* +* This program is free software; you can redistribute it and/or +* modify it under the terms of the GNU General Public License +* as published by the Free Software Foundation; either version 2 +* of the License, or (at your option) any later version.
+* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with this program; if not, write to the Free Software +* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +*/ package com.metamx.druid.master; import com.google.common.collect.MinMaxPriorityQueue; import com.metamx.common.Pair; -import com.metamx.common.logger.Logger; import com.metamx.druid.client.DataSegment; +import com.metamx.emitter.EmittingLogger; import org.joda.time.DateTime; import org.joda.time.Interval; import java.util.Comparator; import java.util.List; -import java.util.Random; -/** - * The BalancerCostAnalyzer will compute the total initial cost of the cluster, with costs defined in - * computeJointSegmentCosts. It will then propose to move (pseudo-)randomly chosen segments from their - * respective initial servers to other servers, chosen greedily to minimize the cost of the cluster. - */ -public class BalancerCostAnalyzer +public class CostBalancerStrategy implements BalancerStrategy { - private static final Logger log = new Logger(BalancerCostAnalyzer.class); + private static final EmittingLogger log = new EmittingLogger(CostBalancerStrategy.class); private static final int DAY_IN_MILLIS = 1000 * 60 * 60 * 24; private static final int SEVEN_DAYS_IN_MILLIS = 7 * DAY_IN_MILLIS; private static final int THIRTY_DAYS_IN_MILLIS = 30 * DAY_IN_MILLIS; - private final Random rand; private final DateTime referenceTimestamp; - public BalancerCostAnalyzer(DateTime referenceTimestamp) + public CostBalancerStrategy(DateTime referenceTimestamp) { this.referenceTimestamp = referenceTimestamp; - rand = new Random(0); } - /** - * Calculates the cost normalization. 
This is such that the normalized cost is lower bounded - * by 1 (e.g. when each segment gets its own compute node). - * - * @param serverHolders A list of ServerHolders for a particular tier. - * - * @return The normalization value (the sum of the diagonal entries in the - * pairwise cost matrix). This is the cost of a cluster if each - * segment were to get its own compute node. - */ - public double calculateNormalization(final List serverHolders) + @Override + public ServerHolder findNewSegmentHomeReplicator( + DataSegment proposalSegment, List serverHolders + ) { - double cost = 0; - for (ServerHolder server : serverHolders) { - for (DataSegment segment : server.getServer().getSegments().values()) { - cost += computeJointSegmentCosts(segment, segment); - } + ServerHolder holder= chooseBestServer(proposalSegment, serverHolders, false).rhs; + if (holder!=null && !holder.isServingSegment(proposalSegment)) + { + return holder; } - return cost; + return null; } - /** - * Calculates the initial cost of the Druid segment configuration. - * - * @param serverHolders A list of ServerHolders for a particular tier. - * - * @return The initial cost of the Druid tier. - */ - public double calculateInitialTotalCost(final List serverHolders) + + @Override + public ServerHolder findNewSegmentHomeBalancer( + DataSegment proposalSegment, List serverHolders + ) { - double cost = 0; + return chooseBestServer(proposalSegment, serverHolders, true).rhs; + } + + + + /** + * For assignment, we want to move to the lowest cost server that isn't already serving the segment. + * + * @param proposalSegment A DataSegment that we are proposing to move. + * @param serverHolders An iterable of ServerHolders for a particular tier. + * + * @return A ServerHolder with the new home for a segment. 
+ */ + + private Pair chooseBestServer( + final DataSegment proposalSegment, + final Iterable serverHolders, + boolean includeCurrentServer + ) + { + + Pair bestServer = Pair.of(Double.POSITIVE_INFINITY, null); + MinMaxPriorityQueue> costsAndServers = MinMaxPriorityQueue.orderedBy( + new Comparator>() + { + @Override + public int compare( + Pair o, + Pair o1 + ) + { + return Double.compare(o.lhs, o1.lhs); + } + } + ).create(); + + final long proposalSegmentSize = proposalSegment.getSize(); + for (ServerHolder server : serverHolders) { - DataSegment[] segments = server.getServer().getSegments().values().toArray(new DataSegment[]{}); - for (int i = 0; i < segments.length; ++i) { - for (int j = i; j < segments.length; ++j) { - cost += computeJointSegmentCosts(segments[i], segments[j]); + if (includeCurrentServer || !server.isServingSegment(proposalSegment)) + { + /** Don't calculate cost if the server doesn't have enough space or is loading the segment */ + if (proposalSegmentSize > server.getAvailableSize() || server.isLoadingSegment(proposalSegment)) { + continue; + } + + /** The contribution to the total cost of a given server by proposing to move the segment to that server is... 
*/ + double cost = 0f; + /** the sum of the costs of other (exclusive of the proposalSegment) segments on the server */ + for (DataSegment segment : server.getServer().getSegments().values()) { + if (!proposalSegment.equals(segment)) { + cost += computeJointSegmentCosts(proposalSegment, segment); + } + } + /** plus the costs of segments that will be loaded */ + for (DataSegment segment : server.getPeon().getSegmentsToLoad()) { + cost += computeJointSegmentCosts(proposalSegment, segment); + } + + if (cost < bestServer.lhs) { + bestServer = Pair.of(cost, server); } } } - return cost; + + return bestServer; } /** @@ -122,8 +159,10 @@ public class BalancerCostAnalyzer referenceTimestamp.getMillis() - segment1.getInterval().getEndMillis(), referenceTimestamp.getMillis() - segment2.getInterval().getEndMillis() ); - if (maxDiff < SEVEN_DAYS_IN_MILLIS) { - recencyPenalty = 2 - maxDiff / SEVEN_DAYS_IN_MILLIS; + double segment1diff=referenceTimestamp.getMillis()-segment1.getInterval().getEndMillis(); + double segment2diff=referenceTimestamp.getMillis()-segment2.getInterval().getEndMillis(); + if (segment1diff < SEVEN_DAYS_IN_MILLIS && segment2diff serverHolders) + { + ReservoirSegmentSampler sampler = new ReservoirSegmentSampler(); + return sampler.getRandomBalancerSegmentHolder(serverHolders); + } + /** - * The balancing application requires us to pick a proposal segment uniformly at random from the set of - * all servers. We use reservoir sampling to do this. + * Calculates the initial cost of the Druid segment configuration. * * @param serverHolders A list of ServerHolders for a particular tier. * - * @return A BalancerSegmentHolder sampled uniformly at random. + * @return The initial cost of the Druid tier. 
*/ - public BalancerSegmentHolder pickSegmentToMove(final List serverHolders) + public double calculateInitialTotalCost(final List serverHolders) { - ServerHolder fromServerHolder = null; - DataSegment proposalSegment = null; - int numSoFar = 0; - + double cost = 0; for (ServerHolder server : serverHolders) { - for (DataSegment segment : server.getServer().getSegments().values()) { - int randNum = rand.nextInt(numSoFar + 1); - // w.p. 1 / (numSoFar + 1), swap out the server and segment - if (randNum == numSoFar) { - fromServerHolder = server; - proposalSegment = segment; - numSoFar++; + DataSegment[] segments = server.getServer().getSegments().values().toArray(new DataSegment[]{}); + for (int i = 0; i < segments.length; ++i) { + for (int j = i; j < segments.length; ++j) { + cost += computeJointSegmentCosts(segments[i], segments[j]); } } } - - return new BalancerSegmentHolder(fromServerHolder.getServer(), proposalSegment); + return cost; } /** - * For balancing, we want to only make a move if the minimum cost server is not already serving the segment. + * Calculates the cost normalization. This is such that the normalized cost is lower bounded + * by 1 (e.g. when each segment gets its own compute node). * - * @param proposalSegment A DataSegment that we are proposing to move. - * @param serverHolders An iterable of ServerHolders for a particular tier. + * @param serverHolders A list of ServerHolders for a particular tier. * - * @return A ServerHolder with the new home for a segment. + * @return The normalization value (the sum of the diagonal entries in the + * pairwise cost matrix). This is the cost of a cluster if each + * segment were to get its own compute node. 
*/ - public ServerHolder findNewSegmentHomeBalance( - final DataSegment proposalSegment, - final Iterable serverHolders - ) + public double calculateNormalization(final List serverHolders) { - MinMaxPriorityQueue> costsAndServers = computeCosts(proposalSegment, serverHolders); - if (costsAndServers.isEmpty()) { - return null; - } - - ServerHolder toServer = costsAndServers.pollFirst().rhs; - if (!toServer.isServingSegment(proposalSegment)) { - return toServer; - } - - return null; - } - - /** - * For assignment, we want to move to the lowest cost server that isn't already serving the segment. - * - * @param proposalSegment A DataSegment that we are proposing to move. - * @param serverHolders An iterable of ServerHolders for a particular tier. - * - * @return A ServerHolder with the new home for a segment. - */ - public ServerHolder findNewSegmentHomeAssign( - final DataSegment proposalSegment, - final Iterable serverHolders - ) - { - MinMaxPriorityQueue> costsAndServers = computeCosts(proposalSegment, serverHolders); - while (!costsAndServers.isEmpty()) { - ServerHolder toServer = costsAndServers.pollFirst().rhs; - if (!toServer.isServingSegment(proposalSegment)) { - return toServer; - } - } - - return null; - } - - private MinMaxPriorityQueue> computeCosts( - final DataSegment proposalSegment, - final Iterable serverHolders - ) - { - MinMaxPriorityQueue> costsAndServers = MinMaxPriorityQueue.orderedBy( - new Comparator>() - { - @Override - public int compare( - Pair o, - Pair o1 - ) - { - return Double.compare(o.lhs, o1.lhs); - } - } - ).create(); - - final long proposalSegmentSize = proposalSegment.getSize(); - + double cost = 0; for (ServerHolder server : serverHolders) { - /** Don't calculate cost if the server doesn't have enough space or is loading the segment */ - if (proposalSegmentSize > server.getAvailableSize() || server.isLoadingSegment(proposalSegment)) { - continue; - } - - /** The contribution to the total cost of a given server by proposing to move 
the segment to that server is... */ - double cost = 0f; - /** the sum of the costs of other (exclusive of the proposalSegment) segments on the server */ for (DataSegment segment : server.getServer().getSegments().values()) { - if (!proposalSegment.equals(segment)) { - cost += computeJointSegmentCosts(proposalSegment, segment); - } + cost += computeJointSegmentCosts(segment, segment); } - /** plus the costs of segments that will be loaded */ - for (DataSegment segment : server.getPeon().getSegmentsToLoad()) { - cost += computeJointSegmentCosts(proposalSegment, segment); - } - - costsAndServers.add(Pair.of(cost, server)); } - - return costsAndServers; + return cost; } -} + @Override + public void emitStats( + String tier, + MasterStats stats, List serverHolderList + ) + { + final double initialTotalCost = calculateInitialTotalCost(serverHolderList); + final double normalization = calculateNormalization(serverHolderList); + final double normalizedInitialCost = initialTotalCost / normalization; + + stats.addToTieredStat("initialCost", tier, (long) initialTotalCost); + stats.addToTieredStat("normalization", tier, (long) normalization); + stats.addToTieredStat("normalizedInitialCostTimesOneThousand", tier, (long) (normalizedInitialCost * 1000)); + + log.info( + "[%s]: Initial Total Cost: [%f], Normalization: [%f], Initial Normalized Cost: [%f]", + tier, + initialTotalCost, + normalization, + normalizedInitialCost + ); + + } + + +} \ No newline at end of file diff --git a/server/src/main/java/com/metamx/druid/master/CostBalancerStrategyFactory.java b/server/src/main/java/com/metamx/druid/master/CostBalancerStrategyFactory.java new file mode 100644 index 00000000000..76e315b1625 --- /dev/null +++ b/server/src/main/java/com/metamx/druid/master/CostBalancerStrategyFactory.java @@ -0,0 +1,31 @@ +/* +* Druid - a distributed column store. +* Copyright (C) 2012 Metamarkets Group Inc. 
+*
+* This program is free software; you can redistribute it and/or
+* modify it under the terms of the GNU General Public License
+* as published by the Free Software Foundation; either version 2
+* of the License, or (at your option) any later version.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with this program; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+*/
+package com.metamx.druid.master;
+
+import org.joda.time.DateTime;
+
+/**
+ * {@link BalancerStrategyFactory} that produces {@link CostBalancerStrategy} instances.
+ */
+public class CostBalancerStrategyFactory implements BalancerStrategyFactory
+{
+
+  /**
+   * Creates a new cost-based strategy on every call.
+   *
+   * @param referenceTimestamp passed unchanged to the {@link CostBalancerStrategy} constructor
+   *
+   * @return a new {@link CostBalancerStrategy}
+   */
+  @Override
+  public BalancerStrategy createBalancerStrategy(DateTime referenceTimestamp)
+  {
+    return new CostBalancerStrategy(referenceTimestamp);
+  }
+}
diff --git a/server/src/main/java/com/metamx/druid/master/DruidMaster.java b/server/src/main/java/com/metamx/druid/master/DruidMaster.java index 64169873975..fa1731be37e 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMaster.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMaster.java @@ -92,7 +92,6 @@ public class DruidMaster private final IndexingServiceClient indexingServiceClient; private final ScheduledExecutorService exec; private final LoadQueueTaskMaster taskMaster; - private final Map loadManagementPeons; private final AtomicReference leaderLatch; private volatile AtomicReference segmentSettingsAtomicReference; diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java b/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java index d1a96a00cbb..bb547bae8d8 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java +++
b/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java @@ -78,7 +78,7 @@ public class DruidMasterBalancer implements DruidMasterHelper { final MasterStats stats = new MasterStats(); final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp(); - final BalancerCostAnalyzer analyzer = params.getBalancerCostAnalyzer(referenceTimestamp); + final BalancerStrategy strategy = params.getBalancerStrategyFactory().createBalancerStrategy(referenceTimestamp); final int maxSegmentsToMove = params.getMasterSegmentSettings().getMaxSegmentsToMove(); for (Map.Entry> entry : @@ -113,34 +113,25 @@ public class DruidMasterBalancer implements DruidMasterHelper } for (int iter = 0; iter < maxSegmentsToMove; iter++) { - final BalancerSegmentHolder segmentToMove = analyzer.pickSegmentToMove(serverHolderList); + final BalancerSegmentHolder segmentToMove = strategy.pickSegmentToMove(serverHolderList); - if (params.getAvailableSegments().contains(segmentToMove.getSegment())) { - final ServerHolder holder = analyzer.findNewSegmentHomeBalance(segmentToMove.getSegment(), serverHolderList); + if (segmentToMove != null && params.getAvailableSegments().contains(segmentToMove.getSegment())) { + final ServerHolder holder = strategy.findNewSegmentHomeBalancer(segmentToMove.getSegment(), serverHolderList); if (holder != null) { moveSegment(segmentToMove, holder.getServer(), params); } } } - - final double initialTotalCost = analyzer.calculateInitialTotalCost(serverHolderList); - final double normalization = analyzer.calculateNormalization(serverHolderList); - final double normalizedInitialCost = initialTotalCost / normalization; - - stats.addToTieredStat("initialCost", tier, (long) initialTotalCost); - stats.addToTieredStat("normalization", tier, (long) normalization); - stats.addToTieredStat("normalizedInitialCostTimesOneThousand", tier, (long) (normalizedInitialCost * 1000)); stats.addToTieredStat("movedCount", tier, currentlyMovingSegments.get(tier).size()); + if 
(params.getMasterSegmentSettings().isEmitBalancingStats()) { + strategy.emitStats(tier, stats, serverHolderList); + } log.info( - "[%s]: Initial Total Cost: [%f], Normalization: [%f], Initial Normalized Cost: [%f], Segments Moved: [%d]", - tier, - initialTotalCost, - normalization, - normalizedInitialCost, - currentlyMovingSegments.get(tier).size() + "[%s]: Segments Moved: [%d]", tier, currentlyMovingSegments.get(tier).size() ); + } return params.buildFromExisting() diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterConfig.java b/server/src/main/java/com/metamx/druid/master/DruidMasterConfig.java index d514b4d5c4f..3e2b2719870 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMasterConfig.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMasterConfig.java @@ -42,12 +42,6 @@ public abstract class DruidMasterConfig @Default("PT1800s") public abstract Duration getMasterSegmentMergerPeriod(); - @Config("druid.master.millisToWaitBeforeDeleting") - public long getMillisToWaitBeforeDeleting() - { - return 15 * 60 * 1000L; - } - @Config("druid.master.merger.on") public boolean isMergeSegments() { @@ -66,22 +60,6 @@ public abstract class DruidMasterConfig return null; } - @Config("druid.master.merge.threshold") - public long getMergeBytesLimit() - { - return 100000000L; - } - - @Config("druid.master.merge.maxSegments") - public int getMergeSegmentsLimit() - { - return Integer.MAX_VALUE; - } - - @Config("druid.master.balancer.maxSegmentsToMove") - @Default("5") - public abstract int getMaxSegmentsToMove(); - @Config("druid.master.replicant.lifetime") @Default("15") public abstract int getReplicantLifetime(); diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterRuleRunner.java b/server/src/main/java/com/metamx/druid/master/DruidMasterRuleRunner.java index 2bd4870ff2c..04a3ce55da6 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMasterRuleRunner.java +++ 
b/server/src/main/java/com/metamx/druid/master/DruidMasterRuleRunner.java @@ -68,7 +68,6 @@ public class DruidMasterRuleRunner implements DruidMasterHelper DatabaseRuleManager databaseRuleManager = paramsWithReplicationManager.getDatabaseRuleManager(); for (DataSegment segment : paramsWithReplicationManager.getAvailableSegments()) { List rules = databaseRuleManager.getRulesWithDefault(segment.getDataSource()); - boolean foundMatchingRule = false; for (Rule rule : rules) { if (rule.appliesTo(segment, now)) { diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterRuntimeParams.java b/server/src/main/java/com/metamx/druid/master/DruidMasterRuntimeParams.java index 3c76b587229..0dfe9afca4f 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMasterRuntimeParams.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMasterRuntimeParams.java @@ -49,6 +49,7 @@ public class DruidMasterRuntimeParams private final MasterSegmentSettings masterSegmentSettings; private final MasterStats stats; private final DateTime balancerReferenceTimestamp; + private final BalancerStrategyFactory strategyFactory; public DruidMasterRuntimeParams( long startTime, @@ -62,7 +63,8 @@ public class DruidMasterRuntimeParams ServiceEmitter emitter, MasterSegmentSettings masterSegmentSettings, MasterStats stats, - DateTime balancerReferenceTimestamp + DateTime balancerReferenceTimestamp, + BalancerStrategyFactory strategyFactory ) { this.startTime = startTime; @@ -77,6 +79,7 @@ public class DruidMasterRuntimeParams this.masterSegmentSettings = masterSegmentSettings; this.stats = stats; this.balancerReferenceTimestamp = balancerReferenceTimestamp; + this.strategyFactory = strategyFactory; } public long getStartTime() @@ -139,9 +142,9 @@ public class DruidMasterRuntimeParams return balancerReferenceTimestamp; } - public BalancerCostAnalyzer getBalancerCostAnalyzer(DateTime referenceTimestamp) + public BalancerStrategyFactory getBalancerStrategyFactory() { - return new 
BalancerCostAnalyzer(referenceTimestamp); + return strategyFactory; } public boolean hasDeletionWaitTimeElapsed() @@ -168,7 +171,8 @@ public class DruidMasterRuntimeParams emitter, masterSegmentSettings, stats, - balancerReferenceTimestamp + balancerReferenceTimestamp, + strategyFactory ); } @@ -186,6 +190,7 @@ public class DruidMasterRuntimeParams private MasterSegmentSettings masterSegmentSettings; private MasterStats stats; private DateTime balancerReferenceTimestamp; + private BalancerStrategyFactory strategyFactory; Builder() { @@ -201,6 +206,7 @@ public class DruidMasterRuntimeParams this.stats = new MasterStats(); this.masterSegmentSettings = new MasterSegmentSettings.Builder().build(); this.balancerReferenceTimestamp = null; + this.strategyFactory = new CostBalancerStrategyFactory(); } Builder( @@ -215,7 +221,8 @@ public class DruidMasterRuntimeParams ServiceEmitter emitter, MasterSegmentSettings masterSegmentSettings, MasterStats stats, - DateTime balancerReferenceTimestamp + DateTime balancerReferenceTimestamp, + BalancerStrategyFactory strategyFactory ) { this.startTime = startTime; @@ -230,6 +237,7 @@ public class DruidMasterRuntimeParams this.masterSegmentSettings = masterSegmentSettings; this.stats = stats; this.balancerReferenceTimestamp = balancerReferenceTimestamp; + this.strategyFactory=strategyFactory; } public DruidMasterRuntimeParams build() @@ -246,7 +254,8 @@ public class DruidMasterRuntimeParams emitter, masterSegmentSettings, stats, - balancerReferenceTimestamp + balancerReferenceTimestamp, + strategyFactory ); } @@ -321,5 +330,11 @@ public class DruidMasterRuntimeParams this.balancerReferenceTimestamp = balancerReferenceTimestamp; return this; } + + public Builder withBalancerStrategyFactory(BalancerStrategyFactory strategyFactory) + { + this.strategyFactory=strategyFactory; + return this; + } } } diff --git a/server/src/main/java/com/metamx/druid/master/MasterSegmentSettings.java 
b/server/src/main/java/com/metamx/druid/master/MasterSegmentSettings.java index cd6502be130..1bafb38ecc8 100644 --- a/server/src/main/java/com/metamx/druid/master/MasterSegmentSettings.java +++ b/server/src/main/java/com/metamx/druid/master/MasterSegmentSettings.java @@ -28,19 +28,22 @@ public class MasterSegmentSettings private long mergeBytesLimit= 100000000L; private int mergeSegmentsLimit = Integer.MAX_VALUE; private int maxSegmentsToMove = 5; + private boolean emitBalancingStats = false; @JsonCreator public MasterSegmentSettings( @JsonProperty("millisToWaitBeforeDeleting") Long millisToWaitBeforeDeleting, @JsonProperty("mergeBytesLimit") Long mergeBytesLimit, @JsonProperty("mergeSegmentsLimit") Integer mergeSegmentsLimit, - @JsonProperty("maxSegmentsToMove") Integer maxSegmentsToMove + @JsonProperty("maxSegmentsToMove") Integer maxSegmentsToMove, + @JsonProperty("emitBalancingStats") Boolean emitBalancingStats ) { this.maxSegmentsToMove=maxSegmentsToMove; this.millisToWaitBeforeDeleting=millisToWaitBeforeDeleting; this.mergeSegmentsLimit=mergeSegmentsLimit; this.mergeBytesLimit=mergeBytesLimit; + this.emitBalancingStats = emitBalancingStats; } public static String getConfigKey() @@ -60,6 +63,12 @@ public class MasterSegmentSettings return mergeBytesLimit; } + @JsonProperty + public boolean isEmitBalancingStats() + { + return emitBalancingStats; + } + @JsonProperty public int getMergeSegmentsLimit() { @@ -80,6 +89,7 @@ public class MasterSegmentSettings private long mergeBytesLimit; private int mergeSegmentsLimit; private int maxSegmentsToMove; + private boolean emitBalancingStats; public Builder() { @@ -87,14 +97,16 @@ public class MasterSegmentSettings this.mergeBytesLimit= 100000000L; this.mergeSegmentsLimit= Integer.MAX_VALUE; this.maxSegmentsToMove = 5; + this.emitBalancingStats = false; } - public Builder(long millisToWaitBeforeDeleting, long mergeBytesLimit, int mergeSegmentsLimit, int maxSegmentsToMove) + public Builder(long millisToWaitBeforeDeleting, 
long mergeBytesLimit, int mergeSegmentsLimit, int maxSegmentsToMove, boolean emitBalancingStats) { this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting; this.mergeBytesLimit = mergeBytesLimit; this.mergeSegmentsLimit = mergeSegmentsLimit; this.maxSegmentsToMove = maxSegmentsToMove; + this.emitBalancingStats = emitBalancingStats; } public Builder withMillisToWaitBeforeDeleting(long millisToWaitBeforeDeleting) @@ -123,7 +135,7 @@ public class MasterSegmentSettings public MasterSegmentSettings build() { - return new MasterSegmentSettings(millisToWaitBeforeDeleting,mergeBytesLimit,mergeSegmentsLimit,maxSegmentsToMove); + return new MasterSegmentSettings(millisToWaitBeforeDeleting,mergeBytesLimit,mergeSegmentsLimit,maxSegmentsToMove, emitBalancingStats); } } } diff --git a/server/src/main/java/com/metamx/druid/master/RandomBalancerStrategy.java b/server/src/main/java/com/metamx/druid/master/RandomBalancerStrategy.java new file mode 100644 index 00000000000..d953b69b3db --- /dev/null +++ b/server/src/main/java/com/metamx/druid/master/RandomBalancerStrategy.java @@ -0,0 +1,71 @@ +/* +* Druid - a distributed column store. +* Copyright (C) 2012 Metamarkets Group Inc. +* +* This program is free software; you can redistribute it and/or +* modify it under the terms of the GNU General Public License +* as published by the Free Software Foundation; either version 2 +* of the License, or (at your option) any later version. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with this program; if not, write to the Free Software +* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 
+*/
+
+package com.metamx.druid.master;
+
+import com.metamx.druid.client.DataSegment;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * A {@link BalancerStrategy} that places new copies of a segment on a randomly chosen server and
+ * never proposes a destination for balancing moves.
+ */
+public class RandomBalancerStrategy implements BalancerStrategy
+{
+  private final ReservoirSegmentSampler sampler = new ReservoirSegmentSampler();
+  /** Shared generator; avoids allocating a new Random for every draw. */
+  private final Random random = new Random();
+
+  /**
+   * Picks, uniformly at random, a server that is not already serving the segment.
+   *
+   * @param proposalSegment the segment to place
+   * @param serverHolders   the candidate servers
+   *
+   * @return a randomly chosen server not serving the segment, or null when the list holds fewer
+   *         than two servers or every server already serves the segment
+   */
+  @Override
+  public ServerHolder findNewSegmentHomeReplicator(
+      DataSegment proposalSegment, List<ServerHolder> serverHolders
+  )
+  {
+    // A single-server list has always yielded null here. An empty list is treated the same way
+    // instead of failing inside Random.nextInt(0).
+    if (serverHolders.size() <= 1) {
+      return null;
+    }
+
+    // Filter first, then draw once. Redrawing until a free server turned up never terminated
+    // when every server already served the segment.
+    final List<ServerHolder> candidates = new ArrayList<ServerHolder>(serverHolders.size());
+    for (ServerHolder holder : serverHolders) {
+      if (!holder.isServingSegment(proposalSegment)) {
+        candidates.add(holder);
+      }
+    }
+    if (candidates.isEmpty()) {
+      return null;
+    }
+    return candidates.get(random.nextInt(candidates.size()));
+  }
+
+  /**
+   * Always returns null: this strategy does not choose a destination for balancing moves.
+   *
+   * @param proposalSegment the segment proposed for a move (unused)
+   * @param serverHolders   the candidate servers (unused)
+   *
+   * @return null
+   */
+  @Override
+  public ServerHolder findNewSegmentHomeBalancer(
+      DataSegment proposalSegment, List<ServerHolder> serverHolders
+  )
+  {
+    return null;
+  }
+
+  /**
+   * Delegates to {@link ReservoirSegmentSampler#getRandomBalancerSegmentHolder}.
+   *
+   * @param serverHolders the servers whose segments may be picked
+   *
+   * @return whatever the sampler returns, which may be null
+   */
+  @Override
+  public BalancerSegmentHolder pickSegmentToMove(List<ServerHolder> serverHolders)
+  {
+    return sampler.getRandomBalancerSegmentHolder(serverHolders);
+  }
+
+  /**
+   * Does nothing; this strategy records no statistics.
+   */
+  @Override
+  public void emitStats(
+      String tier, MasterStats stats, List<ServerHolder> serverHolderList
+  )
+  {
+  }
+}
diff --git a/server/src/main/java/com/metamx/druid/master/RandomBalancerStrategyFactory.java b/server/src/main/java/com/metamx/druid/master/RandomBalancerStrategyFactory.java new file mode 100644 index 00000000000..1994c0a5fea --- /dev/null +++ b/server/src/main/java/com/metamx/druid/master/RandomBalancerStrategyFactory.java @@ -0,0 +1,30 @@ +/* +* Druid - a distributed column store. +* Copyright (C) 2012 Metamarkets Group Inc. +* +* This program is free software; you can redistribute it and/or +* modify it under the terms of the GNU General Public License +* as published by the Free Software Foundation; either version 2 +* of the License, or (at your option) any later version.
+* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with this program; if not, write to the Free Software +* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +*/ +package com.metamx.druid.master; + +import org.joda.time.DateTime; + +public class RandomBalancerStrategyFactory implements BalancerStrategyFactory +{ + @Override + public BalancerStrategy createBalancerStrategy(DateTime referenceTimestamp) + { + return new RandomBalancerStrategy(); + } +} diff --git a/server/src/main/java/com/metamx/druid/master/ReservoirSegmentSampler.java b/server/src/main/java/com/metamx/druid/master/ReservoirSegmentSampler.java new file mode 100644 index 00000000000..4db994b821a --- /dev/null +++ b/server/src/main/java/com/metamx/druid/master/ReservoirSegmentSampler.java @@ -0,0 +1,54 @@ +/* +* Druid - a distributed column store. +* Copyright (C) 2012 Metamarkets Group Inc. +* +* This program is free software; you can redistribute it and/or +* modify it under the terms of the GNU General Public License +* as published by the Free Software Foundation; either version 2 +* of the License, or (at your option) any later version. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with this program; if not, write to the Free Software +* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 
+*/ + +package com.metamx.druid.master; + +import com.metamx.druid.client.DataSegment; + +import java.util.List; +import java.util.Random; + +public class ReservoirSegmentSampler +{ + + public BalancerSegmentHolder getRandomBalancerSegmentHolder(final List serverHolders) + { + final Random rand = new Random(); + ServerHolder fromServerHolder = null; + DataSegment proposalSegment = null; + int numSoFar = 0; + + for (ServerHolder server : serverHolders) { + for (DataSegment segment : server.getServer().getSegments().values()) { + int randNum = rand.nextInt(numSoFar + 1); + // w.p. 1 / (numSoFar+1), swap out the server and segment + if (randNum == numSoFar) { + fromServerHolder = server; + proposalSegment = segment; + } + numSoFar++; + } + } + if (fromServerHolder != null) { + return new BalancerSegmentHolder(fromServerHolder.getServer(), proposalSegment); + } else { + return null; + } + } +} diff --git a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java index 8a9a014aa79..b133af1799e 100644 --- a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java +++ b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java @@ -22,7 +22,7 @@ package com.metamx.druid.master.rules; import com.google.common.collect.Lists; import com.google.common.collect.MinMaxPriorityQueue; import com.metamx.druid.client.DataSegment; -import com.metamx.druid.master.BalancerCostAnalyzer; +import com.metamx.druid.master.BalancerStrategy; import com.metamx.druid.master.DruidMaster; import com.metamx.druid.master.DruidMasterRuntimeParams; import com.metamx.druid.master.LoadPeonCallback; @@ -60,15 +60,14 @@ public abstract class LoadRule implements Rule final List serverHolderList = new ArrayList(serverQueue); final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp(); - final BalancerCostAnalyzer analyzer = params.getBalancerCostAnalyzer(referenceTimestamp); - + final BalancerStrategy 
strategy = params.getBalancerStrategyFactory().createBalancerStrategy(referenceTimestamp); if (params.getAvailableSegments().contains(segment)) { stats.accumulate( assign( params.getReplicationManager(), expectedReplicants, totalReplicants, - analyzer, + strategy, serverHolderList, segment ) @@ -84,7 +83,7 @@ public abstract class LoadRule implements Rule final ReplicationThrottler replicationManager, final int expectedReplicants, int totalReplicants, - final BalancerCostAnalyzer analyzer, + final BalancerStrategy strategy, final List serverHolderList, final DataSegment segment ) @@ -98,7 +97,7 @@ public abstract class LoadRule implements Rule break; } - final ServerHolder holder = analyzer.findNewSegmentHomeAssign(segment, serverHolderList); + final ServerHolder holder = strategy.findNewSegmentHomeReplicator(segment, serverHolderList); if (holder == null) { log.warn( diff --git a/server/src/main/resources/static/js/masterSegmentSettings.js b/server/src/main/resources/static/js/masterSegmentSettings.js index b992cc8cf19..832859d97e1 100644 --- a/server/src/main/resources/static/js/masterSegmentSettings.js +++ b/server/src/main/resources/static/js/masterSegmentSettings.js @@ -4,6 +4,7 @@ $(function () { document.getElementById("mergeBytes").value = data["mergeBytesLimit"]; document.getElementById("mergeSegments").value = data["mergeSegmentsLimit"]; document.getElementById("maxSegments").value = data["maxSegmentsToMove"]; + document.getElementById("emitBalancingStats").value = data["emitBalancingStats"]; }); $("#submit").click( function () diff --git a/server/src/main/resources/static/masterSegmentSettings.html b/server/src/main/resources/static/masterSegmentSettings.html index c4f06ec25db..4306305d252 100644 --- a/server/src/main/resources/static/masterSegmentSettings.html +++ b/server/src/main/resources/static/masterSegmentSettings.html @@ -13,6 +13,7 @@
maxSegmentsToMove:
+ emitBalancingStats: diff --git a/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTest.java b/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTest.java index 4f594832406..8513c157de3 100644 --- a/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTest.java +++ b/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTest.java @@ -132,6 +132,78 @@ public class DruidMasterBalancerTest EasyMock.verify(druidServer4); } + + @Test + public void testMoveToEmptyServerBalancer() + { + EasyMock.expect(druidServer1.getName()).andReturn("from").atLeastOnce(); + EasyMock.expect(druidServer1.getCurrSize()).andReturn(30L).atLeastOnce(); + EasyMock.expect(druidServer1.getMaxSize()).andReturn(100L).atLeastOnce(); + EasyMock.expect(druidServer1.getSegments()).andReturn(segments).anyTimes(); + EasyMock.expect(druidServer1.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes(); + EasyMock.replay(druidServer1); + + EasyMock.expect(druidServer2.getName()).andReturn("to").atLeastOnce(); + EasyMock.expect(druidServer2.getTier()).andReturn("normal").anyTimes(); + EasyMock.expect(druidServer2.getCurrSize()).andReturn(0L).atLeastOnce(); + EasyMock.expect(druidServer2.getMaxSize()).andReturn(100L).atLeastOnce(); + EasyMock.expect(druidServer2.getSegments()).andReturn(new HashMap()).anyTimes(); + EasyMock.expect(druidServer2.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes(); + EasyMock.replay(druidServer2); + + EasyMock.replay(druidServer3); + EasyMock.replay(druidServer4); + + // Mock stuff that the master needs + master.moveSegment( + EasyMock.anyObject(), + EasyMock.anyObject(), + EasyMock.anyObject(), + EasyMock.anyObject() + ); + EasyMock.expectLastCall().anyTimes(); + EasyMock.replay(master); + + LoadQueuePeonTester fromPeon = new LoadQueuePeonTester(); + LoadQueuePeonTester toPeon = new LoadQueuePeonTester(); + + DruidMasterRuntimeParams params = + DruidMasterRuntimeParams.newBuilder() + .withDruidCluster( + new 
DruidCluster( + ImmutableMap.>of( + "normal", + MinMaxPriorityQueue.orderedBy(DruidMasterBalancerTester.percentUsedComparator) + .create( + Arrays.asList( + new ServerHolder(druidServer1, fromPeon), + new ServerHolder(druidServer2, toPeon) + ) + ) + ) + ) + ) + .withLoadManagementPeons( + ImmutableMap.of( + "from", + fromPeon, + "to", + toPeon + ) + ) + .withAvailableSegments(segments.values()) + .withMasterSegmentSettings(new MasterSegmentSettings.Builder().withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE).build()) + .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) + .build(); + + params = new DruidMasterBalancerTester(master).run(params); + Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() > 0); + Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() < segments.size()); + } + + + + @Test public void testRun1() { @@ -203,7 +275,8 @@ public class DruidMasterBalancerTest Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() > 0); } - @Test + + @Test public void testRun2() { // Mock some servers of different usages @@ -295,4 +368,5 @@ public class DruidMasterBalancerTest params = new DruidMasterBalancerTester(master).run(params); Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() > 0); } + } diff --git a/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java b/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java index 213e350f76b..80993ed62f2 100644 --- a/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java +++ b/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java @@ -36,7 +36,6 @@ import org.junit.Before; import org.junit.Test; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ScheduledExecutorService; /** */ @@ -95,24 +94,12 @@ public class DruidMasterTest return null; } - @Override - public long 
getMillisToWaitBeforeDeleting() - { - return super.getMillisToWaitBeforeDeleting(); - } - @Override public String getMergerServiceName() { return ""; } - @Override - public int getMaxSegmentsToMove() - { - return 0; - } - @Override public int getReplicantLifetime() { diff --git a/server/src/test/java/com/metamx/druid/master/ReservoirSegmentSamplerTest.java b/server/src/test/java/com/metamx/druid/master/ReservoirSegmentSamplerTest.java new file mode 100644 index 00000000000..7b768e11172 --- /dev/null +++ b/server/src/test/java/com/metamx/druid/master/ReservoirSegmentSamplerTest.java @@ -0,0 +1,208 @@ +/* +* Druid - a distributed column store. +* Copyright (C) 2012 Metamarkets Group Inc. +* +* This program is free software; you can redistribute it and/or +* modify it under the terms of the GNU General Public License +* as published by the Free Software Foundation; either version 2 +* of the License, or (at your option) any later version. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with this program; if not, write to the Free Software +* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 
+*/ +package com.metamx.druid.master; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.metamx.druid.client.DataSegment; +import com.metamx.druid.client.DruidServer; +import com.metamx.druid.shard.NoneShardSpec; +import junit.framework.Assert; +import org.easymock.EasyMock; +import org.joda.time.DateTime; +import org.joda.time.Interval; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; +import java.util.Map; + +public class ReservoirSegmentSamplerTest +{ + private DruidServer druidServer1; + private DruidServer druidServer2; + private DruidServer druidServer3; + private DruidServer druidServer4; + + private ServerHolder holder1; + private ServerHolder holder2; + private ServerHolder holder3; + private ServerHolder holder4; + + private DataSegment segment1; + private DataSegment segment2; + private DataSegment segment3; + private DataSegment segment4; + Map segmentsMap1; + Map segmentsMap2; + Map segmentsMap3; + Map segmentsMap4; + List segments; + + @Before + public void setUp() throws Exception + { + druidServer1 = EasyMock.createMock(DruidServer.class); + druidServer1 = EasyMock.createMock(DruidServer.class); + druidServer2 = EasyMock.createMock(DruidServer.class); + druidServer3 = EasyMock.createMock(DruidServer.class); + druidServer4 = EasyMock.createMock(DruidServer.class); + holder1 = EasyMock.createMock(ServerHolder.class); + holder2 = EasyMock.createMock(ServerHolder.class); + holder3 = EasyMock.createMock(ServerHolder.class); + holder4 = EasyMock.createMock(ServerHolder.class); + segment1 = EasyMock.createMock(DataSegment.class); + segment2 = EasyMock.createMock(DataSegment.class); + segment3 = EasyMock.createMock(DataSegment.class); + segment4 = EasyMock.createMock(DataSegment.class); + + DateTime start1 = new DateTime("2012-01-01"); + DateTime start2 = new DateTime("2012-02-01"); + DateTime version = new DateTime("2012-03-01"); + 
segment1 = new DataSegment( + "datasource1", + new Interval(start1, start1.plusHours(1)), + version.toString(), + Maps.newHashMap(), + Lists.newArrayList(), + Lists.newArrayList(), + new NoneShardSpec(), + 0, + 11L + ); + segment2 = new DataSegment( + "datasource1", + new Interval(start2, start2.plusHours(1)), + version.toString(), + Maps.newHashMap(), + Lists.newArrayList(), + Lists.newArrayList(), + new NoneShardSpec(), + 0, + 7L + ); + segment3 = new DataSegment( + "datasource2", + new Interval(start1, start1.plusHours(1)), + version.toString(), + Maps.newHashMap(), + Lists.newArrayList(), + Lists.newArrayList(), + new NoneShardSpec(), + 0, + 4L + ); + segment4 = new DataSegment( + "datasource2", + new Interval(start2, start2.plusHours(1)), + version.toString(), + Maps.newHashMap(), + Lists.newArrayList(), + Lists.newArrayList(), + new NoneShardSpec(), + 0, + 8L + ); + + segments = Lists.newArrayList(segment1, segment2, segment3, segment4); + + segmentsMap1 = ImmutableMap.of( + "datasource1_2012-01-01T00:00:00.000Z_2012-01-01T01:00:00.000Z_2012-03-01T00:00:00.000Z", + segment1 + ); + segmentsMap2 = ImmutableMap.of( + "datasource1_2012-02-01T00:00:00.000Z_2012-02-01T01:00:00.000Z_2012-03-01T00:00:00.000Z", + segment2 + ); + segmentsMap3 = ImmutableMap.of( + "datasource2_2012-01-01T00:00:00.000Z_2012-01-01T01:00:00.000Z_2012-03-01T00:00:00.000Z", + segment3 + ); + segmentsMap4 = ImmutableMap.of( + "datasource2_2012-02-01T00:00:00.000Z_2012-02-01T01:00:00.000Z_2012-03-01T00:00:00.000Z", + segment4 + ); + } + + //checks if every segment is selected at least once out of 5000 trials + @Test + public void getRandomBalancerSegmentHolderTest() + { + EasyMock.expect(druidServer1.getName()).andReturn("1").atLeastOnce(); + EasyMock.expect(druidServer1.getCurrSize()).andReturn(30L).atLeastOnce(); + EasyMock.expect(druidServer1.getMaxSize()).andReturn(100L).atLeastOnce(); + EasyMock.expect(druidServer1.getSegments()).andReturn(segmentsMap1).anyTimes(); + 
EasyMock.expect(druidServer1.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes(); + EasyMock.replay(druidServer1); + + EasyMock.expect(druidServer2.getName()).andReturn("2").atLeastOnce(); + EasyMock.expect(druidServer2.getTier()).andReturn("normal").anyTimes(); + EasyMock.expect(druidServer2.getCurrSize()).andReturn(30L).atLeastOnce(); + EasyMock.expect(druidServer2.getMaxSize()).andReturn(100L).atLeastOnce(); + EasyMock.expect(druidServer2.getSegments()).andReturn(segmentsMap2).anyTimes(); + EasyMock.expect(druidServer2.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes(); + EasyMock.replay(druidServer2); + + EasyMock.expect(druidServer3.getName()).andReturn("3").atLeastOnce(); + EasyMock.expect(druidServer3.getTier()).andReturn("normal").anyTimes(); + EasyMock.expect(druidServer3.getCurrSize()).andReturn(30L).atLeastOnce(); + EasyMock.expect(druidServer3.getMaxSize()).andReturn(100L).atLeastOnce(); + EasyMock.expect(druidServer3.getSegments()).andReturn(segmentsMap3).anyTimes(); + EasyMock.expect(druidServer3.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes(); + EasyMock.replay(druidServer3); + + EasyMock.expect(druidServer4.getName()).andReturn("4").atLeastOnce(); + EasyMock.expect(druidServer4.getTier()).andReturn("normal").anyTimes(); + EasyMock.expect(druidServer4.getCurrSize()).andReturn(30L).atLeastOnce(); + EasyMock.expect(druidServer4.getMaxSize()).andReturn(100L).atLeastOnce(); + EasyMock.expect(druidServer4.getSegments()).andReturn(segmentsMap4).anyTimes(); + EasyMock.expect(druidServer4.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes(); + EasyMock.replay(druidServer4); + + EasyMock.expect(holder1.getServer()).andReturn(druidServer1).anyTimes(); + EasyMock.replay(holder1); + EasyMock.expect(holder2.getServer()).andReturn(druidServer2).anyTimes(); + EasyMock.replay(holder2); + + EasyMock.expect(holder3.getServer()).andReturn(druidServer3).anyTimes(); + EasyMock.replay(holder3); + + 
EasyMock.expect(holder4.getServer()).andReturn(druidServer4).anyTimes(); + EasyMock.replay(holder4); + + List holderList = Lists.newArrayList(); + holderList.add(holder1); + holderList.add(holder2); + holderList.add(holder3); + holderList.add(holder4); + + Map segmentCountMap = Maps.newHashMap(); + ReservoirSegmentSampler sampler = new ReservoirSegmentSampler(); + for (int i = 0; i < 5000; i++) { + segmentCountMap.put(sampler.getRandomBalancerSegmentHolder(holderList).getSegment(), 1); + } + + for (DataSegment segment : segments) { + Assert.assertEquals(segmentCountMap.get(segment), new Integer(1)); + } + + + } +} diff --git a/server/src/test/java/com/metamx/druid/utils/DruidMasterBalancerProfiler.java b/server/src/test/java/com/metamx/druid/utils/DruidMasterBalancerProfiler.java new file mode 100644 index 00000000000..482274c0b28 --- /dev/null +++ b/server/src/test/java/com/metamx/druid/utils/DruidMasterBalancerProfiler.java @@ -0,0 +1,249 @@ +/* +* Druid - a distributed column store. +* Copyright (C) 2012 Metamarkets Group Inc. +* +* This program is free software; you can redistribute it and/or +* modify it under the terms of the GNU General Public License +* as published by the Free Software Foundation; either version 2 +* of the License, or (at your option) any later version. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with this program; if not, write to the Free Software +* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 
+*/ + +package com.metamx.druid.utils; + +import com.google.common.base.Stopwatch; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.MinMaxPriorityQueue; +import com.metamx.druid.client.DataSegment; +import com.metamx.druid.client.DruidServer; +import com.metamx.druid.db.DatabaseRuleManager; +import com.metamx.druid.master.DruidCluster; +import com.metamx.druid.master.DruidMaster; +import com.metamx.druid.master.DruidMasterBalancerTester; +import com.metamx.druid.master.DruidMasterRuleRunner; +import com.metamx.druid.master.DruidMasterRuntimeParams; +import com.metamx.druid.master.LoadPeonCallback; +import com.metamx.druid.master.LoadQueuePeon; +import com.metamx.druid.master.LoadQueuePeonTester; +import com.metamx.druid.master.MasterSegmentSettings; +import com.metamx.druid.master.ReplicationThrottler; +import com.metamx.druid.master.SegmentReplicantLookup; +import com.metamx.druid.master.ServerHolder; +import com.metamx.druid.master.rules.PeriodLoadRule; +import com.metamx.druid.master.rules.Rule; +import com.metamx.druid.shard.NoneShardSpec; +import com.metamx.emitter.EmittingLogger; +import com.metamx.emitter.service.ServiceEmitter; +import org.easymock.EasyMock; +import org.joda.time.DateTime; +import org.joda.time.Interval; +import org.joda.time.Period; +import org.junit.Before; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class DruidMasterBalancerProfiler +{ + private static final int MAX_SEGMENTS_TO_MOVE = 5; + private DruidMaster master; + private DruidServer druidServer1; + private DruidServer druidServer2; + Map segments = Maps.newHashMap(); + ServiceEmitter emitter; + DatabaseRuleManager manager; + PeriodLoadRule loadRule = new PeriodLoadRule(new Period("P5000Y"),3,"normal"); + List rules = ImmutableList.of(loadRule); + + 
@Before + public void setUp() throws Exception + { + master = EasyMock.createMock(DruidMaster.class); + druidServer1 = EasyMock.createMock(DruidServer.class); + druidServer2 = EasyMock.createMock(DruidServer.class); + emitter = EasyMock.createMock(ServiceEmitter.class); + EmittingLogger.registerEmitter(emitter); + manager = EasyMock.createMock(DatabaseRuleManager.class); + } + + public void bigProfiler() + { + Stopwatch watch = new Stopwatch(); + int numSegments = 55000; + int numServers=50; + EasyMock.expect(manager.getAllRules()).andReturn(ImmutableMap.>of("test", rules)).anyTimes(); + EasyMock.expect(manager.getRules(EasyMock.anyObject())).andReturn(rules).anyTimes(); + EasyMock.expect(manager.getRulesWithDefault(EasyMock.anyObject())).andReturn(rules).anyTimes(); + EasyMock.replay(manager); + + master.moveSegment( + EasyMock.anyObject(), + EasyMock.anyObject(), + EasyMock.anyObject(), + EasyMock.anyObject() + ); + EasyMock.expectLastCall().anyTimes(); + EasyMock.replay(master); + + List serverList = Lists.newArrayList(); + Map peonMap = Maps.newHashMap(); + List serverHolderList = Lists.newArrayList(); + Map segmentMap = Maps.newHashMap(); + for (int i=0;inewHashMap(), + Lists.newArrayList(), + Lists.newArrayList(), + new NoneShardSpec(), + 0, + 4L + ) + ); + } + + for (int i=0;i()).anyTimes(); + } + EasyMock.expect(server.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes(); + EasyMock.replay(server); + + LoadQueuePeon peon = new LoadQueuePeonTester(); + peonMap.put(Integer.toString(i),peon); + serverHolderList.add(new ServerHolder(server, peon)); + } + + DruidMasterRuntimeParams params = + DruidMasterRuntimeParams.newBuilder() + .withDruidCluster( + new DruidCluster( + ImmutableMap.>of( + "normal", + MinMaxPriorityQueue.orderedBy(DruidMasterBalancerTester.percentUsedComparator) + .create( + serverHolderList + ) + ) + ) + ) + .withLoadManagementPeons( + peonMap + ) + .withAvailableSegments(segmentMap.values()) + .withMasterSegmentSettings(new 
MasterSegmentSettings.Builder().withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE).build()) + .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) + .withEmitter(emitter) + .withDatabaseRuleManager(manager) + .withReplicationManager(new ReplicationThrottler(2, 500)) + .withSegmentReplicantLookup( + SegmentReplicantLookup.make(new DruidCluster( + ImmutableMap.>of( + "normal", + MinMaxPriorityQueue.orderedBy(DruidMasterBalancerTester.percentUsedComparator) + .create( + serverHolderList + ) + ) + ) + ) + ) + .build(); + + DruidMasterBalancerTester tester = new DruidMasterBalancerTester(master); + DruidMasterRuleRunner runner = new DruidMasterRuleRunner(master,500,5); + watch.start(); + DruidMasterRuntimeParams balanceParams = tester.run(params); + DruidMasterRuntimeParams assignParams = runner.run(params); + System.out.println(watch.stop()); + } + + + public void profileRun(){ + Stopwatch watch = new Stopwatch(); + LoadQueuePeonTester fromPeon = new LoadQueuePeonTester(); + LoadQueuePeonTester toPeon = new LoadQueuePeonTester(); + + EasyMock.expect(druidServer1.getName()).andReturn("from").atLeastOnce(); + EasyMock.expect(druidServer1.getCurrSize()).andReturn(30L).atLeastOnce(); + EasyMock.expect(druidServer1.getMaxSize()).andReturn(100L).atLeastOnce(); + EasyMock.expect(druidServer1.getSegments()).andReturn(segments).anyTimes(); + EasyMock.expect(druidServer1.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes(); + EasyMock.replay(druidServer1); + + EasyMock.expect(druidServer2.getName()).andReturn("to").atLeastOnce(); + EasyMock.expect(druidServer2.getTier()).andReturn("normal").anyTimes(); + EasyMock.expect(druidServer2.getCurrSize()).andReturn(0L).atLeastOnce(); + EasyMock.expect(druidServer2.getMaxSize()).andReturn(100L).atLeastOnce(); + EasyMock.expect(druidServer2.getSegments()).andReturn(new HashMap()).anyTimes(); + EasyMock.expect(druidServer2.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes(); + EasyMock.replay(druidServer2); + + 
master.moveSegment( + EasyMock.anyObject(), + EasyMock.anyObject(), + EasyMock.anyObject(), + EasyMock.anyObject() + ); + EasyMock.expectLastCall().anyTimes(); + EasyMock.replay(master); + + DruidMasterRuntimeParams params = + DruidMasterRuntimeParams.newBuilder() + .withDruidCluster( + new DruidCluster( + ImmutableMap.>of( + "normal", + MinMaxPriorityQueue.orderedBy(DruidMasterBalancerTester.percentUsedComparator) + .create( + Arrays.asList( + new ServerHolder(druidServer1, fromPeon), + new ServerHolder(druidServer2, toPeon) + ) + ) + ) + ) + ) + .withLoadManagementPeons(ImmutableMap.of("from", fromPeon, "to", toPeon)) + .withAvailableSegments(segments.values()) + .withMasterSegmentSettings(new MasterSegmentSettings.Builder().withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE).build()) + .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) + .build(); + DruidMasterBalancerTester tester = new DruidMasterBalancerTester(master); + watch.start(); + DruidMasterRuntimeParams balanceParams = tester.run(params); + System.out.println(watch.stop()); + } + +}