diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java index 2dae85356df..8cc7ab72148 100644 --- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java +++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java @@ -409,7 +409,6 @@ public class RSGroupAdminServer implements RSGroupAdmin { @Override public boolean balanceRSGroup(String groupName) throws IOException { ServerManager serverManager = master.getServerManager(); - AssignmentManager assignmentManager = master.getAssignmentManager(); LoadBalancer balancer = master.getLoadBalancer(); synchronized (balancer) { @@ -447,16 +446,11 @@ public class RSGroupAdminServer implements RSGroupAdmin { plans.addAll(partialPlans); } } - long startTime = System.currentTimeMillis(); boolean balancerRan = !plans.isEmpty(); if (balancerRan) { LOG.info("RSGroup balance {} starting with plan count: {}", groupName, plans.size()); - for (RegionPlan plan: plans) { - LOG.info("balance {}", plan); - assignmentManager.moveAsync(plan); - } - LOG.info("RSGroup balance {} completed after {} seconds", groupName, - (System.currentTimeMillis() - startTime)); + master.executeRegionPlansWithThrottling(plans); + LOG.info("RSGroup balance " + groupName + " completed"); } return balancerRan; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index be01116697f..d450aba60f1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -1649,7 +1649,6 @@ public class HMaster extends HRegionServer implements MasterServices { return false; } - int maxRegionsInTransition = getMaxRegionsInTransition(); synchronized (this.balancer) { // If balance not true, don't 
run balancer. if (!this.loadBalancerTracker.isBalancerOn()) return false; @@ -1708,45 +1707,11 @@ public class HMaster extends HRegionServer implements MasterServices { } } - long balanceStartTime = System.currentTimeMillis(); - long cutoffTime = balanceStartTime + this.maxBlancingTime; - int rpCount = 0; // number of RegionPlans balanced so far - if (plans != null && !plans.isEmpty()) { - int balanceInterval = this.maxBlancingTime / plans.size(); - LOG.info("Balancer plans size is " + plans.size() + ", the balance interval is " - + balanceInterval + " ms, and the max number regions in transition is " - + maxRegionsInTransition); - - for (RegionPlan plan: plans) { - LOG.info("balance " + plan); - //TODO: bulk assign - try { - this.assignmentManager.moveAsync(plan); - } catch (HBaseIOException hioe) { - //should ignore failed plans here, avoiding the whole balance plans be aborted - //later calls of balance() can fetch up the failed and skipped plans - LOG.warn("Failed balance plan: {}, just skip it", plan, hioe); - } - //rpCount records balance plans processed, does not care if a plan succeeds - rpCount++; - - balanceThrottling(balanceStartTime + rpCount * balanceInterval, maxRegionsInTransition, - cutoffTime); - - // if performing next balance exceeds cutoff time, exit the loop - if (rpCount < plans.size() && System.currentTimeMillis() > cutoffTime) { - // TODO: After balance, there should not be a cutoff time (keeping it as - // a security net for now) - LOG.debug("No more balancing till next balance run; maxBalanceTime=" - + this.maxBlancingTime); - break; - } - } - } + List sucRPs = executeRegionPlansWithThrottling(plans); if (this.cpHost != null) { try { - this.cpHost.postBalance(rpCount < plans.size() ? 
plans.subList(0, rpCount) : plans);
+          this.cpHost.postBalance(sucRPs);
         } catch (IOException ioe) {
           // balancing already succeeded so don't change the result
           LOG.error("Error invoking master coprocessor postBalance()", ioe);
@@ -1758,6 +1723,56 @@ public class HMaster extends HRegionServer implements MasterServices {
     return true;
   }
 
+  /**
+   * Submits each region plan to the assignment manager, calling balanceThrottling after
+   * every plan and stopping early once the cutoff time (start + maxBlancingTime) has passed.
+   * @param plans region plans to execute; null or empty means there is nothing to do
+   * @return the plans whose move was submitted without an HBaseIOException, never null
+   */
+  @Override
+  public List<RegionPlan> executeRegionPlansWithThrottling(List<RegionPlan> plans) {
+    List<RegionPlan> sucRPs = new ArrayList<>();
+    int maxRegionsInTransition = getMaxRegionsInTransition();
+    long balanceStartTime = System.currentTimeMillis();
+    long cutoffTime = balanceStartTime + this.maxBlancingTime;
+    int rpCount = 0;  // number of RegionPlans balanced so far
+    if (plans != null && !plans.isEmpty()) {
+      int balanceInterval = this.maxBlancingTime / plans.size();
+      LOG.info("Balancer plans size is " + plans.size() + ", the balance interval is "
+          + balanceInterval + " ms, and the max number regions in transition is "
+          + maxRegionsInTransition);
+
+      for (RegionPlan plan: plans) {
+        LOG.info("balance " + plan);
+        //TODO: bulk assign
+        try {
+          this.assignmentManager.moveAsync(plan);
+          //record the plan so the returned list (handed to postBalance) is not always empty
+          sucRPs.add(plan);
+        } catch (HBaseIOException hioe) {
+          //should ignore failed plans here, avoiding the whole balance plans be aborted
+          //later calls of balance() can fetch up the failed and skipped plans
+          LOG.warn("Failed balance plan: {}, just skip it", plan, hioe);
+        }
+        //rpCount records balance plans processed, does not care if a plan succeeds
+        rpCount++;
+
+        balanceThrottling(balanceStartTime + rpCount * balanceInterval, maxRegionsInTransition,
+          cutoffTime);
+
+        // if performing next balance exceeds cutoff time, exit the loop
+        if (rpCount < plans.size() && System.currentTimeMillis() > cutoffTime) {
+          // TODO: After balance, there should not be a cutoff time (keeping it as
+          // a security net for now)
+          LOG.debug("No more balancing till next balance run; maxBalanceTime="
+              + this.maxBlancingTime);
+          break;
+        }
+      }
+    }
+    return sucRPs;
+  }
+
   @Override
   @VisibleForTesting
   public RegionNormalizer getRegionNormalizer() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
index f9d437a00b2..3524f63eb3c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
@@ -514,4 +514,12 @@ public interface MasterServices extends Server {
    * @return the {@link ZKPermissionWatcher}
    */
   ZKPermissionWatcher getZKPermissionWatcher();
+
+  /**
+   * Execute region plans with throttling
+   * @param plans region plans to execute
+   * @return the plans that were executed successfully
+   */
+  List<RegionPlan> executeRegionPlansWithThrottling(List<RegionPlan> plans);
+
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
index 951eae6994a..6b222010026 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
@@ -474,4 +474,9 @@ public class MockNoopMasterServices implements MasterServices {
   public ZKPermissionWatcher getZKPermissionWatcher() {
     return null;
   }
-}
\ No newline at end of file
+
+  @Override
+  public List<RegionPlan> executeRegionPlansWithThrottling(List<RegionPlan> plans) {
+    return null;
+  }
+}