diff --git a/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java b/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java index 4a0f2b90bcd..7a387460a9b 100644 --- a/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java +++ b/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java @@ -44,7 +44,7 @@ import java.util.Set; public class BalancerCostAnalyzer { private static final Logger log = new Logger(BalancerCostAnalyzer.class); - private final int MAX_SEGMENTS_TO_MOVE; + private int MAX_SEGMENTS_TO_MOVE; private static final int DAY_IN_MILLIS = 1000 * 60 * 60 * 24; private static final int SEVEN_DAYS_IN_MILLIS = 7 * DAY_IN_MILLIS; private static final int THIRTY_DAYS_IN_MILLIS = 30 * DAY_IN_MILLIS; @@ -58,20 +58,21 @@ public class BalancerCostAnalyzer private double totalCostChange; private int totalSegments; - public BalancerCostAnalyzer(DateTime referenceTimestamp, int MAX_SEGMENTS_TO_MOVE) + public BalancerCostAnalyzer(DateTime referenceTimestamp) { this.referenceTimestamp = referenceTimestamp; - this.MAX_SEGMENTS_TO_MOVE = MAX_SEGMENTS_TO_MOVE; + rand = new Random(0); totalCostChange = 0; } public void init(List serverHolderList, DruidMasterRuntimeParams params) { + this.serverHolderList = serverHolderList; this.initialTotalCost = calculateInitialTotalCost(serverHolderList); this.normalization = calculateNormalization(serverHolderList); - this.serverHolderList = serverHolderList; this.totalSegments = params.getAvailableSegments().size(); + this.MAX_SEGMENTS_TO_MOVE = params.getMaxSegmentsToMove(); } public double getInitialTotalCost() diff --git a/server/src/main/java/com/metamx/druid/master/DruidMaster.java b/server/src/main/java/com/metamx/druid/master/DruidMaster.java index d5833b203fe..386b85cfbb0 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMaster.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMaster.java @@ -575,6 +575,7 @@ public class DruidMaster .withEmitter(emitter) .withMergeBytesLimit(config.getMergeBytesLimit()) .withMergeSegmentsLimit(config.getMergeSegmentsLimit()) + .withMaxSegmentsToMove(config.getMaxSegmentsToMove()) .build(); for (DruidMasterHelper helper : helpers) { @@ -680,7 +681,7 @@ public class DruidMaster }, new DruidMasterRuleRunner(DruidMaster.this), new DruidMasterCleanup(DruidMaster.this), - new DruidMasterBalancer(DruidMaster.this, new BalancerCostAnalyzer(DateTime.now(), config.getMaxSegmentsToMove())), + new DruidMasterBalancer(DruidMaster.this, new BalancerCostAnalyzer(DateTime.now())), new DruidMasterLogger() ) ); diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterRuntimeParams.java b/server/src/main/java/com/metamx/druid/master/DruidMasterRuntimeParams.java index a8658937e68..0357baa4d64 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMasterRuntimeParams.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMasterRuntimeParams.java @@ -49,6 +49,7 @@ public class DruidMasterRuntimeParams private final MasterStats stats; private final long mergeBytesLimit; private final int mergeSegmentsLimit; + private final int maxSegmentsToMove; public DruidMasterRuntimeParams( long startTime, @@ -62,7 +63,8 @@ public class DruidMasterRuntimeParams long millisToWaitBeforeDeleting, MasterStats stats, long mergeBytesLimit, - int mergeSegmentsLimit + int mergeSegmentsLimit, + int maxSegmentsToMove ) { this.startTime = startTime; @@ -77,6 +79,12 @@ public class DruidMasterRuntimeParams this.stats = stats; this.mergeBytesLimit = mergeBytesLimit; this.mergeSegmentsLimit = mergeSegmentsLimit; + this.maxSegmentsToMove = maxSegmentsToMove; + } + + public int getMaxSegmentsToMove() + { + return maxSegmentsToMove; } public long getStartTime() @@ -163,7 +171,8 @@ public class DruidMasterRuntimeParams millisToWaitBeforeDeleting, stats, mergeBytesLimit, - mergeSegmentsLimit + mergeSegmentsLimit, + maxSegmentsToMove ); } @@ -181,6 +190,7 @@ public class DruidMasterRuntimeParams private MasterStats stats; private long mergeBytesLimit; private int mergeSegmentsLimit; + private int maxSegmentsToMove; Builder() { @@ -196,6 +206,7 @@ public class DruidMasterRuntimeParams this.stats = new MasterStats(); this.mergeBytesLimit = 0; this.mergeSegmentsLimit = 0; + this.maxSegmentsToMove = 0; } Builder( @@ -210,7 +221,8 @@ public class DruidMasterRuntimeParams long millisToWaitBeforeDeleting, MasterStats stats, long mergeBytesLimit, - int mergeSegmentsLimit + int mergeSegmentsLimit, + int maxSegmentsToMove ) { this.startTime = startTime; @@ -225,6 +237,7 @@ public class DruidMasterRuntimeParams this.stats = stats; this.mergeBytesLimit = mergeBytesLimit; this.mergeSegmentsLimit = mergeSegmentsLimit; + this.maxSegmentsToMove = maxSegmentsToMove; } public DruidMasterRuntimeParams build() @@ -241,7 +254,8 @@ public class DruidMasterRuntimeParams millisToWaitBeforeDeleting, stats, mergeBytesLimit, - mergeSegmentsLimit + mergeSegmentsLimit, + maxSegmentsToMove ); } @@ -316,5 +330,11 @@ public class DruidMasterRuntimeParams this.mergeSegmentsLimit = mergeSegmentsLimit; return this; } + + public Builder withMaxSegmentsToMove(int maxSegmentsToMove) + { + this.maxSegmentsToMove = maxSegmentsToMove; + return this; + } } } diff --git a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java index cd63fa69a7e..75cc76d7c7a 100644 --- a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java +++ b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java @@ -65,7 +65,7 @@ public abstract class LoadRule implements Rule totalReplicants, serverQueue, segment, - master.getConfig().getMaxSegmentsToMove() + params.getMaxSegmentsToMove() ) ); stats.accumulate(drop(expectedReplicants, clusterReplicants, segment, params)); @@ -87,7 +87,7 @@ public abstract class LoadRule implements Rule List assignedServers = Lists.newArrayList(); while (totalReplicants < expectedReplicants) { - BalancerCostAnalyzer analyzer = new BalancerCostAnalyzer(DateTime.now(), MAX_SEGMENTS_TO_MOVE); + BalancerCostAnalyzer analyzer = new BalancerCostAnalyzer(DateTime.now()); BalancerCostAnalyzer.BalancerCostAnalyzerHelper helper = analyzer.new BalancerCostAnalyzerHelper(serverHolderList, segment); helper.computeAllCosts(); Pair minPair = helper.getCostsServerHolderPairs().pollFirst(); diff --git a/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTest.java b/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTest.java index 551bd424bb4..082c4a78077 100644 --- a/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTest.java +++ b/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTest.java @@ -44,6 +44,7 @@ import java.util.Map; */ public class DruidMasterBalancerTest { + private static final int MAX_SEGMENTS_TO_MOVE = 5; private DruidMaster master; private DruidServer druidServer1; private DruidServer druidServer2; @@ -222,6 +223,7 @@ public class DruidMasterBalancerTest ) .withLoadManagementPeons(ImmutableMap.of("from", peon, "to", peon)) .withAvailableSegments(segments.values()) + .withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE) .build(); params = new DruidMasterBalancer(master, new BalancerCostAnalyzer(new DateTime("2013-01-01"))).run(params); @@ -370,6 +372,7 @@ public class DruidMasterBalancerTest ) .withLoadManagementPeons(ImmutableMap.of("1", peon, "2", peon, "3", peon, "4", peon)) .withAvailableSegments(segments.values()) + .withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE) .build(); params = new DruidMasterBalancer(master, new BalancerCostAnalyzer(new DateTime("2013-01-01"))).run(params); diff --git a/server/src/test/java/com/metamx/druid/master/DruidMasterRuleRunnerTest.java b/server/src/test/java/com/metamx/druid/master/DruidMasterRuleRunnerTest.java index 2ac32578cbf..074d61225c1 100644 --- a/server/src/test/java/com/metamx/druid/master/DruidMasterRuleRunnerTest.java +++ b/server/src/test/java/com/metamx/druid/master/DruidMasterRuleRunnerTest.java @@ -175,6 +175,7 @@ public class DruidMasterRuleRunnerTest .withAvailableSegments(availableSegments) .withDatabaseRuleManager(databaseRuleManager) .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster())) + .withMaxSegmentsToMove(5) .build(); DruidMasterRuntimeParams afterParams = ruleRunner.run(params); diff --git a/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java b/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java index c54a9a66564..f3f5e0a6d30 100644 --- a/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java +++ b/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java @@ -124,6 +124,12 @@ public class DruidMasterTest { return ""; } + + @Override + public int getMaxSegmentsToMove() + { + return 0; + } }, null, null,