HBASE-17178 Add region balance throttling
Signed-off-by: Phil Yang <yangzhe1991@apache.org>
This commit is contained in:
parent
af69783128
commit
f26b3bf5ba
|
@ -128,6 +128,22 @@ public final class HConstants {
|
|||
/** Config for balancing the cluster by table */
|
||||
public static final String HBASE_MASTER_LOADBALANCE_BYTABLE = "hbase.master.loadbalance.bytable";
|
||||
|
||||
/** Config for the max percent of regions in transition */
|
||||
public static final String HBASE_MASTER_BALANCER_MAX_RIT_PERCENT =
|
||||
"hbase.master.balancer.maxRitPercent";
|
||||
|
||||
/** Default value for the max percent of regions in transition */
|
||||
public static final double DEFAULT_HBASE_MASTER_BALANCER_MAX_RIT_PERCENT = 1.0;
|
||||
|
||||
/** Config for the max balancing time */
|
||||
public static final String HBASE_BALANCER_MAX_BALANCING = "hbase.balancer.max.balancing";
|
||||
|
||||
/** Config for the balancer period */
|
||||
public static final String HBASE_BALANCER_PERIOD = "hbase.balancer.period";
|
||||
|
||||
/** Default value for the balancer period */
|
||||
public static final int DEFAULT_HBASE_BALANCER_PERIOD = 300000;
|
||||
|
||||
/** The name of the ensemble table */
|
||||
public static final String ENSEMBLE_TABLE_NAME = "hbase:ensemble";
|
||||
|
||||
|
|
|
@ -564,6 +564,14 @@ possible configurations would overwhelm and obscure the important.
|
|||
to atomic bulk loads are attempted in the face of splitting operations
|
||||
0 means never give up.</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>hbase.master.balancer.maxRitPercent</name>
|
||||
<value>1.0</value>
|
||||
<description>The max percent of regions in transition when balancing.
|
||||
The default value is 1.0. So there are no balancer throttling. If set this config to 0.01,
|
||||
It means that there are at most 1% regions in transition when balancing.
|
||||
Then the cluster's availability is at least 99% when balancing.</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>hbase.balancer.period
|
||||
</name>
|
||||
|
|
|
@ -309,6 +309,11 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
|||
private final ProcedureEvent serverCrashProcessingEnabled =
|
||||
new ProcedureEvent("server crash processing");
|
||||
|
||||
// Maximum time we should run balancer for
|
||||
private final int maxBlancingTime;
|
||||
// Maximum percent of regions in transition when balancing
|
||||
private final double maxRitPercent;
|
||||
|
||||
LoadBalancer balancer;
|
||||
private RegionNormalizer normalizer;
|
||||
private BalancerChore balancerChore;
|
||||
|
@ -432,6 +437,10 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
|||
// preload table descriptor at startup
|
||||
this.preLoadTableDescriptors = conf.getBoolean("hbase.master.preload.tabledescriptors", true);
|
||||
|
||||
this.maxBlancingTime = getMaxBalancingTime();
|
||||
this.maxRitPercent = conf.getDouble(HConstants.HBASE_MASTER_BALANCER_MAX_RIT_PERCENT,
|
||||
HConstants.DEFAULT_HBASE_MASTER_BALANCER_MAX_RIT_PERCENT);
|
||||
|
||||
// Do we publish the status?
|
||||
|
||||
boolean shouldPublish = conf.getBoolean(HConstants.STATUS_PUBLISHED,
|
||||
|
@ -1298,18 +1307,61 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
|||
/**
|
||||
* @return Maximum time we should run balancer for
|
||||
*/
|
||||
private int getBalancerCutoffTime() {
|
||||
int balancerCutoffTime =
|
||||
getConfiguration().getInt("hbase.balancer.max.balancing", -1);
|
||||
if (balancerCutoffTime == -1) {
|
||||
private int getMaxBalancingTime() {
|
||||
int maxBalancingTime = getConfiguration().
|
||||
getInt(HConstants.HBASE_BALANCER_MAX_BALANCING, -1);
|
||||
if (maxBalancingTime == -1) {
|
||||
// No time period set so create one
|
||||
int balancerPeriod =
|
||||
getConfiguration().getInt("hbase.balancer.period", 300000);
|
||||
balancerCutoffTime = balancerPeriod;
|
||||
// If nonsense period, set it to balancerPeriod
|
||||
if (balancerCutoffTime <= 0) balancerCutoffTime = balancerPeriod;
|
||||
maxBalancingTime = getConfiguration().getInt(HConstants.HBASE_BALANCER_PERIOD,
|
||||
HConstants.DEFAULT_HBASE_BALANCER_PERIOD);
|
||||
}
|
||||
return balancerCutoffTime;
|
||||
return maxBalancingTime;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Maximum number of regions in transition
|
||||
*/
|
||||
private int getMaxRegionsInTransition() {
|
||||
int numRegions = this.assignmentManager.getRegionStates().getRegionAssignments().size();
|
||||
return Math.max((int) Math.floor(numRegions * this.maxRitPercent), 1);
|
||||
}
|
||||
|
||||
/**
|
||||
* It first sleep to the next balance plan start time. Meanwhile, throttling by the max
|
||||
* number regions in transition to protect availability.
|
||||
* @param nextBalanceStartTime The next balance plan start time
|
||||
* @param maxRegionsInTransition max number of regions in transition
|
||||
* @param cutoffTime when to exit balancer
|
||||
*/
|
||||
private void balanceThrottling(long nextBalanceStartTime, int maxRegionsInTransition,
|
||||
long cutoffTime) {
|
||||
boolean interrupted = false;
|
||||
|
||||
// Sleep to next balance plan start time
|
||||
// But if there are zero regions in transition, it can skip sleep to speed up.
|
||||
while (!interrupted && System.currentTimeMillis() < nextBalanceStartTime
|
||||
&& this.assignmentManager.getRegionStates().getRegionsInTransitionCount() != 0) {
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException ie) {
|
||||
interrupted = true;
|
||||
}
|
||||
}
|
||||
|
||||
// Throttling by max number regions in transition
|
||||
while (!interrupted
|
||||
&& maxRegionsInTransition > 0
|
||||
&& this.assignmentManager.getRegionStates().getRegionsInTransitionCount()
|
||||
>= maxRegionsInTransition && System.currentTimeMillis() <= cutoffTime) {
|
||||
try {
|
||||
// sleep if the number of regions in transition exceeds the limit
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException ie) {
|
||||
interrupted = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (interrupted) Thread.currentThread().interrupt();
|
||||
}
|
||||
|
||||
public boolean balance() throws IOException {
|
||||
|
@ -1328,8 +1380,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
|||
return false;
|
||||
}
|
||||
|
||||
// Do this call outside of synchronized block.
|
||||
int maximumBalanceTime = getBalancerCutoffTime();
|
||||
int maxRegionsInTransition = getMaxRegionsInTransition();
|
||||
synchronized (this.balancer) {
|
||||
// If balance not true, don't run balancer.
|
||||
if (!this.loadBalancerTracker.isBalancerOn()) return false;
|
||||
|
@ -1376,27 +1427,35 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
|||
if (partialPlans != null) plans.addAll(partialPlans);
|
||||
}
|
||||
|
||||
long cutoffTime = System.currentTimeMillis() + maximumBalanceTime;
|
||||
long balanceStartTime = System.currentTimeMillis();
|
||||
long cutoffTime = balanceStartTime + this.maxBlancingTime;
|
||||
int rpCount = 0; // number of RegionPlans balanced so far
|
||||
long totalRegPlanExecTime = 0;
|
||||
if (plans != null && !plans.isEmpty()) {
|
||||
int balanceInterval = this.maxBlancingTime / plans.size();
|
||||
LOG.info("Balancer plans size is " + plans.size() + ", the balance interval is "
|
||||
+ balanceInterval + " ms, and the max number regions in transition is "
|
||||
+ maxRegionsInTransition);
|
||||
|
||||
for (RegionPlan plan: plans) {
|
||||
LOG.info("balance " + plan);
|
||||
long balStartTime = System.currentTimeMillis();
|
||||
//TODO: bulk assign
|
||||
this.assignmentManager.balance(plan);
|
||||
totalRegPlanExecTime += System.currentTimeMillis()-balStartTime;
|
||||
rpCount++;
|
||||
if (rpCount < plans.size() &&
|
||||
// if performing next balance exceeds cutoff time, exit the loop
|
||||
(System.currentTimeMillis() + (totalRegPlanExecTime / rpCount)) > cutoffTime) {
|
||||
//TODO: After balance, there should not be a cutoff time (keeping it as a security net for now)
|
||||
LOG.debug("No more balancing till next balance run; maximumBalanceTime=" +
|
||||
maximumBalanceTime);
|
||||
|
||||
balanceThrottling(balanceStartTime + rpCount * balanceInterval, maxRegionsInTransition,
|
||||
cutoffTime);
|
||||
|
||||
// if performing next balance exceeds cutoff time, exit the loop
|
||||
if (rpCount < plans.size() && System.currentTimeMillis() > cutoffTime) {
|
||||
// TODO: After balance, there should not be a cutoff time (keeping it as a security net
|
||||
// for now)
|
||||
LOG.debug("No more balancing till next balance run; maxBalanceTime="
|
||||
+ this.maxBlancingTime);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (this.cpHost != null) {
|
||||
try {
|
||||
this.cpHost.postBalance(rpCount < plans.size() ? plans.subList(0, rpCount) : plans);
|
||||
|
|
|
@ -240,6 +240,13 @@ public class RegionStates {
|
|||
return rit;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the number of regions in transition.
|
||||
*/
|
||||
public synchronized int getRegionsInTransitionCount() {
|
||||
return regionsInTransition.size();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return True if specified region in transition.
|
||||
*/
|
||||
|
|
|
@ -22,6 +22,7 @@ import java.io.IOException;
|
|||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.ScheduledChore;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
|
@ -38,7 +39,7 @@ public class BalancerChore extends ScheduledChore {
|
|||
|
||||
public BalancerChore(HMaster master) {
|
||||
super(master.getServerName() + "-BalancerChore", master, master.getConfiguration().getInt(
|
||||
"hbase.balancer.period", 300000));
|
||||
HConstants.HBASE_BALANCER_PERIOD, HConstants.DEFAULT_HBASE_BALANCER_PERIOD));
|
||||
this.master = master;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,151 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category({MasterTests.class, MediumTests.class})
|
||||
public class TestMasterBalanceThrottling {
|
||||
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
private static final byte[] FAMILYNAME = Bytes.toBytes("fam");
|
||||
|
||||
@Before
|
||||
public void setupConfiguration() {
|
||||
TEST_UTIL.getConfiguration().set(HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
|
||||
"org.apache.hadoop.hbase.master.balancer.SimpleLoadBalancer");
|
||||
}
|
||||
|
||||
@After
|
||||
public void shutdown() throws Exception {
|
||||
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_BALANCER_MAX_BALANCING,
|
||||
HConstants.DEFAULT_HBASE_BALANCER_PERIOD);
|
||||
TEST_UTIL.getConfiguration().setDouble(HConstants.HBASE_MASTER_BALANCER_MAX_RIT_PERCENT,
|
||||
HConstants.DEFAULT_HBASE_MASTER_BALANCER_MAX_RIT_PERCENT);
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testThrottlingByBalanceInterval() throws Exception {
|
||||
// Use default config and start a cluster of two regionservers.
|
||||
TEST_UTIL.startMiniCluster(2);
|
||||
|
||||
TableName tableName = createTable("testNoThrottling");
|
||||
final HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
|
||||
|
||||
// Default max balancing time is 300000 ms and there are 50 regions to balance
|
||||
// The balance interval is 6000 ms, much longger than the normal region in transition duration
|
||||
// So the master can balance the region one by one
|
||||
unbalance(master, tableName);
|
||||
AtomicInteger maxCount = new AtomicInteger(0);
|
||||
AtomicBoolean stop = new AtomicBoolean(false);
|
||||
Thread checker = startBalancerChecker(master, maxCount, stop);
|
||||
master.balance();
|
||||
stop.set(true);
|
||||
checker.interrupt();
|
||||
checker.join();
|
||||
assertTrue("max regions in transition: " + maxCount.get(), maxCount.get() == 1);
|
||||
|
||||
TEST_UTIL.deleteTable(tableName);
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testThrottlingByMaxRitPercent() throws Exception {
|
||||
// Set max balancing time to 500 ms and max percent of regions in transition to 0.05
|
||||
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_BALANCER_MAX_BALANCING, 500);
|
||||
TEST_UTIL.getConfiguration().setDouble(HConstants.HBASE_MASTER_BALANCER_MAX_RIT_PERCENT, 0.05);
|
||||
TEST_UTIL.startMiniCluster(2);
|
||||
|
||||
TableName tableName = createTable("testThrottlingByMaxRitPercent");
|
||||
final HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
|
||||
|
||||
unbalance(master, tableName);
|
||||
AtomicInteger maxCount = new AtomicInteger(0);
|
||||
AtomicBoolean stop = new AtomicBoolean(false);
|
||||
Thread checker = startBalancerChecker(master, maxCount, stop);
|
||||
master.balance();
|
||||
stop.set(true);
|
||||
checker.interrupt();
|
||||
checker.join();
|
||||
// The max number of regions in transition is 100 * 0.05 = 5
|
||||
assertTrue("max regions in transition: " + maxCount.get(), maxCount.get() == 5);
|
||||
|
||||
TEST_UTIL.deleteTable(tableName);
|
||||
}
|
||||
|
||||
private TableName createTable(String table) throws IOException {
|
||||
TableName tableName = TableName.valueOf(table);
|
||||
byte[] startKey = new byte[] { 0x00 };
|
||||
byte[] stopKey = new byte[] { 0x7f };
|
||||
TEST_UTIL.createTable(tableName, new byte[][] { FAMILYNAME }, 1, startKey, stopKey,
|
||||
100);
|
||||
return tableName;
|
||||
}
|
||||
|
||||
private Thread startBalancerChecker(final HMaster master, final AtomicInteger maxCount,
|
||||
final AtomicBoolean stop) {
|
||||
Runnable checker = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
while (!stop.get()) {
|
||||
maxCount.set(Math.max(maxCount.get(), master.getAssignmentManager().getRegionStates()
|
||||
.getRegionsInTransitionCount()));
|
||||
try {
|
||||
Thread.sleep(10);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
Thread thread = new Thread(checker);
|
||||
thread.start();
|
||||
return thread;
|
||||
}
|
||||
|
||||
private void unbalance(HMaster master, TableName tableName) throws Exception {
|
||||
while (master.getAssignmentManager().getRegionStates().getRegionsInTransitionCount() > 0) {
|
||||
Thread.sleep(100);
|
||||
}
|
||||
HRegionServer biasedServer = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
|
||||
for (HRegionInfo regionInfo : TEST_UTIL.getHBaseAdmin().getTableRegions(tableName)) {
|
||||
master.move(regionInfo.getEncodedNameAsBytes(),
|
||||
Bytes.toBytes(biasedServer.getServerName().getServerName()));
|
||||
}
|
||||
while (master.getAssignmentManager().getRegionStates().getRegionsInTransitionCount() > 0) {
|
||||
Thread.sleep(100);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -137,7 +137,7 @@ public class TestSplitTransactionOnCluster {
|
|||
new HBaseTestingUtility();
|
||||
|
||||
static void setupOnce() throws Exception {
|
||||
TESTING_UTIL.getConfiguration().setInt("hbase.balancer.period", 60000);
|
||||
TESTING_UTIL.getConfiguration().setInt(HConstants.HBASE_BALANCER_PERIOD, 60000);
|
||||
useZKForAssignment = TESTING_UTIL.getConfiguration().getBoolean(
|
||||
"hbase.assignment.usezk", true);
|
||||
TESTING_UTIL.startMiniCluster(NB_SERVERS);
|
||||
|
|
Loading…
Reference in New Issue