diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
index b6abdd5c7f6..420dc05238c 100644
--- a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
+++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
@@ -601,7 +601,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
    * @see #balanceTable(TableName, Map)
    */
   @Override
-  public final List<RegionPlan>
+  public List<RegionPlan>
     balanceCluster(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
     preBalanceCluster(loadOfAllTable);
     if (isByTable) {
diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/OverallSkewChecker.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/OverallSkewChecker.java
new file mode 100644
index 00000000000..8a7edb6f938
--- /dev/null
+++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/OverallSkewChecker.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A cluster status checker for {@link StochasticLoadBalancer}. A table is considered skewed when
+ * the gap between its most and least loaded region server is more than two regions; once the
+ * fraction of skewed tables exceeds "hbase.master.balancer.overall.skew.percentage" (0.5 by
+ * default), a simplified (coarse) balance is executed so that regions are spread out as soon as
+ * possible. For example, after a large-scale restart of RegionServers, or after expanding a group
+ * or the cluster, we want the balancer to act quickly, whereas the normal StochasticLoadBalancer
+ * may need a long time to compute its costs.
+ */
+@InterfaceAudience.Private
+class OverallSkewChecker implements StochasticLoadBalancer.ClusterStatusBalanceChecker {
+  private static final Logger LOG = LoggerFactory.getLogger(OverallSkewChecker.class);
+
+  private static final String BALANCER_OVERALL_SKEW_PERCENT_KEY =
+    "hbase.master.balancer.overall.skew.percentage";
+
+  private Configuration conf;
+
+  public OverallSkewChecker(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public boolean
+    needsCoarseBalance(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
+    if (this.conf == null) {
+      LOG.warn("Configuration should not be null");
+      return false;
+    }
+    if (isClusterSkew(loadOfAllTable)) {
+      LOG.info("Balancer checker found the cluster skewed, will perform coarse balance, "
+        + "table count is {}", loadOfAllTable.size());
+      return true;
+    }
+    LOG.debug("Balancer checked the cluster, it is not skewed now, table count is {}",
+      loadOfAllTable.size());
+    return false;
+  }
+
+  public boolean isClusterSkew(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
+    if (loadOfAllTable == null || loadOfAllTable.isEmpty()) {
+      return false;
+    }
+    int skewCount = 0;
+    for (Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> entry : loadOfAllTable
+      .entrySet()) {
+      if (isSkew(entry.getValue())) {
+        LOG.info("Table: " + entry.getKey().getNameAsString() + " region count is skewed");
+        if (LOG.isDebugEnabled()) {
+          StringBuilder sb = new StringBuilder(
+            "Table: " + entry.getKey().getNameAsString() + " regions distribution is: ");
+          for (Map.Entry<ServerName, List<RegionInfo>> regionDistribution : entry.getValue()
+            .entrySet()) {
+            sb.append(regionDistribution.getKey().getServerName()).append(" count=")
+              .append(regionDistribution.getValue().size()).append(":");
+            sb.append(regionDistribution.getValue());
+          }
+          LOG.debug(sb.toString());
+        }
+        skewCount++;
+      }
+    }
+    LOG.info("Cluster skew check done, skewed table count={}, total table count={}", skewCount,
+      loadOfAllTable.size());
+    return skewCount > 0 && (double) skewCount / loadOfAllTable.size()
+        > conf.getDouble(BALANCER_OVERALL_SKEW_PERCENT_KEY, 0.5);
+  }
+
+  private boolean isSkew(Map<ServerName, List<RegionInfo>> regions) {
+    if (regions == null || regions.isEmpty()) {
+      return false;
+    }
+    int max = 0;
+    int min = Integer.MAX_VALUE;
+    for (Map.Entry<ServerName, List<RegionInfo>> entry : regions.entrySet()) {
+      int count = entry.getValue() == null ?
0 : entry.getValue().size(); + max = Math.max(max, count); + min = Math.min(min, count); + } + return max - min > 2; + } +} diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java index edf049e8a71..1bd77c8467e 100644 --- a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java +++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java @@ -18,16 +18,18 @@ package org.apache.hadoop.hbase.master.balancer; import com.google.errorprone.annotations.RestrictedApi; -import java.lang.reflect.Constructor; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; import java.util.Deque; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ThreadLocalRandom; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ClusterMetrics; import org.apache.hadoop.hbase.HBaseInterfaceAudience; @@ -47,6 +49,8 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; + /** *
<p>
* This is a best effort load balancer. Given a Cost function F(C) => x It will randomly try and @@ -127,6 +131,19 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { "hbase.master.balancer.stochastic.additionalCostFunctions"; public static final String OVERALL_COST_FUNCTION_NAME = "Overall"; + /** Configuration for classes of cluster status checkers */ + protected static final String CLUSTER_BALANCE_STATUS_CHECKERS_KEY = + "hbase.master.balancer.stochastic.cluster.status.checkers"; + + /** + * Configuration for the minimum interval for coarse balance. Coarse balance is designed to happen + * on strict conditions, which should not occur often. And this can also limit the frequency of + * switching between coarse and accurate balance. + */ + protected static final String CLUSTER_COARSE_BALANCE_MIN_INTERVAL_KEY = + "hbase.master.balance.stochastic.cluster.coarse.balance.min.interval.ms"; + private static final long CLUSTER_COARSE_BALANCE_MIN_INTERVAL_DEFAULT = 4 * 60 * 60 * 1000; // 4h + Map> loads = new HashMap<>(); // values are defaults @@ -138,6 +155,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { private float minCostNeedBalance = DEFAULT_MIN_COST_NEED_BALANCE; private List costFunctions; // FindBugs: Wants this protected; IS2_INCONSISTENT_SYNC + private List coarseCostFunctions; // To save currently configed sum of multiplier. Defaulted at 1 for cases that carry high cost private float sumMultiplier; // to save and report costs to JMX @@ -156,6 +174,9 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { protected List candidateGenerators; + protected List clusterStatusCheckers; + protected long lastCoarseTimestamp = 0; + public enum GeneratorType { RANDOM, LOAD, @@ -177,35 +198,34 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { super(metricsStochasticBalancer); } - private static CostFunction createCostFunction(Class clazz, - Configuration conf) { - try { - Constructor ctor = clazz.getDeclaredConstructor(Configuration.class); - return ReflectionUtils.instantiate(clazz.getName(), ctor, conf); - } catch (NoSuchMethodException e) { - // will try construct with no parameter - } - return ReflectionUtils.newInstance(clazz); - } + private List loadFunctions(Configuration conf, String configName) { + String[] classNames = conf.getStrings(configName); - private void loadCustomCostFunctions(Configuration conf) { - String[] functionsNames = conf.getStrings(COST_FUNCTIONS_COST_FUNCTIONS_KEY); - - if (null == functionsNames) { - return; + if (null == classNames) { + return new ArrayList<>(); } - for (String className : functionsNames) { - Class clazz; + + return Arrays.stream(classNames).map(c -> { + Class klass = null; try { - clazz = Class.forName(className).asSubclass(CostFunction.class); + klass = (Class) Class.forName(c); } catch (ClassNotFoundException e) { - LOG.warn("Cannot load class '{}': {}", className, e.getMessage()); - continue; + LOG.warn("Cannot load class " + c + "': " + e.getMessage()); } - CostFunction func = createCostFunction(clazz, conf); - LOG.info("Successfully loaded custom CostFunction '{}'", func.getClass().getSimpleName()); - costFunctions.add(func); - } + if (null == klass) { + return null; + } + + T reflected; + try { + reflected = ReflectionUtils.newInstance(klass, conf); + } catch (UnsupportedOperationException e) { + // will try construct with no parameter + reflected = ReflectionUtils.newInstance(klass); + } + LOG.info("Successfully loaded class {}", reflected.getClass().getSimpleName()); + return 
reflected; + }).filter(Objects::nonNull).collect(Collectors.toList()); } @RestrictedApi(explanation = "Should only be called in tests", link = "", @@ -256,14 +276,41 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { addCostFunction(new WriteRequestCostFunction(conf)); addCostFunction(new MemStoreSizeCostFunction(conf)); addCostFunction(new StoreFileCostFunction(conf)); - loadCustomCostFunctions(conf); + List additionals = + (List) loadFunctions(conf, COST_FUNCTIONS_COST_FUNCTIONS_KEY); + if (!CollectionUtils.isEmpty(additionals)) { + for (CostFunction costFunction : additionals) { + addCostFunction(costFunction); + } + } + + coarseCostFunctions = new ArrayList<>(); + coarseCostFunctions.add(new RegionCountSkewCostFunction(conf)); + coarseCostFunctions.add(new PrimaryRegionCountSkewCostFunction(conf)); + coarseCostFunctions.add(new MoveCostFunction(conf, provider)); + coarseCostFunctions.add(regionReplicaHostCostFunction); + coarseCostFunctions.add(regionReplicaRackCostFunction); + coarseCostFunctions.add(new TableSkewCostFunction(conf)); + + assert coarseCostFunctions.size() <= costFunctions.size() + : "Simple cost functions should be less than normal!"; curFunctionCosts = new double[costFunctions.size()]; tempFunctionCosts = new double[costFunctions.size()]; + clusterStatusCheckers = new LinkedList<>(); + List statusCheckers = + (List) loadFunctions(conf, CLUSTER_BALANCE_STATUS_CHECKERS_KEY); + if (!CollectionUtils.isEmpty(statusCheckers)) { + clusterStatusCheckers.addAll(statusCheckers); + } + LOG.info("Loaded config; maxSteps=" + maxSteps + ", runMaxSteps=" + runMaxSteps + ", stepsPerRegion=" + stepsPerRegion + ", maxRunningTime=" + maxRunningTime + ", isByTable=" - + isByTable + ", CostFunctions=" + Arrays.toString(getCostFunctionNames()) + + isByTable + ", CostFunctions=" + Arrays.toString(getCostFunctionNames(false)) + + ", simpleCostFunctions=" + Arrays.toString(getCostFunctionNames(true)) + + ", clusterStatusCheckers=" + + Arrays.toString(conf.getStrings(CLUSTER_BALANCE_STATUS_CHECKERS_KEY)) + " , sum of multiplier of cost functions = " + sumMultiplier + " etc."); } @@ -276,7 +323,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { try { // by-table or ensemble mode int tablesCount = isByTable ? 
provider.getNumberOfTables() : 1; - int functionsCount = getCostFunctionNames().length; + int functionsCount = getCostFunctionNames(false).length; updateMetricsSize(tablesCount * (functionsCount + 1)); // +1 for overall } catch (Exception e) { @@ -284,6 +331,60 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { } } + @Override + public List + balanceCluster(Map>> loadOfAllTable) { + long current = EnvironmentEdgeManager.currentTime(); + if ( + clusterStatusCheckers != null && current - lastCoarseTimestamp > getConf().getLong( + CLUSTER_COARSE_BALANCE_MIN_INTERVAL_KEY, CLUSTER_COARSE_BALANCE_MIN_INTERVAL_DEFAULT) + ) { + for (ClusterStatusBalanceChecker clusterStatusBalancer : clusterStatusCheckers) { + boolean coarseBalance = clusterStatusBalancer.needsCoarseBalance(loadOfAllTable); + if (coarseBalance) { + // no matter whether the coarse balance generates plans, we record the + // lastCoarseTimestamp as the timestamp of calculating coarse plans + lastCoarseTimestamp = current; + List plans = coarseBalanceCluster(loadOfAllTable); + if (plans != null && !plans.isEmpty()) { + LOG.debug("Cluster needs coarse balance, and plans size={}", plans.size()); + return plans; + } else { + LOG.warn("Cluster needs coarse balance, but generated no plans by current " + + "coarseCostFunctions, please see more details in HBASE-25768"); + break; + } + } + } + } + return super.balanceCluster(loadOfAllTable); + } + + /** + * Use the coarse cost functions to balance cluster. + */ + @RestrictedApi(explanation = "Should only be called in tests", link = "", + allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") + List + coarseBalanceCluster(Map>> loadOfAllTable) { + preBalanceCluster(loadOfAllTable); + if (isByTable) { + List result = new ArrayList<>(); + loadOfAllTable.forEach((tableName, loadOfOneTable) -> { + LOG.info("Coarse balance start Generate Balance plan for table: " + tableName); + List partialPlans = balanceTable(tableName, loadOfOneTable, true); + if (partialPlans != null) { + result.addAll(partialPlans); + } + }); + return result; + } else { + LOG.info("Coarse balance start Generate Balance plans for cluster"); + return balanceTable(HConstants.ENSEMBLE_TABLE_NAME, toEnsumbleTableLoad(loadOfAllTable), + true); + } + } + private void updateBalancerTableLoadInfo(TableName tableName, Map> loadOfOneTable) { RegionHDFSBlockLocationFinder finder = null; @@ -293,10 +394,10 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { BalancerClusterState cluster = new BalancerClusterState(loadOfOneTable, loads, finder, rackManager); - initCosts(cluster); - curOverallCost = computeCost(cluster, Double.MAX_VALUE); + initCosts(cluster, false); + curOverallCost = computeCost(cluster, Double.MAX_VALUE, false); System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length); - updateStochasticCosts(tableName, curOverallCost, curFunctionCosts); + updateStochasticCosts(tableName, curOverallCost, curFunctionCosts, false); } @Override @@ -342,8 +443,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { } @RestrictedApi(explanation = "Should only be called in tests", link = "", - allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") - boolean needsBalance(TableName tableName, BalancerClusterState cluster) { + allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer|Checker).java") + boolean needsBalance(TableName tableName, BalancerClusterState cluster, boolean coarse) { ClusterLoadState cs = new ClusterLoadState(cluster.clusterState); if 
(cs.getNumServers() < MIN_SERVER_BALANCE) { LOG.info( @@ -354,24 +455,24 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { } if (areSomeRegionReplicasColocated(cluster)) { LOG.info("Running balancer because at least one server hosts replicas of the same region." - + " function cost={}", functionCost()); + + " function cost={}", functionCost(coarse)); return true; } if (idleRegionServerExist(cluster)) { LOG.info("Running balancer because cluster has idle server(s)." + " function cost={}", - functionCost()); + functionCost(coarse)); return true; } if (sloppyRegionServerExist(cs)) { LOG.info("Running balancer because cluster has sloppy server(s)." + " function cost={}", - functionCost()); + functionCost(coarse)); return true; } double total = 0.0; - for (CostFunction c : costFunctions) { + for (CostFunction c : coarse ? coarseCostFunctions : costFunctions) { if (!c.isNeeded()) { LOG.trace("{} not needed", c.getClass().getSimpleName()); continue; @@ -383,14 +484,14 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { if (balanced) { final double calculatedTotal = total; sendRejectionReasonToRingBuffer(() -> getBalanceReason(calculatedTotal, sumMultiplier), - costFunctions); + coarse ? coarseCostFunctions : costFunctions); LOG.info( "{} - skipping load balancing because weighted average imbalance={} <= " + "threshold({}). If you want more aggressive balancing, either lower " + "hbase.master.balancer.stochastic.minCostNeedBalance from {} or increase the relative " + "multiplier(s) of the specific cost function(s). functionCost={}", isByTable ? "Table specific (" + tableName + ")" : "Cluster wide", total / sumMultiplier, - minCostNeedBalance, minCostNeedBalance, functionCost()); + minCostNeedBalance, minCostNeedBalance, functionCost(coarse)); } else { LOG.info("{} - Calculating plan. may take up to {}ms to complete.", isByTable ? "Table specific (" + tableName + ")" : "Cluster wide", maxRunningTime); @@ -447,6 +548,11 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { @Override protected List balanceTable(TableName tableName, Map> loadOfOneTable) { + return balanceTable(tableName, loadOfOneTable, false); + } + + private List balanceTable(TableName tableName, + Map> loadOfOneTable, boolean coarse) { // On clusters with lots of HFileLinks or lots of reference files, // instantiating the storefile infos can be quite expensive. // Allow turning this feature off if the locality cost is not going to @@ -464,10 +570,10 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { long startTime = EnvironmentEdgeManager.currentTime(); - initCosts(cluster); + initCosts(cluster, coarse); sumMultiplier = 0; - for (CostFunction c : costFunctions) { + for (CostFunction c : coarse ? 
coarseCostFunctions : costFunctions) { if (c.isNeeded()) { sumMultiplier += c.getMultiplier(); } @@ -478,14 +584,14 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { return null; } - double currentCost = computeCost(cluster, Double.MAX_VALUE); + double currentCost = computeCost(cluster, Double.MAX_VALUE, coarse); curOverallCost = currentCost; System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length); - updateStochasticCosts(tableName, curOverallCost, curFunctionCosts); + updateStochasticCosts(tableName, curOverallCost, curFunctionCosts, coarse); double initCost = currentCost; double newCost; - if (!needsBalance(tableName, cluster)) { + if (!needsBalance(tableName, cluster, coarse)) { return null; } @@ -507,9 +613,9 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { LOG.info( "Start StochasticLoadBalancer.balancer, initial weighted average imbalance={}, " + "functionCost={} computedMaxSteps={}", - currentCost / sumMultiplier, functionCost(), computedMaxSteps); + currentCost / sumMultiplier, functionCost(coarse), computedMaxSteps); - final String initFunctionTotalCosts = totalCostsPerFunc(); + final String initFunctionTotalCosts = totalCostsPerFunc(coarse); // Perform a stochastic walk to see if we can get a good fit. long step; @@ -521,9 +627,9 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { } cluster.doAction(action); - updateCostsAndWeightsWithAction(cluster, action); + updateCostsAndWeightsWithAction(cluster, action, coarse); - newCost = computeCost(cluster, currentCost); + newCost = computeCost(cluster, currentCost, coarse); // Should this be kept? if (newCost < currentCost) { @@ -537,7 +643,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { // TODO: undo by remembering old values BalanceAction undoAction = action.undoAction(); cluster.doAction(undoAction); - updateCostsAndWeightsWithAction(cluster, undoAction); + updateCostsAndWeightsWithAction(cluster, undoAction, coarse); } if (EnvironmentEdgeManager.currentTime() - startTime > maxRunningTime) { @@ -549,16 +655,18 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { metricsBalancer.balanceCluster(endTime - startTime); if (initCost > currentCost) { - updateStochasticCosts(tableName, curOverallCost, curFunctionCosts); + updateStochasticCosts(tableName, curOverallCost, curFunctionCosts, coarse); List plans = createRegionPlans(cluster); + LOG.info( "Finished computing new moving plan. Computation took {} ms" + " to try {} different iterations. Found a solution that moves " + "{} regions; Going from a computed imbalance of {}" + " to a new imbalance of {}. 
funtionCost={}", endTime - startTime, step, plans.size(), initCost / sumMultiplier, - currentCost / sumMultiplier, functionCost()); - sendRegionPlansToRingBuffer(plans, currentCost, initCost, initFunctionTotalCosts, step); + currentCost / sumMultiplier, functionCost(coarse)); + sendRegionPlansToRingBuffer(plans, currentCost, initCost, initFunctionTotalCosts, step, + coarse); return plans; } LOG.info( @@ -585,7 +693,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { } private void sendRegionPlansToRingBuffer(List plans, double currentCost, - double initCost, String initFunctionTotalCosts, long step) { + double initCost, String initFunctionTotalCosts, long step, boolean coarse) { provider.recordBalancerDecision(() -> { List regionPlans = new ArrayList<>(); for (RegionPlan plan : plans) { @@ -595,7 +703,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { } return new BalancerDecision.Builder().setInitTotalCost(initCost) .setInitialFunctionCosts(initFunctionTotalCosts).setComputedTotalCost(currentCost) - .setFinalFunctionCosts(totalCostsPerFunc()).setComputedSteps(step) + .setFinalFunctionCosts(totalCostsPerFunc(coarse)).setComputedSteps(step) .setRegionPlans(regionPlans).build(); }); } @@ -603,7 +711,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { /** * update costs to JMX */ - private void updateStochasticCosts(TableName tableName, double overall, double[] subCosts) { + private void updateStochasticCosts(TableName tableName, double overall, double[] subCosts, + boolean coarse) { if (tableName == null) { return; } @@ -616,8 +725,9 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { "Overall cost", overall); // each cost function - for (int i = 0; i < costFunctions.size(); i++) { - CostFunction costFunction = costFunctions.get(i); + List realCostFunctions = coarse ? coarseCostFunctions : costFunctions; + for (int i = 0; i < realCostFunctions.size(); i++) { + CostFunction costFunction = realCostFunctions.get(i); String costFunctionName = costFunction.getClass().getSimpleName(); double costPercent = (overall == 0) ? 0 : (subCosts[i] / overall); // TODO: cost function may need a specific description @@ -634,9 +744,9 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { } } - private String functionCost() { + private String functionCost(boolean coarse) { StringBuilder builder = new StringBuilder(); - for (CostFunction c : costFunctions) { + for (CostFunction c : coarse ? coarseCostFunctions : costFunctions) { builder.append(c.getClass().getSimpleName()); builder.append(" : ("); if (c.isNeeded()) { @@ -655,9 +765,9 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { return builder.toString(); } - private String totalCostsPerFunc() { + private String totalCostsPerFunc(boolean coarse) { StringBuilder builder = new StringBuilder(); - for (CostFunction c : costFunctions) { + for (CostFunction c : coarse ? coarseCostFunctions : costFunctions) { if (!c.isNeeded()) { continue; } @@ -731,10 +841,10 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { @RestrictedApi(explanation = "Should only be called in tests", link = "", allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") - void initCosts(BalancerClusterState cluster) { + void initCosts(BalancerClusterState cluster, boolean coarse) { // Initialize the weights of generator every time weightsOfGenerators = new double[this.candidateGenerators.size()]; - for (CostFunction c : costFunctions) { + for (CostFunction c : coarse ? 
coarseCostFunctions : costFunctions) { c.prepare(cluster); c.updateWeight(weightsOfGenerators); } @@ -745,12 +855,13 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { */ @RestrictedApi(explanation = "Should only be called in tests", link = "", allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") - void updateCostsAndWeightsWithAction(BalancerClusterState cluster, BalanceAction action) { + void updateCostsAndWeightsWithAction(BalancerClusterState cluster, BalanceAction action, + boolean coarse) { // Reset all the weights to 0 for (int i = 0; i < weightsOfGenerators.length; i++) { weightsOfGenerators[i] = 0; } - for (CostFunction c : costFunctions) { + for (CostFunction c : coarse ? coarseCostFunctions : costFunctions) { if (c.isNeeded()) { c.postAction(action); c.updateWeight(weightsOfGenerators); @@ -763,10 +874,11 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { */ @RestrictedApi(explanation = "Should only be called in tests", link = "", allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") - String[] getCostFunctionNames() { - String[] ret = new String[costFunctions.size()]; - for (int i = 0; i < costFunctions.size(); i++) { - CostFunction c = costFunctions.get(i); + String[] getCostFunctionNames(boolean coarse) { + List functions = coarse ? coarseCostFunctions : costFunctions; + String[] ret = new String[functions.size()]; + for (int i = 0; i < functions.size(); i++) { + CostFunction c = functions.get(i); ret[i] = c.getClass().getSimpleName(); } @@ -783,12 +895,13 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { */ @RestrictedApi(explanation = "Should only be called in tests", link = "", allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") - double computeCost(BalancerClusterState cluster, double previousCost) { + double computeCost(BalancerClusterState cluster, double previousCost, boolean coarse) { double total = 0; - for (int i = 0; i < costFunctions.size(); i++) { - CostFunction c = costFunctions.get(i); - this.tempFunctionCosts[i] = 0.0; + List toComputeCostFunctions = coarse ? coarseCostFunctions : costFunctions; + Arrays.fill(this.tempFunctionCosts, 0.0); + for (int i = 0; i < toComputeCostFunctions.size(); i++) { + CostFunction c = toComputeCostFunctions.get(i); if (!c.isNeeded()) { continue; @@ -814,4 +927,16 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { static String composeAttributeName(String tableName, String costFunctionName) { return tableName + TABLE_FUNCTION_SEP + costFunctionName; } + + /** + * The interface for overall cluster status checkers, used in {@code StochasticLoadBalancer}. + */ + public interface ClusterStatusBalanceChecker { + + /** + * Check if the cluster needs simplified balance to ack quickly, ignoring the complex and + * time-consuming calculations in normal steps of {@code StochasticLoadBalancer}. 
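+     * <p>
+     * Implementations are loaded from the "hbase.master.balancer.stochastic.cluster.status.checkers"
+     * setting and need either a {@link org.apache.hadoop.conf.Configuration} constructor or a
+     * no-argument constructor. When this returns {@code true} and the interval configured by
+     * "hbase.master.balance.stochastic.cluster.coarse.balance.min.interval.ms" has elapsed, the
+     * balancer first computes plans with the reduced coarse cost function set and only falls back
+     * to the normal stochastic path if no coarse plans are produced.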
+ */ + boolean needsCoarseBalance(Map>> loadOfAllTable); + } } diff --git a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/DummyCostFunction.java b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/DummyCostFunction.java index 83b22da4bec..4a15bb33a3b 100644 --- a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/DummyCostFunction.java +++ b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/DummyCostFunction.java @@ -23,4 +23,9 @@ public class DummyCostFunction extends CostFunction { protected double cost() { return 0; } + + @Override + float getMultiplier() { + return 1; + } } diff --git a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestOverallSkewChecker.java b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestOverallSkewChecker.java new file mode 100644 index 00000000000..88012a99835 --- /dev/null +++ b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestOverallSkewChecker.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.balancer; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Deque; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MasterTests.class, SmallTests.class }) +public class TestOverallSkewChecker { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestOverallSkewChecker.class); + + private final ServerName sn1 = ServerName.valueOf("host1,5000,1111"); + private final ServerName sn2 = ServerName.valueOf("host2,5000,2222"); + private final ServerName sn3 = ServerName.valueOf("host3,5000,3333"); + private final TableName t1 = TableName.valueOf("table1"); + private final TableName t2 = TableName.valueOf("table2"); + private final TableName t3 = TableName.valueOf("table3"); + + @Test + public void testCheckSkew() { + OverallSkewChecker overallSkewChecker = new OverallSkewChecker(HBaseConfiguration.create()); + Map> loads = new HashMap<>(); + Map>> tableLoads = + mockTwoTableSkewClusterLoad(loads); + assertTrue(overallSkewChecker.needsCoarseBalance(tableLoads)); + } + + @Test + public void testCheckNotSkew() { + OverallSkewChecker overallSkewChecker = new OverallSkewChecker(HBaseConfiguration.create()); + Map> loads = new HashMap<>(); + Map>> tableLoads = + mockOneTableSkewClusterLoad(loads); + assertFalse(overallSkewChecker.needsCoarseBalance(tableLoads)); + } + + private Map>> + mockOneTableSkewClusterLoad(Map> loads) { + Map>> allTableLoads = new HashMap<>(); + allTableLoads.put(t1, mockServerRegions(t1, 10, 5, 3, loads, 80000L)); + allTableLoads.put(t2, mockServerRegions(t2, 5, 5, 5, loads, 500L)); + allTableLoads.put(t3, mockServerRegions(t3, 3, 2, 1, loads, 100L)); + return allTableLoads; + } + + private Map>> + mockTwoTableSkewClusterLoad(Map> loads) { + Map>> allTableLoads = new HashMap<>(); + allTableLoads.put(t1, mockServerRegions(t1, 10, 5, 3, loads, 80000L)); + allTableLoads.put(t2, mockServerRegions(t2, 5, 1, 2, loads, 500L)); + allTableLoads.put(t3, mockServerRegions(t3, 3, 2, 1, loads, 100L)); + return allTableLoads; + } + + protected List mockTableRegions(TableName tableName, int count, + Map> loads, long baseLoad) { + return mockTableRegions(tableName, count, loads, baseLoad, null); + } + + protected List mockTableRegions(TableName tableName, int count, + Map> loads, long baseLoad, String baseRegionName) { + List regions = new ArrayList<>(); + for (int i = 0; i < count; i++) { + RegionInfo mockRegion = mockRegionInfo( + tableName.getNameAsString() + (baseRegionName == null ? 
"_region_" : baseRegionName) + i, + tableName); + regions.add(mockRegion); + Deque BalancerRegionLoads = + loads.computeIfAbsent(mockRegion.getEncodedName(), k -> new ArrayDeque<>()); + BalancerRegionLoads.add(mockBalancerRegionLoad(0)); + BalancerRegionLoads.add(mockBalancerRegionLoad(baseLoad + i)); + } + return regions; + } + + protected RegionInfo mockRegionInfo(String encodeName, TableName tableName) { + RegionInfo regionInfo = mock(RegionInfo.class); + when(regionInfo.getEncodedName()).thenReturn(encodeName); + when(regionInfo.getTable()).thenReturn(tableName); + return regionInfo; + } + + protected Map> mockServerRegions(TableName tableName, int s1Count, + int s2Count, int s3Count, Map> loads, long baseLoad) { + Map> serverRegions = new HashMap<>(3); + serverRegions.put(sn1, mockTableRegions(tableName, s1Count, loads, baseLoad)); + serverRegions.put(sn2, mockTableRegions(tableName, s2Count, loads, baseLoad)); + serverRegions.put(sn3, mockTableRegions(tableName, s3Count, loads, baseLoad)); + return serverRegions; + } + + protected BalancerRegionLoad mockBalancerRegionLoad(long load) { + BalancerRegionLoad rl = mock(BalancerRegionLoad.class); + when(rl.getReadRequestsCount()).thenReturn(load); + when(rl.getWriteRequestsCount()).thenReturn(load); + return rl; + } +} diff --git a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java index 21f3a3b66c9..1fe34c09549 100644 --- a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java +++ b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.master.balancer; +import static org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer.CLUSTER_BALANCE_STATUS_CHECKERS_KEY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -263,7 +264,7 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase { metricRecordKey = HConstants.ENSEMBLE_TABLE_NAME + "#" + StochasticLoadBalancer.OVERALL_COST_FUNCTION_NAME; } - double curOverallCost = loadBalancer.computeCost(clusterState, Double.MAX_VALUE); + double curOverallCost = loadBalancer.computeCost(clusterState, Double.MAX_VALUE, false); double curOverallCostInMetrics = dummyMetricsStochasticBalancer.getDummyCostsMap().get(metricRecordKey); assertEquals(curOverallCost, curOverallCostInMetrics, 0.001); @@ -286,7 +287,7 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase { assertTrue("There should be metrics record in MetricsStochasticBalancer", !dummyMetricsStochasticBalancer.getDummyCostsMap().isEmpty()); - double overallCostOfCluster = loadBalancer.computeCost(clusterState, Double.MAX_VALUE); + double overallCostOfCluster = loadBalancer.computeCost(clusterState, Double.MAX_VALUE, false); double overallCostInMetrics = dummyMetricsStochasticBalancer.getDummyCostsMap().get( HConstants.ENSEMBLE_TABLE_NAME + "#" + StochasticLoadBalancer.OVERALL_COST_FUNCTION_NAME); assertEquals(overallCostOfCluster, overallCostInMetrics, 0.001); @@ -308,7 +309,7 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase { assertTrue("There should be metrics record in MetricsStochasticBalancer!", !dummyMetricsStochasticBalancer.getDummyCostsMap().isEmpty()); - double 
overallCostOfCluster = loadBalancer.computeCost(clusterState, Double.MAX_VALUE); + double overallCostOfCluster = loadBalancer.computeCost(clusterState, Double.MAX_VALUE, false); double overallCostInMetrics = dummyMetricsStochasticBalancer.getDummyCostsMap().get( HConstants.ENSEMBLE_TABLE_NAME + "#" + StochasticLoadBalancer.OVERALL_COST_FUNCTION_NAME); assertEquals(overallCostOfCluster, overallCostInMetrics, 0.001); @@ -481,16 +482,16 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase { final int runs = 10; for (int[] mockCluster : clusterStateMocks) { BalancerClusterState cluster = mockCluster(mockCluster); - loadBalancer.initCosts(cluster); + loadBalancer.initCosts(cluster, false); for (int i = 0; i != runs; ++i) { - final double expectedCost = loadBalancer.computeCost(cluster, Double.MAX_VALUE); + final double expectedCost = loadBalancer.computeCost(cluster, Double.MAX_VALUE, false); BalanceAction action = loadBalancer.nextAction(cluster); cluster.doAction(action); - loadBalancer.updateCostsAndWeightsWithAction(cluster, action); + loadBalancer.updateCostsAndWeightsWithAction(cluster, action, false); BalanceAction undoAction = action.undoAction(); cluster.doAction(undoAction); - loadBalancer.updateCostsAndWeightsWithAction(cluster, undoAction); - final double actualCost = loadBalancer.computeCost(cluster, Double.MAX_VALUE); + loadBalancer.updateCostsAndWeightsWithAction(cluster, undoAction, false); + final double actualCost = loadBalancer.computeCost(cluster, Double.MAX_VALUE, false); assertEquals(expectedCost, actualCost, 0); } } @@ -597,7 +598,7 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase { DummyCostFunction.class.getName()); loadBalancer.onConfigurationChange(conf); - assertTrue(Arrays.asList(loadBalancer.getCostFunctionNames()) + assertTrue(Arrays.asList(loadBalancer.getCostFunctionNames(false)) .contains(DummyCostFunction.class.getSimpleName())); } @@ -613,11 +614,67 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase { WriteRequestCostFunction.class.getSimpleName(), MemStoreSizeCostFunction.class.getSimpleName(), StoreFileCostFunction.class.getSimpleName()); - List actual = Arrays.asList(loadBalancer.getCostFunctionNames()); + List actual = Arrays.asList(loadBalancer.getCostFunctionNames(false)); assertTrue("ExpectedCostFunctions: " + expected + " ActualCostFunctions: " + actual, CollectionUtils.isEqualCollection(expected, actual)); } + @Test + public void testNoPlansWithOverallSkewChecker() { + float minCost = conf.getFloat("hbase.master.balancer.stochastic.minCostNeedBalance", 0.05f); + conf.setFloat("hbase.master.balancer.stochastic.minCostNeedBalance", 1.0f); + conf.setStrings(CLUSTER_BALANCE_STATUS_CHECKERS_KEY, OverallSkewChecker.class.getName()); + try { + // Test with/without per table balancer. 
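+      // minCostNeedBalance is raised to 1.0 above, so even when OverallSkewChecker forces the
+      // coarse path, the balancer is expected to produce no plans unless a mock cluster contains
+      // an idle region server.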
+ boolean[] perTableBalancerConfigs = { true, false }; + for (boolean isByTable : perTableBalancerConfigs) { + conf.setBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, isByTable); + loadBalancer.onConfigurationChange(conf); + for (int[] mockCluster : clusterStateMocksWithNoSlop) { + Map> servers = mockClusterServers(mockCluster); + Map>> LoadOfAllTable = + (Map) mockClusterServersWithTables(servers); + List plans = loadBalancer.balanceCluster(LoadOfAllTable); + boolean emptyPlans = plans == null || plans.isEmpty(); + assertTrue(emptyPlans || needsBalanceIdleRegion(mockCluster)); + } + } + } finally { + // reset config + conf.unset(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE); + conf.setFloat("hbase.master.balancer.stochastic.minCostNeedBalance", minCost); + conf.setStrings(CLUSTER_BALANCE_STATUS_CHECKERS_KEY, ""); + loadBalancer.onConfigurationChange(conf); + } + } + + @Test + public void testNoPlansWithCoarseBalance() { + float minCost = conf.getFloat("hbase.master.balancer.stochastic.minCostNeedBalance", 0.05f); + conf.setFloat("hbase.master.balancer.stochastic.minCostNeedBalance", 1.0f); + try { + // Test with/without per table balancer. + boolean[] perTableBalancerConfigs = { true, false }; + for (boolean isByTable : perTableBalancerConfigs) { + conf.setBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, isByTable); + loadBalancer.onConfigurationChange(conf); + for (int[] mockCluster : clusterStateMocksWithNoSlop) { + Map> servers = mockClusterServers(mockCluster); + Map>> LoadOfAllTable = + (Map) mockClusterServersWithTables(servers); + List plans = loadBalancer.coarseBalanceCluster(LoadOfAllTable); + boolean emptyPlans = plans == null || plans.isEmpty(); + assertTrue(emptyPlans || needsBalanceIdleRegion(mockCluster)); + } + } + } finally { + // reset config + conf.unset(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE); + conf.setFloat("hbase.master.balancer.stochastic.minCostNeedBalance", minCost); + loadBalancer.onConfigurationChange(conf); + } + } + private boolean needsBalanceIdleRegion(int[] cluster) { return Arrays.stream(cluster).anyMatch(x -> x > 1) && Arrays.stream(cluster).anyMatch(x -> x < 1); diff --git a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerRegionReplica.java b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerRegionReplica.java index bd437425f21..eb000662135 100644 --- a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerRegionReplica.java +++ b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerRegionReplica.java @@ -149,7 +149,7 @@ public class TestStochasticLoadBalancerRegionReplica extends StochasticBalancerT regions = randomRegions(1); map.put(s2, regions); assertTrue(loadBalancer.needsBalance(HConstants.ENSEMBLE_TABLE_NAME, - new BalancerClusterState(map, null, null, null))); + new BalancerClusterState(map, null, null, null), false)); // check for the case where there are two hosts on the same rack and there are two racks // and both the replicas are on the same rack map.clear(); @@ -161,7 +161,7 @@ public class TestStochasticLoadBalancerRegionReplica extends StochasticBalancerT // add another server so that the cluster has some host on another rack map.put(ServerName.valueOf("host2", 1000, 11111), randomRegions(1)); assertFalse(loadBalancer.needsBalance(HConstants.ENSEMBLE_TABLE_NAME, - new BalancerClusterState(map, null, null, new ForTestRackManagerOne()))); + new 
BalancerClusterState(map, null, null, new ForTestRackManagerOne()), false)); } @Test diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticBalancerJmxMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticBalancerJmxMetrics.java index a96e320a6fe..9295ed20548 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticBalancerJmxMetrics.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticBalancerJmxMetrics.java @@ -141,7 +141,7 @@ public class TestStochasticBalancerJmxMetrics extends BalancerTestBase { loadBalancer.balanceTable(tableName, clusterState); String[] tableNames = new String[] { tableName.getNameAsString() }; - String[] functionNames = loadBalancer.getCostFunctionNames(); + String[] functionNames = loadBalancer.getCostFunctionNames(false); Set jmxMetrics = readJmxMetricsWithRetry(); Set expectedMetrics = getExpectedJmxMetrics(tableNames, functionNames); @@ -169,7 +169,7 @@ public class TestStochasticBalancerJmxMetrics extends BalancerTestBase { // NOTE the size is normally set in setClusterMetrics, for test purpose, we set it manually // Tables: hbase:namespace, table1, table2 // Functions: costFunctions, overall - String[] functionNames = loadBalancer.getCostFunctionNames(); + String[] functionNames = loadBalancer.getCostFunctionNames(false); loadBalancer.updateMetricsSize(3 * (functionNames.length + 1)); // table 1
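A minimal usage sketch for reviewers, not part of the patch: it shows how the knobs introduced above could be wired up through Configuration. The wrapper class, the method name, the "balancer" argument and the concrete values (0.6, one hour) are illustrative assumptions; only the property names and their defaults come from this diff.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer;

public class CoarseBalanceConfigExample {
  // "balancer" is assumed to be an already initialized StochasticLoadBalancer instance.
  static void enableCoarseBalance(StochasticLoadBalancer balancer) {
    Configuration conf = HBaseConfiguration.create();
    // Register the checker added by this patch; it is package-private, so refer to it by name.
    conf.setStrings("hbase.master.balancer.stochastic.cluster.status.checkers",
      "org.apache.hadoop.hbase.master.balancer.OverallSkewChecker");
    // Request coarse balance once more than 60% of the tables are skewed (patch default: 0.5).
    conf.setDouble("hbase.master.balancer.overall.skew.percentage", 0.6);
    // Allow a coarse balance at most once per hour (patch default: 4 hours).
    conf.setLong("hbase.master.balance.stochastic.cluster.coarse.balance.min.interval.ms",
      60L * 60 * 1000);
    balancer.onConfigurationChange(conf);
  }
}

The minimum-interval guard matches the intent stated in the new config javadoc: coarse balance is meant to be rare, and the interval keeps the balancer from flapping between the coarse and the accurate cost function sets.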