HBASE-25768 Support an overall coarse and fast balance strategy for StochasticLoadBalancer

This commit is contained in:
haxiaolin 2022-05-06 18:39:48 +08:00
parent 002c92cd7a
commit 528cc1e29e
8 changed files with 519 additions and 88 deletions

View File

@ -601,7 +601,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
* @see #balanceTable(TableName, Map)
*/
@Override
public final List<RegionPlan>
public List<RegionPlan>
balanceCluster(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
preBalanceCluster(loadOfAllTable);
if (isByTable) {

View File

@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * A cluster status checker for {@link StochasticLoadBalancer}. A table is considered skewed when
 * the difference between the largest and the smallest per-server region count is greater than
 * {@value #TABLE_SKEW_REGION_COUNT_DIFF}. When the fraction of skewed tables exceeds the ratio
 * configured by {@value #BALANCER_OVERALL_SKEW_PERCENT_KEY} (default
 * {@value #BALANCER_OVERALL_SKEW_PERCENT_DEFAULT}), the checker asks for the coarse balance.
 * <p>
 * The coarse balance is aimed at balancing the regions as soon as possible. For example, when
 * there is a large-scale restart of RSes, or an expansion of groups or of the cluster, we hope the
 * balancer can execute as soon as possible, but the StochasticLoadBalancer may need a lot of time
 * to compute costs.
 */
@InterfaceAudience.Private
class OverallSkewChecker implements StochasticLoadBalancer.ClusterStatusBalanceChecker {
  private static final Logger LOG = LoggerFactory.getLogger(OverallSkewChecker.class);

  private static final String BALANCER_OVERALL_SKEW_PERCENT_KEY =
    "hbase.master.balancer.overall.skew.percentage";
  private static final double BALANCER_OVERALL_SKEW_PERCENT_DEFAULT = 0.5;
  /** A table is skewed when (max - min) region count per server is strictly greater than this. */
  private static final int TABLE_SKEW_REGION_COUNT_DIFF = 2;

  private final Configuration conf;

  public OverallSkewChecker(Configuration conf) {
    this.conf = conf;
  }

  /**
   * Decide whether the cluster is skewed enough to need the coarse balance.
   * @param loadOfAllTable region assignments per table and per server
   * @return {@code true} if {@link #isClusterSkew(Map)} reports the cluster as skewed;
   *         {@code false} otherwise, or when this checker was built with a null configuration
   */
  @Override
  public boolean
    needsCoarseBalance(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
    if (this.conf == null) {
      LOG.warn("Configuration should not be null");
      return false;
    }
    if (isClusterSkew(loadOfAllTable)) {
      LOG.info("Balancer checker found cluster is skew, will perform force balance, "
        + "table count is {}", loadOfAllTable.size());
      return true;
    }
    LOG.debug("Balancer checked cluster, it is not skew now, tables count is {}",
      loadOfAllTable.size());
    return false;
  }

  /**
   * Count the skewed tables and compare their fraction with the configured ratio.
   * @param loadOfAllTable region assignments per table and per server, may be null or empty
   * @return {@code true} if at least one table is skewed and the fraction of skewed tables is
   *         strictly greater than the configured ratio; {@code false} for null or empty input
   */
  public boolean isClusterSkew(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
    if (loadOfAllTable == null || loadOfAllTable.isEmpty()) {
      return false;
    }
    int skewCount = 0;
    for (Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> entry : loadOfAllTable
      .entrySet()) {
      if (!isSkew(entry.getValue())) {
        continue;
      }
      String tableName = entry.getKey().getNameAsString();
      LOG.info("Table: {} regions count is skew", tableName);
      if (LOG.isDebugEnabled()) {
        LOG.debug(describeDistribution(tableName, entry.getValue()));
      }
      skewCount++;
    }
    // The configuration is only consulted when at least one table is skewed, so the result for a
    // cluster without skewed tables does not depend on it.
    boolean clusterSkew = false;
    if (skewCount > 0) {
      double threshold =
        conf.getDouble(BALANCER_OVERALL_SKEW_PERCENT_KEY, BALANCER_OVERALL_SKEW_PERCENT_DEFAULT);
      clusterSkew = (double) skewCount / loadOfAllTable.size() > threshold;
    }
    // Report the real verdict; the previous message claimed "Cluster is skew" unconditionally.
    LOG.info("Cluster skew check finished, cluster skew={}, skew table count={}, "
      + "load table count={}", clusterSkew, skewCount, loadOfAllTable.size());
    return clusterSkew;
  }

  /** Build the debug description of how the regions of one table are spread over the servers. */
  private static String describeDistribution(String tableName,
    Map<ServerName, List<RegionInfo>> distribution) {
    StringBuilder sb = new StringBuilder("Table: " + tableName + " regions distribution is: ");
    for (Map.Entry<ServerName, List<RegionInfo>> serverRegions : distribution.entrySet()) {
      sb.append(serverRegions.getKey().getServerName()).append(" count=")
        .append(serverRegions.getValue().size()).append(":");
      sb.append(serverRegions.getValue());
    }
    return sb.toString();
  }

  /**
   * Check one table: it is skewed when the gap between the most and the least loaded server is
   * greater than {@link #TABLE_SKEW_REGION_COUNT_DIFF}. A null region list counts as 0 regions.
   */
  private static boolean isSkew(Map<ServerName, List<RegionInfo>> regions) {
    if (regions == null || regions.isEmpty()) {
      return false;
    }
    int max = 0;
    int min = Integer.MAX_VALUE;
    for (List<RegionInfo> serverRegions : regions.values()) {
      int count = serverRegions == null ? 0 : serverRegions.size();
      max = Math.max(max, count);
      min = Math.min(min, count);
    }
    return max - min > TABLE_SKEW_REGION_COUNT_DIFF;
  }
}

View File

@ -18,16 +18,18 @@
package org.apache.hadoop.hbase.master.balancer;
import com.google.errorprone.annotations.RestrictedApi;
import java.lang.reflect.Constructor;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@ -47,6 +49,8 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
/**
* <p>
* This is a best effort load balancer. Given a Cost function F(C) =&gt; x It will randomly try and
@ -127,6 +131,19 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
"hbase.master.balancer.stochastic.additionalCostFunctions";
public static final String OVERALL_COST_FUNCTION_NAME = "Overall";
/** Configuration for classes of cluster status checkers */
protected static final String CLUSTER_BALANCE_STATUS_CHECKERS_KEY =
"hbase.master.balancer.stochastic.cluster.status.checkers";
/**
* Configuration for the minimum interval for coarse balance. Coarse balance is designed to happen
* on strict conditions, which should not occur often. And this can also limit the frequency of
* switching between coarse and accurate balance. The value is in milliseconds: it is compared
* with the difference of two {@code EnvironmentEdgeManager.currentTime()} readings in
* {@code balanceCluster}.
*/
// NOTE(review): this key starts with "hbase.master.balance." while the other keys declared next
// to it start with "hbase.master.balancer." -- confirm whether the missing "r" is intentional
// before documenting the key for operators.
protected static final String CLUSTER_COARSE_BALANCE_MIN_INTERVAL_KEY =
"hbase.master.balance.stochastic.cluster.coarse.balance.min.interval.ms";
// Default minimum interval between two coarse balance attempts, in milliseconds.
private static final long CLUSTER_COARSE_BALANCE_MIN_INTERVAL_DEFAULT = 4 * 60 * 60 * 1000; // 4h
Map<String, Deque<BalancerRegionLoad>> loads = new HashMap<>();
// values are defaults
@ -138,6 +155,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
private float minCostNeedBalance = DEFAULT_MIN_COST_NEED_BALANCE;
private List<CostFunction> costFunctions; // FindBugs: Wants this protected; IS2_INCONSISTENT_SYNC
private List<CostFunction> coarseCostFunctions;
// To save the currently configured sum of multipliers. Defaulted at 1 for cases that carry high cost
private float sumMultiplier;
// to save and report costs to JMX
@ -156,6 +174,9 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
protected List<CandidateGenerator> candidateGenerators;
protected List<ClusterStatusBalanceChecker> clusterStatusCheckers;
protected long lastCoarseTimestamp = 0;
public enum GeneratorType {
RANDOM,
LOAD,
@ -177,35 +198,34 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
super(metricsStochasticBalancer);
}
private static CostFunction createCostFunction(Class<? extends CostFunction> clazz,
Configuration conf) {
try {
Constructor<? extends CostFunction> ctor = clazz.getDeclaredConstructor(Configuration.class);
return ReflectionUtils.instantiate(clazz.getName(), ctor, conf);
} catch (NoSuchMethodException e) {
// will try construct with no parameter
}
return ReflectionUtils.newInstance(clazz);
}
private <T> List<? extends T> loadFunctions(Configuration conf, String configName) {
String[] classNames = conf.getStrings(configName);
private void loadCustomCostFunctions(Configuration conf) {
String[] functionsNames = conf.getStrings(COST_FUNCTIONS_COST_FUNCTIONS_KEY);
if (null == functionsNames) {
return;
if (null == classNames) {
return new ArrayList<>();
}
for (String className : functionsNames) {
Class<? extends CostFunction> clazz;
return Arrays.stream(classNames).map(c -> {
Class<? extends T> klass = null;
try {
clazz = Class.forName(className).asSubclass(CostFunction.class);
klass = (Class<? extends T>) Class.forName(c);
} catch (ClassNotFoundException e) {
LOG.warn("Cannot load class '{}': {}", className, e.getMessage());
continue;
LOG.warn("Cannot load class " + c + "': " + e.getMessage());
}
CostFunction func = createCostFunction(clazz, conf);
LOG.info("Successfully loaded custom CostFunction '{}'", func.getClass().getSimpleName());
costFunctions.add(func);
}
if (null == klass) {
return null;
}
T reflected;
try {
reflected = ReflectionUtils.newInstance(klass, conf);
} catch (UnsupportedOperationException e) {
// will try construct with no parameter
reflected = ReflectionUtils.newInstance(klass);
}
LOG.info("Successfully loaded class {}", reflected.getClass().getSimpleName());
return reflected;
}).filter(Objects::nonNull).collect(Collectors.toList());
}
@RestrictedApi(explanation = "Should only be called in tests", link = "",
@ -256,14 +276,41 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
addCostFunction(new WriteRequestCostFunction(conf));
addCostFunction(new MemStoreSizeCostFunction(conf));
addCostFunction(new StoreFileCostFunction(conf));
loadCustomCostFunctions(conf);
List<CostFunction> additionals =
(List<CostFunction>) loadFunctions(conf, COST_FUNCTIONS_COST_FUNCTIONS_KEY);
if (!CollectionUtils.isEmpty(additionals)) {
for (CostFunction costFunction : additionals) {
addCostFunction(costFunction);
}
}
coarseCostFunctions = new ArrayList<>();
coarseCostFunctions.add(new RegionCountSkewCostFunction(conf));
coarseCostFunctions.add(new PrimaryRegionCountSkewCostFunction(conf));
coarseCostFunctions.add(new MoveCostFunction(conf, provider));
coarseCostFunctions.add(regionReplicaHostCostFunction);
coarseCostFunctions.add(regionReplicaRackCostFunction);
coarseCostFunctions.add(new TableSkewCostFunction(conf));
assert coarseCostFunctions.size() <= costFunctions.size()
: "Simple cost functions should be less than normal!";
curFunctionCosts = new double[costFunctions.size()];
tempFunctionCosts = new double[costFunctions.size()];
clusterStatusCheckers = new LinkedList<>();
List<ClusterStatusBalanceChecker> statusCheckers =
(List<ClusterStatusBalanceChecker>) loadFunctions(conf, CLUSTER_BALANCE_STATUS_CHECKERS_KEY);
if (!CollectionUtils.isEmpty(statusCheckers)) {
clusterStatusCheckers.addAll(statusCheckers);
}
LOG.info("Loaded config; maxSteps=" + maxSteps + ", runMaxSteps=" + runMaxSteps
+ ", stepsPerRegion=" + stepsPerRegion + ", maxRunningTime=" + maxRunningTime + ", isByTable="
+ isByTable + ", CostFunctions=" + Arrays.toString(getCostFunctionNames())
+ isByTable + ", CostFunctions=" + Arrays.toString(getCostFunctionNames(false))
+ ", simpleCostFunctions=" + Arrays.toString(getCostFunctionNames(true))
+ ", clusterStatusCheckers="
+ Arrays.toString(conf.getStrings(CLUSTER_BALANCE_STATUS_CHECKERS_KEY))
+ " , sum of multiplier of cost functions = " + sumMultiplier + " etc.");
}
@ -276,7 +323,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
try {
// by-table or ensemble mode
int tablesCount = isByTable ? provider.getNumberOfTables() : 1;
int functionsCount = getCostFunctionNames().length;
int functionsCount = getCostFunctionNames(false).length;
updateMetricsSize(tablesCount * (functionsCount + 1)); // +1 for overall
} catch (Exception e) {
@ -284,6 +331,60 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
}
}
/**
 * Balance the cluster, giving the configured cluster status checkers a chance to request the
 * coarse balance first. The checkers are only consulted when more than the configured minimum
 * interval has passed since the last coarse attempt. If no checker asks for the coarse balance,
 * or the coarse balance produces no plans, the regular balance of the parent class is used.
 * @param loadOfAllTable region assignments per table and per server
 * @return the coarse plans when they exist, otherwise whatever the parent implementation returns
 */
@Override
public List<RegionPlan>
  balanceCluster(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
  final long now = EnvironmentEdgeManager.currentTime();
  if (clusterStatusCheckers != null) {
    final long minInterval = getConf().getLong(CLUSTER_COARSE_BALANCE_MIN_INTERVAL_KEY,
      CLUSTER_COARSE_BALANCE_MIN_INTERVAL_DEFAULT);
    if (now - lastCoarseTimestamp > minInterval) {
      for (ClusterStatusBalanceChecker checker : clusterStatusCheckers) {
        if (!checker.needsCoarseBalance(loadOfAllTable)) {
          continue;
        }
        // no matter whether the coarse balance generates plans, we record the
        // lastCoarseTimestamp as the timestamp of calculating coarse plans
        lastCoarseTimestamp = now;
        List<RegionPlan> coarsePlans = coarseBalanceCluster(loadOfAllTable);
        if (coarsePlans == null || coarsePlans.isEmpty()) {
          LOG.warn("Cluster needs coarse balance, but generated no plans by current "
            + "coarseCostFunctions, please see more details in HBASE-25768");
          // stop asking the remaining checkers and fall back to the regular balance
          break;
        }
        LOG.debug("Cluster needs coarse balance, and plans size={}", coarsePlans.size());
        return coarsePlans;
      }
    }
  }
  return super.balanceCluster(loadOfAllTable);
}
/**
 * Use the coarse cost functions to balance cluster. In by-table mode every table is balanced on
 * its own and the non-null partial plans are concatenated; otherwise all tables are merged into
 * the ensemble table and balanced in one pass.
 * @param loadOfAllTable region assignments per table and per server
 * @return the collected plans in by-table mode (possibly empty), or the result of the single
 *         ensemble pass
 */
@RestrictedApi(explanation = "Should only be called in tests", link = "",
    allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
List<RegionPlan>
  coarseBalanceCluster(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
  preBalanceCluster(loadOfAllTable);
  if (!isByTable) {
    LOG.info("Coarse balance start Generate Balance plans for cluster");
    return balanceTable(HConstants.ENSEMBLE_TABLE_NAME, toEnsumbleTableLoad(loadOfAllTable),
      true);
  }
  List<RegionPlan> allPlans = new ArrayList<>();
  for (Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> tableLoad : loadOfAllTable
    .entrySet()) {
    TableName table = tableLoad.getKey();
    LOG.info("Coarse balance start Generate Balance plan for table: " + table);
    List<RegionPlan> tablePlans = balanceTable(table, tableLoad.getValue(), true);
    if (tablePlans != null) {
      allPlans.addAll(tablePlans);
    }
  }
  return allPlans;
}
private void updateBalancerTableLoadInfo(TableName tableName,
Map<ServerName, List<RegionInfo>> loadOfOneTable) {
RegionHDFSBlockLocationFinder finder = null;
@ -293,10 +394,10 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
BalancerClusterState cluster =
new BalancerClusterState(loadOfOneTable, loads, finder, rackManager);
initCosts(cluster);
curOverallCost = computeCost(cluster, Double.MAX_VALUE);
initCosts(cluster, false);
curOverallCost = computeCost(cluster, Double.MAX_VALUE, false);
System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length);
updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
updateStochasticCosts(tableName, curOverallCost, curFunctionCosts, false);
}
@Override
@ -342,8 +443,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
}
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
boolean needsBalance(TableName tableName, BalancerClusterState cluster) {
allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer|Checker).java")
boolean needsBalance(TableName tableName, BalancerClusterState cluster, boolean coarse) {
ClusterLoadState cs = new ClusterLoadState(cluster.clusterState);
if (cs.getNumServers() < MIN_SERVER_BALANCE) {
LOG.info(
@ -354,24 +455,24 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
}
if (areSomeRegionReplicasColocated(cluster)) {
LOG.info("Running balancer because at least one server hosts replicas of the same region."
+ " function cost={}", functionCost());
+ " function cost={}", functionCost(coarse));
return true;
}
if (idleRegionServerExist(cluster)) {
LOG.info("Running balancer because cluster has idle server(s)." + " function cost={}",
functionCost());
functionCost(coarse));
return true;
}
if (sloppyRegionServerExist(cs)) {
LOG.info("Running balancer because cluster has sloppy server(s)." + " function cost={}",
functionCost());
functionCost(coarse));
return true;
}
double total = 0.0;
for (CostFunction c : costFunctions) {
for (CostFunction c : coarse ? coarseCostFunctions : costFunctions) {
if (!c.isNeeded()) {
LOG.trace("{} not needed", c.getClass().getSimpleName());
continue;
@ -383,14 +484,14 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
if (balanced) {
final double calculatedTotal = total;
sendRejectionReasonToRingBuffer(() -> getBalanceReason(calculatedTotal, sumMultiplier),
costFunctions);
coarse ? coarseCostFunctions : costFunctions);
LOG.info(
"{} - skipping load balancing because weighted average imbalance={} <= "
+ "threshold({}). If you want more aggressive balancing, either lower "
+ "hbase.master.balancer.stochastic.minCostNeedBalance from {} or increase the relative "
+ "multiplier(s) of the specific cost function(s). functionCost={}",
isByTable ? "Table specific (" + tableName + ")" : "Cluster wide", total / sumMultiplier,
minCostNeedBalance, minCostNeedBalance, functionCost());
minCostNeedBalance, minCostNeedBalance, functionCost(coarse));
} else {
LOG.info("{} - Calculating plan. may take up to {}ms to complete.",
isByTable ? "Table specific (" + tableName + ")" : "Cluster wide", maxRunningTime);
@ -447,6 +548,11 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
/**
* Balance one table with the regular (non-coarse) cost functions. This simply delegates to the
* three-argument overload with {@code coarse} set to {@code false}.
*/
@Override
protected List<RegionPlan> balanceTable(TableName tableName,
Map<ServerName, List<RegionInfo>> loadOfOneTable) {
return balanceTable(tableName, loadOfOneTable, false);
}
private List<RegionPlan> balanceTable(TableName tableName,
Map<ServerName, List<RegionInfo>> loadOfOneTable, boolean coarse) {
// On clusters with lots of HFileLinks or lots of reference files,
// instantiating the storefile infos can be quite expensive.
// Allow turning this feature off if the locality cost is not going to
@ -464,10 +570,10 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
long startTime = EnvironmentEdgeManager.currentTime();
initCosts(cluster);
initCosts(cluster, coarse);
sumMultiplier = 0;
for (CostFunction c : costFunctions) {
for (CostFunction c : coarse ? coarseCostFunctions : costFunctions) {
if (c.isNeeded()) {
sumMultiplier += c.getMultiplier();
}
@ -478,14 +584,14 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
return null;
}
double currentCost = computeCost(cluster, Double.MAX_VALUE);
double currentCost = computeCost(cluster, Double.MAX_VALUE, coarse);
curOverallCost = currentCost;
System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length);
updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
updateStochasticCosts(tableName, curOverallCost, curFunctionCosts, coarse);
double initCost = currentCost;
double newCost;
if (!needsBalance(tableName, cluster)) {
if (!needsBalance(tableName, cluster, coarse)) {
return null;
}
@ -507,9 +613,9 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
LOG.info(
"Start StochasticLoadBalancer.balancer, initial weighted average imbalance={}, "
+ "functionCost={} computedMaxSteps={}",
currentCost / sumMultiplier, functionCost(), computedMaxSteps);
currentCost / sumMultiplier, functionCost(coarse), computedMaxSteps);
final String initFunctionTotalCosts = totalCostsPerFunc();
final String initFunctionTotalCosts = totalCostsPerFunc(coarse);
// Perform a stochastic walk to see if we can get a good fit.
long step;
@ -521,9 +627,9 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
}
cluster.doAction(action);
updateCostsAndWeightsWithAction(cluster, action);
updateCostsAndWeightsWithAction(cluster, action, coarse);
newCost = computeCost(cluster, currentCost);
newCost = computeCost(cluster, currentCost, coarse);
// Should this be kept?
if (newCost < currentCost) {
@ -537,7 +643,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
// TODO: undo by remembering old values
BalanceAction undoAction = action.undoAction();
cluster.doAction(undoAction);
updateCostsAndWeightsWithAction(cluster, undoAction);
updateCostsAndWeightsWithAction(cluster, undoAction, coarse);
}
if (EnvironmentEdgeManager.currentTime() - startTime > maxRunningTime) {
@ -549,16 +655,18 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
metricsBalancer.balanceCluster(endTime - startTime);
if (initCost > currentCost) {
updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
updateStochasticCosts(tableName, curOverallCost, curFunctionCosts, coarse);
List<RegionPlan> plans = createRegionPlans(cluster);
LOG.info(
"Finished computing new moving plan. Computation took {} ms"
+ " to try {} different iterations. Found a solution that moves "
+ "{} regions; Going from a computed imbalance of {}"
+ " to a new imbalance of {}. funtionCost={}",
endTime - startTime, step, plans.size(), initCost / sumMultiplier,
currentCost / sumMultiplier, functionCost());
sendRegionPlansToRingBuffer(plans, currentCost, initCost, initFunctionTotalCosts, step);
currentCost / sumMultiplier, functionCost(coarse));
sendRegionPlansToRingBuffer(plans, currentCost, initCost, initFunctionTotalCosts, step,
coarse);
return plans;
}
LOG.info(
@ -585,7 +693,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
}
private void sendRegionPlansToRingBuffer(List<RegionPlan> plans, double currentCost,
double initCost, String initFunctionTotalCosts, long step) {
double initCost, String initFunctionTotalCosts, long step, boolean coarse) {
provider.recordBalancerDecision(() -> {
List<String> regionPlans = new ArrayList<>();
for (RegionPlan plan : plans) {
@ -595,7 +703,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
}
return new BalancerDecision.Builder().setInitTotalCost(initCost)
.setInitialFunctionCosts(initFunctionTotalCosts).setComputedTotalCost(currentCost)
.setFinalFunctionCosts(totalCostsPerFunc()).setComputedSteps(step)
.setFinalFunctionCosts(totalCostsPerFunc(coarse)).setComputedSteps(step)
.setRegionPlans(regionPlans).build();
});
}
@ -603,7 +711,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
/**
* update costs to JMX
*/
private void updateStochasticCosts(TableName tableName, double overall, double[] subCosts) {
private void updateStochasticCosts(TableName tableName, double overall, double[] subCosts,
boolean coarse) {
if (tableName == null) {
return;
}
@ -616,8 +725,9 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
"Overall cost", overall);
// each cost function
for (int i = 0; i < costFunctions.size(); i++) {
CostFunction costFunction = costFunctions.get(i);
List<CostFunction> realCostFunctions = coarse ? coarseCostFunctions : costFunctions;
for (int i = 0; i < realCostFunctions.size(); i++) {
CostFunction costFunction = realCostFunctions.get(i);
String costFunctionName = costFunction.getClass().getSimpleName();
double costPercent = (overall == 0) ? 0 : (subCosts[i] / overall);
// TODO: cost function may need a specific description
@ -634,9 +744,9 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
}
}
private String functionCost() {
private String functionCost(boolean coarse) {
StringBuilder builder = new StringBuilder();
for (CostFunction c : costFunctions) {
for (CostFunction c : coarse ? coarseCostFunctions : costFunctions) {
builder.append(c.getClass().getSimpleName());
builder.append(" : (");
if (c.isNeeded()) {
@ -655,9 +765,9 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
return builder.toString();
}
private String totalCostsPerFunc() {
private String totalCostsPerFunc(boolean coarse) {
StringBuilder builder = new StringBuilder();
for (CostFunction c : costFunctions) {
for (CostFunction c : coarse ? coarseCostFunctions : costFunctions) {
if (!c.isNeeded()) {
continue;
}
@ -731,10 +841,10 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
void initCosts(BalancerClusterState cluster) {
void initCosts(BalancerClusterState cluster, boolean coarse) {
// Initialize the weights of generator every time
weightsOfGenerators = new double[this.candidateGenerators.size()];
for (CostFunction c : costFunctions) {
for (CostFunction c : coarse ? coarseCostFunctions : costFunctions) {
c.prepare(cluster);
c.updateWeight(weightsOfGenerators);
}
@ -745,12 +855,13 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
*/
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
void updateCostsAndWeightsWithAction(BalancerClusterState cluster, BalanceAction action) {
void updateCostsAndWeightsWithAction(BalancerClusterState cluster, BalanceAction action,
boolean coarse) {
// Reset all the weights to 0
for (int i = 0; i < weightsOfGenerators.length; i++) {
weightsOfGenerators[i] = 0;
}
for (CostFunction c : costFunctions) {
for (CostFunction c : coarse ? coarseCostFunctions : costFunctions) {
if (c.isNeeded()) {
c.postAction(action);
c.updateWeight(weightsOfGenerators);
@ -763,10 +874,11 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
*/
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
String[] getCostFunctionNames() {
String[] ret = new String[costFunctions.size()];
for (int i = 0; i < costFunctions.size(); i++) {
CostFunction c = costFunctions.get(i);
String[] getCostFunctionNames(boolean coarse) {
List<CostFunction> functions = coarse ? coarseCostFunctions : costFunctions;
String[] ret = new String[functions.size()];
for (int i = 0; i < functions.size(); i++) {
CostFunction c = functions.get(i);
ret[i] = c.getClass().getSimpleName();
}
@ -783,12 +895,13 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
*/
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
double computeCost(BalancerClusterState cluster, double previousCost) {
double computeCost(BalancerClusterState cluster, double previousCost, boolean coarse) {
double total = 0;
for (int i = 0; i < costFunctions.size(); i++) {
CostFunction c = costFunctions.get(i);
this.tempFunctionCosts[i] = 0.0;
List<CostFunction> toComputeCostFunctions = coarse ? coarseCostFunctions : costFunctions;
Arrays.fill(this.tempFunctionCosts, 0.0);
for (int i = 0; i < toComputeCostFunctions.size(); i++) {
CostFunction c = toComputeCostFunctions.get(i);
if (!c.isNeeded()) {
continue;
@ -814,4 +927,16 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
static String composeAttributeName(String tableName, String costFunctionName) {
return tableName + TABLE_FUNCTION_SEP + costFunctionName;
}
/**
* The interface for overall cluster status checkers, used in {@code StochasticLoadBalancer}.
* Implementations are consulted by {@code balanceCluster} before the regular balance runs.
*/
public interface ClusterStatusBalanceChecker {
/**
* Check if the cluster needs the simplified (coarse) balance so that it can react quickly,
* ignoring the complex and time-consuming calculations in normal steps of
* {@code StochasticLoadBalancer}.
* @param loadOfAllTable region assignments per table and per server
* @return {@code true} if the coarse balance should be attempted for this cluster state
*/
boolean needsCoarseBalance(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable);
}
}

View File

@ -23,4 +23,9 @@ public class DummyCostFunction extends CostFunction {
protected double cost() {
return 0;
}
/**
* Always reports a multiplier of 1 for this dummy cost function.
* NOTE(review): presumably a fixed non-zero weight is needed so that the dummy function is
* taken into account by the balancer in tests -- confirm against CostFunction.
*/
@Override
float getMultiplier() {
return 1;
}
}

View File

@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ MasterTests.class, SmallTests.class })
public class TestOverallSkewChecker {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestOverallSkewChecker.class);
private final ServerName sn1 = ServerName.valueOf("host1,5000,1111");
private final ServerName sn2 = ServerName.valueOf("host2,5000,2222");
private final ServerName sn3 = ServerName.valueOf("host3,5000,3333");
private final TableName t1 = TableName.valueOf("table1");
private final TableName t2 = TableName.valueOf("table2");
private final TableName t3 = TableName.valueOf("table3");
@Test
public void testCheckSkew() {
OverallSkewChecker overallSkewChecker = new OverallSkewChecker(HBaseConfiguration.create());
Map<String, Deque<BalancerRegionLoad>> loads = new HashMap<>();
Map<TableName, Map<ServerName, List<RegionInfo>>> tableLoads =
mockTwoTableSkewClusterLoad(loads);
assertTrue(overallSkewChecker.needsCoarseBalance(tableLoads));
}
@Test
public void testCheckNotSkew() {
OverallSkewChecker overallSkewChecker = new OverallSkewChecker(HBaseConfiguration.create());
Map<String, Deque<BalancerRegionLoad>> loads = new HashMap<>();
Map<TableName, Map<ServerName, List<RegionInfo>>> tableLoads =
mockOneTableSkewClusterLoad(loads);
assertFalse(overallSkewChecker.needsCoarseBalance(tableLoads));
}
private Map<TableName, Map<ServerName, List<RegionInfo>>>
mockOneTableSkewClusterLoad(Map<String, Deque<BalancerRegionLoad>> loads) {
Map<TableName, Map<ServerName, List<RegionInfo>>> allTableLoads = new HashMap<>();
allTableLoads.put(t1, mockServerRegions(t1, 10, 5, 3, loads, 80000L));
allTableLoads.put(t2, mockServerRegions(t2, 5, 5, 5, loads, 500L));
allTableLoads.put(t3, mockServerRegions(t3, 3, 2, 1, loads, 100L));
return allTableLoads;
}
private Map<TableName, Map<ServerName, List<RegionInfo>>>
mockTwoTableSkewClusterLoad(Map<String, Deque<BalancerRegionLoad>> loads) {
Map<TableName, Map<ServerName, List<RegionInfo>>> allTableLoads = new HashMap<>();
allTableLoads.put(t1, mockServerRegions(t1, 10, 5, 3, loads, 80000L));
allTableLoads.put(t2, mockServerRegions(t2, 5, 1, 2, loads, 500L));
allTableLoads.put(t3, mockServerRegions(t3, 3, 2, 1, loads, 100L));
return allTableLoads;
}
protected List<RegionInfo> mockTableRegions(TableName tableName, int count,
Map<String, Deque<BalancerRegionLoad>> loads, long baseLoad) {
return mockTableRegions(tableName, count, loads, baseLoad, null);
}
protected List<RegionInfo> mockTableRegions(TableName tableName, int count,
Map<String, Deque<BalancerRegionLoad>> loads, long baseLoad, String baseRegionName) {
List<RegionInfo> regions = new ArrayList<>();
for (int i = 0; i < count; i++) {
RegionInfo mockRegion = mockRegionInfo(
tableName.getNameAsString() + (baseRegionName == null ? "_region_" : baseRegionName) + i,
tableName);
regions.add(mockRegion);
Deque<BalancerRegionLoad> BalancerRegionLoads =
loads.computeIfAbsent(mockRegion.getEncodedName(), k -> new ArrayDeque<>());
BalancerRegionLoads.add(mockBalancerRegionLoad(0));
BalancerRegionLoads.add(mockBalancerRegionLoad(baseLoad + i));
}
return regions;
}
/**
 * Creates a mocked {@link RegionInfo} whose {@code getTable()} and {@code getEncodedName()} are
 * stubbed with the given values.
 * @param encodeName value returned by {@code getEncodedName()}
 * @param tableName  value returned by {@code getTable()}
 * @return the stubbed mock
 */
protected RegionInfo mockRegionInfo(String encodeName, TableName tableName) {
  RegionInfo stub = mock(RegionInfo.class);
  when(stub.getTable()).thenReturn(tableName);
  when(stub.getEncodedName()).thenReturn(encodeName);
  return stub;
}
/**
 * Mocks the region assignment of one table across the servers {@code sn1}, {@code sn2} and
 * {@code sn3}.
 * <p>
 * NOTE(review): every server's regions are created through the four-argument
 * {@code mockTableRegions} overload, so regions on different servers get the same encoded names
 * (table name + {@code "_region_"} + index) and therefore share entries in {@code loads}. Confirm
 * this is intended before relying on per-region load history in these fixtures.
 * @param tableName table the mocked regions belong to
 * @param s1Count   number of regions placed on {@code sn1}
 * @param s2Count   number of regions placed on {@code sn2}
 * @param s3Count   number of regions placed on {@code sn3}
 * @param loads     region-load history map, updated for every mocked region
 * @param baseLoad  base load forwarded to {@code mockTableRegions}
 * @return a map from server name to the regions mocked for that server
 */
protected Map<ServerName, List<RegionInfo>> mockServerRegions(TableName tableName, int s1Count,
  int s2Count, int s3Count, Map<String, Deque<BalancerRegionLoad>> loads, long baseLoad) {
  // Regions are mocked in sn1, sn2, sn3 order; each call appends to the shared load history.
  List<RegionInfo> onFirstServer = mockTableRegions(tableName, s1Count, loads, baseLoad);
  List<RegionInfo> onSecondServer = mockTableRegions(tableName, s2Count, loads, baseLoad);
  List<RegionInfo> onThirdServer = mockTableRegions(tableName, s3Count, loads, baseLoad);

  Map<ServerName, List<RegionInfo>> assignment = new HashMap<>(3);
  assignment.put(sn1, onFirstServer);
  assignment.put(sn2, onSecondServer);
  assignment.put(sn3, onThirdServer);
  return assignment;
}
/**
 * Creates a mocked {@link BalancerRegionLoad} that reports the same value for both its read and
 * its write request count.
 * @param load value returned by {@code getReadRequestsCount()} and {@code getWriteRequestsCount()}
 * @return the stubbed mock
 */
protected BalancerRegionLoad mockBalancerRegionLoad(long load) {
  BalancerRegionLoad stub = mock(BalancerRegionLoad.class);
  when(stub.getWriteRequestsCount()).thenReturn(load);
  when(stub.getReadRequestsCount()).thenReturn(load);
  return stub;
}
}

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.master.balancer;
import static org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer.CLUSTER_BALANCE_STATUS_CHECKERS_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@ -263,7 +264,7 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase {
metricRecordKey =
HConstants.ENSEMBLE_TABLE_NAME + "#" + StochasticLoadBalancer.OVERALL_COST_FUNCTION_NAME;
}
double curOverallCost = loadBalancer.computeCost(clusterState, Double.MAX_VALUE);
double curOverallCost = loadBalancer.computeCost(clusterState, Double.MAX_VALUE, false);
double curOverallCostInMetrics =
dummyMetricsStochasticBalancer.getDummyCostsMap().get(metricRecordKey);
assertEquals(curOverallCost, curOverallCostInMetrics, 0.001);
@ -286,7 +287,7 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase {
assertTrue("There should be metrics record in MetricsStochasticBalancer",
!dummyMetricsStochasticBalancer.getDummyCostsMap().isEmpty());
double overallCostOfCluster = loadBalancer.computeCost(clusterState, Double.MAX_VALUE);
double overallCostOfCluster = loadBalancer.computeCost(clusterState, Double.MAX_VALUE, false);
double overallCostInMetrics = dummyMetricsStochasticBalancer.getDummyCostsMap().get(
HConstants.ENSEMBLE_TABLE_NAME + "#" + StochasticLoadBalancer.OVERALL_COST_FUNCTION_NAME);
assertEquals(overallCostOfCluster, overallCostInMetrics, 0.001);
@ -308,7 +309,7 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase {
assertTrue("There should be metrics record in MetricsStochasticBalancer!",
!dummyMetricsStochasticBalancer.getDummyCostsMap().isEmpty());
double overallCostOfCluster = loadBalancer.computeCost(clusterState, Double.MAX_VALUE);
double overallCostOfCluster = loadBalancer.computeCost(clusterState, Double.MAX_VALUE, false);
double overallCostInMetrics = dummyMetricsStochasticBalancer.getDummyCostsMap().get(
HConstants.ENSEMBLE_TABLE_NAME + "#" + StochasticLoadBalancer.OVERALL_COST_FUNCTION_NAME);
assertEquals(overallCostOfCluster, overallCostInMetrics, 0.001);
@ -481,16 +482,16 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase {
final int runs = 10;
for (int[] mockCluster : clusterStateMocks) {
BalancerClusterState cluster = mockCluster(mockCluster);
loadBalancer.initCosts(cluster);
loadBalancer.initCosts(cluster, false);
for (int i = 0; i != runs; ++i) {
final double expectedCost = loadBalancer.computeCost(cluster, Double.MAX_VALUE);
final double expectedCost = loadBalancer.computeCost(cluster, Double.MAX_VALUE, false);
BalanceAction action = loadBalancer.nextAction(cluster);
cluster.doAction(action);
loadBalancer.updateCostsAndWeightsWithAction(cluster, action);
loadBalancer.updateCostsAndWeightsWithAction(cluster, action, false);
BalanceAction undoAction = action.undoAction();
cluster.doAction(undoAction);
loadBalancer.updateCostsAndWeightsWithAction(cluster, undoAction);
final double actualCost = loadBalancer.computeCost(cluster, Double.MAX_VALUE);
loadBalancer.updateCostsAndWeightsWithAction(cluster, undoAction, false);
final double actualCost = loadBalancer.computeCost(cluster, Double.MAX_VALUE, false);
assertEquals(expectedCost, actualCost, 0);
}
}
@ -597,7 +598,7 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase {
DummyCostFunction.class.getName());
loadBalancer.onConfigurationChange(conf);
assertTrue(Arrays.asList(loadBalancer.getCostFunctionNames())
assertTrue(Arrays.asList(loadBalancer.getCostFunctionNames(false))
.contains(DummyCostFunction.class.getSimpleName()));
}
@ -613,11 +614,67 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase {
WriteRequestCostFunction.class.getSimpleName(),
MemStoreSizeCostFunction.class.getSimpleName(), StoreFileCostFunction.class.getSimpleName());
List<String> actual = Arrays.asList(loadBalancer.getCostFunctionNames());
List<String> actual = Arrays.asList(loadBalancer.getCostFunctionNames(false));
assertTrue("ExpectedCostFunctions: " + expected + " ActualCostFunctions: " + actual,
CollectionUtils.isEqualCollection(expected, actual));
}
/**
 * With {@code minCostNeedBalance} raised to 1.0 and {@link OverallSkewChecker} configured as the
 * cluster balance status checker, {@code balanceCluster} must return no plans for every cluster
 * in {@code clusterStateMocksWithNoSlop}, unless {@code needsBalanceIdleRegion} holds for that
 * cluster. The check runs once with per-table balancing enabled and once with it disabled.
 */
@Test
public void testNoPlansWithOverallSkewChecker() {
  final String minCostKey = "hbase.master.balancer.stochastic.minCostNeedBalance";
  // Remember the effective threshold so it can be put back afterwards.
  float savedMinCost = conf.getFloat(minCostKey, 0.05f);
  conf.setFloat(minCostKey, 1.0f);
  conf.setStrings(CLUSTER_BALANCE_STATUS_CHECKERS_KEY, OverallSkewChecker.class.getName());
  try {
    // Exercise both the per-table and the whole-cluster balancing mode.
    for (boolean byTable : new boolean[] { true, false }) {
      conf.setBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, byTable);
      loadBalancer.onConfigurationChange(conf);
      for (int[] clusterLayout : clusterStateMocksWithNoSlop) {
        Map<ServerName, List<RegionInfo>> serverRegions = mockClusterServers(clusterLayout);
        Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable =
          (Map) mockClusterServersWithTables(serverRegions);
        List<RegionPlan> plans = loadBalancer.balanceCluster(loadOfAllTable);
        if (plans != null && !plans.isEmpty()) {
          // Plans are only acceptable when the layout has idle servers next to loaded ones.
          assertTrue(needsBalanceIdleRegion(clusterLayout));
        }
      }
    }
  } finally {
    // Restore the configuration touched above and let the balancer pick it up again.
    conf.unset(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE);
    conf.setFloat(minCostKey, savedMinCost);
    conf.setStrings(CLUSTER_BALANCE_STATUS_CHECKERS_KEY, "");
    loadBalancer.onConfigurationChange(conf);
  }
}
/**
 * With {@code minCostNeedBalance} raised to 1.0, {@code coarseBalanceCluster} must return no
 * plans for every cluster in {@code clusterStateMocksWithNoSlop}, unless
 * {@code needsBalanceIdleRegion} holds for that cluster. The check runs once with per-table
 * balancing enabled and once with it disabled.
 */
@Test
public void testNoPlansWithCoarseBalance() {
  final String minCostKey = "hbase.master.balancer.stochastic.minCostNeedBalance";
  // Remember the effective threshold so it can be put back afterwards.
  float savedMinCost = conf.getFloat(minCostKey, 0.05f);
  conf.setFloat(minCostKey, 1.0f);
  try {
    // Exercise both the per-table and the whole-cluster balancing mode.
    for (boolean byTable : new boolean[] { true, false }) {
      conf.setBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, byTable);
      loadBalancer.onConfigurationChange(conf);
      for (int[] clusterLayout : clusterStateMocksWithNoSlop) {
        Map<ServerName, List<RegionInfo>> serverRegions = mockClusterServers(clusterLayout);
        Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable =
          (Map) mockClusterServersWithTables(serverRegions);
        List<RegionPlan> plans = loadBalancer.coarseBalanceCluster(loadOfAllTable);
        if (plans != null && !plans.isEmpty()) {
          // Plans are only acceptable when the layout has idle servers next to loaded ones.
          assertTrue(needsBalanceIdleRegion(clusterLayout));
        }
      }
    }
  } finally {
    // Restore the configuration touched above and let the balancer pick it up again.
    conf.unset(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE);
    conf.setFloat(minCostKey, savedMinCost);
    loadBalancer.onConfigurationChange(conf);
  }
}
private boolean needsBalanceIdleRegion(int[] cluster) {
return Arrays.stream(cluster).anyMatch(x -> x > 1)
&& Arrays.stream(cluster).anyMatch(x -> x < 1);

View File

@ -149,7 +149,7 @@ public class TestStochasticLoadBalancerRegionReplica extends StochasticBalancerT
regions = randomRegions(1);
map.put(s2, regions);
assertTrue(loadBalancer.needsBalance(HConstants.ENSEMBLE_TABLE_NAME,
new BalancerClusterState(map, null, null, null)));
new BalancerClusterState(map, null, null, null), false));
// check for the case where there are two hosts on the same rack and there are two racks
// and both the replicas are on the same rack
map.clear();
@ -161,7 +161,7 @@ public class TestStochasticLoadBalancerRegionReplica extends StochasticBalancerT
// add another server so that the cluster has some host on another rack
map.put(ServerName.valueOf("host2", 1000, 11111), randomRegions(1));
assertFalse(loadBalancer.needsBalance(HConstants.ENSEMBLE_TABLE_NAME,
new BalancerClusterState(map, null, null, new ForTestRackManagerOne())));
new BalancerClusterState(map, null, null, new ForTestRackManagerOne()), false));
}
@Test

View File

@ -141,7 +141,7 @@ public class TestStochasticBalancerJmxMetrics extends BalancerTestBase {
loadBalancer.balanceTable(tableName, clusterState);
String[] tableNames = new String[] { tableName.getNameAsString() };
String[] functionNames = loadBalancer.getCostFunctionNames();
String[] functionNames = loadBalancer.getCostFunctionNames(false);
Set<String> jmxMetrics = readJmxMetricsWithRetry();
Set<String> expectedMetrics = getExpectedJmxMetrics(tableNames, functionNames);
@ -169,7 +169,7 @@ public class TestStochasticBalancerJmxMetrics extends BalancerTestBase {
// NOTE the size is normally set in setClusterMetrics, for test purpose, we set it manually
// Tables: hbase:namespace, table1, table2
// Functions: costFunctions, overall
String[] functionNames = loadBalancer.getCostFunctionNames();
String[] functionNames = loadBalancer.getCostFunctionNames(false);
loadBalancer.updateMetricsSize(3 * (functionNames.length + 1));
// table 1