diff --git a/server/src/main/java/com/metamx/druid/master/DruidMaster.java b/server/src/main/java/com/metamx/druid/master/DruidMaster.java index 386b85cfbb0..b9748ac7f1c 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMaster.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMaster.java @@ -676,12 +676,13 @@ public class DruidMaster .withDruidCluster(cluster) .withDatabaseRuleManager(databaseRuleManager) .withSegmentReplicantLookup(segmentReplicantLookup) + .withBalancerCostAnalyzer(new BalancerCostAnalyzer(DateTime.now())) .build(); } }, new DruidMasterRuleRunner(DruidMaster.this), new DruidMasterCleanup(DruidMaster.this), - new DruidMasterBalancer(DruidMaster.this, new BalancerCostAnalyzer(DateTime.now())), + new DruidMasterBalancer(DruidMaster.this), new DruidMasterLogger() ) ); diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java b/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java index ab0aba86943..ac1ae7283c4 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java @@ -51,18 +51,15 @@ public class DruidMasterBalancer implements DruidMasterHelper ); private static final EmittingLogger log = new EmittingLogger(DruidMasterBalancer.class); - private final BalancerCostAnalyzer analyzer; private final DruidMaster master; private final Map> currentlyMovingSegments = Maps.newHashMap(); public DruidMasterBalancer( - DruidMaster master, - BalancerCostAnalyzer analyzer + DruidMaster master ) { this.master = master; - this.analyzer = analyzer; } private void reduceLifetimes(String tier) @@ -106,6 +103,7 @@ public class DruidMasterBalancer implements DruidMasterHelper List serverHolderList = new ArrayList(entry.getValue()); + BalancerCostAnalyzer analyzer = params.getBalancerCostAnalyzer(); analyzer.init(serverHolderList, params); moveSegments(analyzer.findSegmentsToMove(), params); diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterRuntimeParams.java b/server/src/main/java/com/metamx/druid/master/DruidMasterRuntimeParams.java index 0357baa4d64..ed2843d9faa 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMasterRuntimeParams.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMasterRuntimeParams.java @@ -50,6 +50,7 @@ public class DruidMasterRuntimeParams private final long mergeBytesLimit; private final int mergeSegmentsLimit; private final int maxSegmentsToMove; + private final BalancerCostAnalyzer balancerCostAnalyzer; public DruidMasterRuntimeParams( long startTime, @@ -64,7 +65,8 @@ public class DruidMasterRuntimeParams MasterStats stats, long mergeBytesLimit, int mergeSegmentsLimit, - int maxSegmentsToMove + int maxSegmentsToMove, + BalancerCostAnalyzer balancerCostAnalyzer ) { this.startTime = startTime; @@ -80,11 +82,7 @@ public class DruidMasterRuntimeParams this.mergeBytesLimit = mergeBytesLimit; this.mergeSegmentsLimit = mergeSegmentsLimit; this.maxSegmentsToMove = maxSegmentsToMove; - } - - public int getMaxSegmentsToMove() - { - return maxSegmentsToMove; + this.balancerCostAnalyzer = balancerCostAnalyzer; } public long getStartTime() @@ -147,6 +145,16 @@ public class DruidMasterRuntimeParams return mergeSegmentsLimit; } + public int getMaxSegmentsToMove() + { + return maxSegmentsToMove; + } + + public BalancerCostAnalyzer getBalancerCostAnalyzer() + { + return balancerCostAnalyzer; + } + public boolean hasDeletionWaitTimeElapsed() { return (System.currentTimeMillis() - getStartTime() > getMillisToWaitBeforeDeleting()); @@ -172,7 +180,8 @@ public class DruidMasterRuntimeParams stats, mergeBytesLimit, mergeSegmentsLimit, - maxSegmentsToMove + maxSegmentsToMove, + balancerCostAnalyzer ); } @@ -191,6 +200,7 @@ public class DruidMasterRuntimeParams private long mergeBytesLimit; private int mergeSegmentsLimit; private int maxSegmentsToMove; + private BalancerCostAnalyzer balancerCostAnalyzer; Builder() { @@ -207,6 +217,7 @@ public class DruidMasterRuntimeParams this.mergeBytesLimit = 0; this.mergeSegmentsLimit = 0; this.maxSegmentsToMove = 0; + this.balancerCostAnalyzer = null; } Builder( @@ -222,7 +233,8 @@ public class DruidMasterRuntimeParams MasterStats stats, long mergeBytesLimit, int mergeSegmentsLimit, - int maxSegmentsToMove + int maxSegmentsToMove, + BalancerCostAnalyzer balancerCostAnalyzer ) { this.startTime = startTime; @@ -238,6 +250,7 @@ public class DruidMasterRuntimeParams this.mergeBytesLimit = mergeBytesLimit; this.mergeSegmentsLimit = mergeSegmentsLimit; this.maxSegmentsToMove = maxSegmentsToMove; + this.balancerCostAnalyzer = balancerCostAnalyzer; } public DruidMasterRuntimeParams build() @@ -255,7 +268,8 @@ public class DruidMasterRuntimeParams stats, mergeBytesLimit, mergeSegmentsLimit, - maxSegmentsToMove + maxSegmentsToMove, + balancerCostAnalyzer ); } @@ -336,5 +350,11 @@ public class DruidMasterRuntimeParams this.maxSegmentsToMove = maxSegmentsToMove; return this; } + + public Builder withBalancerCostAnalyzer(BalancerCostAnalyzer balancerCostAnalyzer) + { + this.balancerCostAnalyzer = balancerCostAnalyzer; + return this; + } } } diff --git a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java index ba4b065bacb..c18d7d8db87 100644 --- a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java +++ b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java @@ -31,7 +31,6 @@ import com.metamx.druid.master.LoadPeonCallback; import com.metamx.druid.master.MasterStats; import com.metamx.druid.master.ServerHolder; import com.metamx.emitter.EmittingLogger; -import org.joda.time.DateTime; import java.util.ArrayList; import java.util.List; @@ -65,7 +64,7 @@ public abstract class LoadRule implements Rule totalReplicants, serverQueue, segment, - params.getMaxSegmentsToMove() + params ) ); stats.accumulate(drop(expectedReplicants, clusterReplicants, segment, params)); @@ -78,7 +77,7 @@ public abstract class LoadRule implements Rule int totalReplicants, MinMaxPriorityQueue serverQueue, DataSegment segment, - int MAX_SEGMENTS_TO_MOVE + DruidMasterRuntimeParams params ) { MasterStats stats = new MasterStats(); @@ -87,7 +86,7 @@ public abstract class LoadRule implements Rule List assignedServers = Lists.newArrayList(); while (totalReplicants < expectedReplicants) { - BalancerCostAnalyzer analyzer = new BalancerCostAnalyzer(DateTime.now()); + BalancerCostAnalyzer analyzer = params.getBalancerCostAnalyzer(); BalancerCostAnalyzer.BalancerCostComputer helper = analyzer.new BalancerCostComputer(serverHolderList, segment); Pair minPair = helper.getMinPair(); diff --git a/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTest.java b/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTest.java index 082c4a78077..4e677acbae1 100644 --- a/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTest.java +++ b/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTest.java @@ -224,9 +224,10 @@ public class DruidMasterBalancerTest .withLoadManagementPeons(ImmutableMap.of("from", peon, "to", peon)) .withAvailableSegments(segments.values()) .withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE) + .withBalancerCostAnalyzer(new BalancerCostAnalyzer(new DateTime("2013-01-01"))) .build(); - params = new DruidMasterBalancer(master, new BalancerCostAnalyzer(new DateTime("2013-01-01"))).run(params); + params = new DruidMasterBalancer(master).run(params); Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() > 0); Assert.assertTrue(params.getMasterStats().getPerTierStats().get("costChange").get("normal").get() > 0); } @@ -373,9 +374,10 @@ public class DruidMasterBalancerTest .withLoadManagementPeons(ImmutableMap.of("1", peon, "2", peon, "3", peon, "4", peon)) .withAvailableSegments(segments.values()) .withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE) + .withBalancerCostAnalyzer(new BalancerCostAnalyzer(new DateTime("2013-01-01"))) .build(); - params = new DruidMasterBalancer(master, new BalancerCostAnalyzer(new DateTime("2013-01-01"))).run(params); + params = new DruidMasterBalancer(master).run(params); Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() > 0); Assert.assertTrue(params.getMasterStats().getPerTierStats().get("costChange").get("normal").get() > 0); } diff --git a/server/src/test/java/com/metamx/druid/master/DruidMasterRuleRunnerTest.java b/server/src/test/java/com/metamx/druid/master/DruidMasterRuleRunnerTest.java index 074d61225c1..17e4ef3bd0b 100644 --- a/server/src/test/java/com/metamx/druid/master/DruidMasterRuleRunnerTest.java +++ b/server/src/test/java/com/metamx/druid/master/DruidMasterRuleRunnerTest.java @@ -176,6 +176,7 @@ public class DruidMasterRuleRunnerTest .withDatabaseRuleManager(databaseRuleManager) .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster())) .withMaxSegmentsToMove(5) + .withBalancerCostAnalyzer(new BalancerCostAnalyzer(new DateTime("2013-01-01"))) .build(); DruidMasterRuntimeParams afterParams = ruleRunner.run(params); @@ -265,6 +266,7 @@ public class DruidMasterRuleRunnerTest .withAvailableSegments(availableSegments) .withDatabaseRuleManager(databaseRuleManager) .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster())) + .withBalancerCostAnalyzer(new BalancerCostAnalyzer(new DateTime("2013-01-01"))) .build(); DruidMasterRuntimeParams afterParams = ruleRunner.run(params); @@ -350,6 +352,7 @@ public class DruidMasterRuleRunnerTest .withAvailableSegments(availableSegments) .withDatabaseRuleManager(databaseRuleManager) .withSegmentReplicantLookup(segmentReplicantLookup) + .withBalancerCostAnalyzer(new BalancerCostAnalyzer(new DateTime("2013-01-01"))) .build(); DruidMasterRuntimeParams afterParams = ruleRunner.run(params); @@ -411,6 +414,7 @@ public class DruidMasterRuleRunnerTest .withAvailableSegments(availableSegments) .withDatabaseRuleManager(databaseRuleManager) .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster())) + .withBalancerCostAnalyzer(new BalancerCostAnalyzer(new DateTime("2013-01-01"))) .build(); @@ -675,6 +679,7 @@ public class DruidMasterRuleRunnerTest .withAvailableSegments(availableSegments) .withDatabaseRuleManager(databaseRuleManager) .withSegmentReplicantLookup(segmentReplicantLookup) + .withBalancerCostAnalyzer(new BalancerCostAnalyzer(new DateTime("2013-01-01"))) .build(); DruidMasterRuntimeParams afterParams = ruleRunner.run(params); @@ -751,6 +756,7 @@ public class DruidMasterRuleRunnerTest .withAvailableSegments(availableSegments) .withDatabaseRuleManager(databaseRuleManager) .withSegmentReplicantLookup(segmentReplicantLookup) + .withBalancerCostAnalyzer(new BalancerCostAnalyzer(new DateTime("2013-01-01"))) .build(); DruidMasterRuntimeParams afterParams = ruleRunner.run(params);