HBASE-26251 StochasticLoadBalancer metrics should update even if balancer doesn't run (#3678)
Signed-off-by: Duo Zhang <zhangduo@apache.org> Reviewed-by: Bryan Beaudreault <bbeaudreault@hubspot.com>
This commit is contained in:
parent
cbebf85b3c
commit
63306942b1
|
@ -147,4 +147,13 @@ public interface LoadBalancer extends Stoppable, ConfigurationObserver {
|
|||
|
||||
/*Updates balancer status tag reported to JMX*/
|
||||
void updateBalancerStatus(boolean status);
|
||||
|
||||
/**
|
||||
* In some scenarios, Balancer needs to update internal status or information according to the
|
||||
* current tables load
|
||||
*
|
||||
* @param loadOfAllTable region load of servers for all table
|
||||
*/
|
||||
default void updateBalancerLoadInfo(Map<TableName, Map<ServerName, List<RegionInfo>>>
|
||||
loadOfAllTable){}
|
||||
}
|
||||
|
|
|
@ -546,7 +546,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
|||
new HashMap<>();
|
||||
}
|
||||
|
||||
private Map<ServerName, List<RegionInfo>> toEnsumbleTableLoad(
|
||||
protected final Map<ServerName, List<RegionInfo>> toEnsumbleTableLoad(
|
||||
Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable) {
|
||||
Map<ServerName, List<RegionInfo>> returnMap = new TreeMap<>();
|
||||
for (Map<ServerName, List<RegionInfo>> serverNameListMap : LoadOfAllTable.values()) {
|
||||
|
|
|
@ -31,6 +31,7 @@ import java.util.function.Supplier;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.ClusterMetrics;
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.RegionMetrics;
|
||||
import org.apache.hadoop.hbase.ServerMetrics;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
|
@ -116,6 +117,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||
"hbase.master.balancer.stochastic.minCostNeedBalance";
|
||||
protected static final String COST_FUNCTIONS_COST_FUNCTIONS_KEY =
|
||||
"hbase.master.balancer.stochastic.additionalCostFunctions";
|
||||
public static final String OVERALL_COST_FUNCTION_NAME = "Overall";
|
||||
|
||||
Map<String, Deque<BalancerRegionLoad>> loads = new HashMap<>();
|
||||
|
||||
|
@ -152,6 +154,12 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||
super(new MetricsStochasticBalancer());
|
||||
}
|
||||
|
||||
/**
 * Creates a balancer that hands the supplied metrics object to the base balancer instead of
 * a freshly created {@link MetricsStochasticBalancer}. Restricted to test sources.
 *
 * @param metricsStochasticBalancer the metrics object passed to the superclass constructor
 */
@RestrictedApi(explanation = "Should only be called in tests", link = "",
  allowedOnPath = ".*/src/test/.*")
public StochasticLoadBalancer(MetricsStochasticBalancer metricsStochasticBalancer) {
  super(metricsStochasticBalancer);
}
|
||||
|
||||
private static CostFunction createCostFunction(Class<? extends CostFunction> clazz,
|
||||
Configuration conf) {
|
||||
try {
|
||||
|
@ -263,6 +271,32 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||
}
|
||||
}
|
||||
|
||||
private void updateBalancerTableLoadInfo(TableName tableName, Map<ServerName, List<RegionInfo>> loadOfOneTable) {
|
||||
RegionHDFSBlockLocationFinder finder = null;
|
||||
if ((this.localityCost != null) || (this.rackLocalityCost != null)) {
|
||||
finder = this.regionFinder;
|
||||
}
|
||||
BalancerClusterState cluster =
|
||||
new BalancerClusterState(loadOfOneTable, loads, finder, rackManager);
|
||||
|
||||
initCosts(cluster);
|
||||
curOverallCost = computeCost(cluster, Double.MAX_VALUE);
|
||||
System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length);
|
||||
updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateBalancerLoadInfo(
|
||||
Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
|
||||
if (isByTable) {
|
||||
loadOfAllTable.forEach((tableName, loadOfOneTable) -> {
|
||||
updateBalancerTableLoadInfo(tableName, loadOfOneTable);
|
||||
});
|
||||
} else {
|
||||
updateBalancerTableLoadInfo(HConstants.ENSEMBLE_TABLE_NAME, toEnsumbleTableLoad(loadOfAllTable));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Update the number of metrics that are reported to JMX
|
||||
*/
|
||||
|
@ -401,16 +435,17 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||
|
||||
initCosts(cluster);
|
||||
|
||||
if (!needsBalance(tableName, cluster)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
double currentCost = computeCost(cluster, Double.MAX_VALUE);
|
||||
curOverallCost = currentCost;
|
||||
System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length);
|
||||
updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
|
||||
double initCost = currentCost;
|
||||
double newCost;
|
||||
|
||||
if (!needsBalance(tableName, cluster)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
long computedMaxSteps;
|
||||
if (runMaxSteps) {
|
||||
computedMaxSteps = Math.max(this.maxSteps, calculateMaxSteps(cluster));
|
||||
|
@ -469,9 +504,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||
|
||||
metricsBalancer.balanceCluster(endTime - startTime);
|
||||
|
||||
// update costs metrics
|
||||
updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
|
||||
if (initCost > currentCost) {
|
||||
updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
|
||||
List<RegionPlan> plans = createRegionPlans(cluster);
|
||||
LOG.info("Finished computing new moving plan. Computation took {} ms" +
|
||||
" to try {} different iterations. Found a solution that moves " +
|
||||
|
@ -533,7 +567,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||
MetricsStochasticBalancer balancer = (MetricsStochasticBalancer) metricsBalancer;
|
||||
// overall cost
|
||||
balancer.updateStochasticCost(tableName.getNameAsString(),
|
||||
"Overall", "Overall cost", overall);
|
||||
OVERALL_COST_FUNCTION_NAME, "Overall cost", overall);
|
||||
|
||||
// each cost function
|
||||
for (int i = 0; i < costFunctions.size(); i++) {
|
||||
|
|
|
@ -0,0 +1,75 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.balancer;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
public class DummyMetricsStochasticBalancer extends MetricsStochasticBalancer {
|
||||
//We use a map to record those metrics that were updated to MetricsStochasticBalancer when running
|
||||
// unit tests.
|
||||
private Map<String, Double> costsMap;
|
||||
|
||||
public DummyMetricsStochasticBalancer() {
|
||||
//noop
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void initSource() {
|
||||
costsMap = new HashMap<>();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void balanceCluster(long time) {
|
||||
//noop
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incrMiscInvocations() {
|
||||
//noop
|
||||
}
|
||||
|
||||
@Override
|
||||
public void balancerStatus(boolean status) {
|
||||
//noop
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateMetricsSize(int size) {
|
||||
//noop
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateStochasticCost(String tableName, String costFunctionName,
|
||||
String costFunctionDesc, Double value) {
|
||||
String key = tableName + "#" + costFunctionName;
|
||||
costsMap.put(key, value);
|
||||
}
|
||||
|
||||
public Map<String,Double> getDummyCostsMap(){
|
||||
return this.costsMap;
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear all metrics in the cache map then prepare to run the next test
|
||||
* */
|
||||
public void clearDummyMetrics(){
|
||||
this.costsMap.clear();
|
||||
}
|
||||
|
||||
}
|
|
@ -39,6 +39,8 @@ public class StochasticBalancerTestBase extends BalancerTestBase {
|
|||
|
||||
protected static StochasticLoadBalancer loadBalancer;
|
||||
|
||||
protected static DummyMetricsStochasticBalancer dummyMetricsStochasticBalancer = new DummyMetricsStochasticBalancer();
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeAllTests() throws Exception {
|
||||
conf = HBaseConfiguration.create();
|
||||
|
@ -47,7 +49,7 @@ public class StochasticBalancerTestBase extends BalancerTestBase {
|
|||
conf.setFloat("hbase.regions.slop", 0.0f);
|
||||
conf.setFloat("hbase.master.balancer.stochastic.localityCost", 0);
|
||||
conf.setBoolean("hbase.master.balancer.stochastic.runMaxSteps", true);
|
||||
loadBalancer = new StochasticLoadBalancer();
|
||||
loadBalancer = new StochasticLoadBalancer(dummyMetricsStochasticBalancer);
|
||||
loadBalancer.setClusterInfoProvider(new DummyClusterInfoProvider(conf));
|
||||
loadBalancer.initialize();
|
||||
}
|
||||
|
|
|
@ -234,6 +234,98 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateBalancerLoadInfo(){
|
||||
int[] cluster = new int[] { 10, 0 };
|
||||
Map<ServerName, List<RegionInfo>> servers = mockClusterServers(cluster);
|
||||
BalancerClusterState clusterState = mockCluster(cluster);
|
||||
Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
|
||||
(Map) mockClusterServersWithTables(servers);
|
||||
try {
|
||||
boolean[] perTableBalancerConfigs = { true, false };
|
||||
for (boolean isByTable : perTableBalancerConfigs) {
|
||||
conf.setBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, isByTable);
|
||||
loadBalancer.onConfigurationChange(conf);
|
||||
dummyMetricsStochasticBalancer.clearDummyMetrics();
|
||||
loadBalancer.updateBalancerLoadInfo(LoadOfAllTable);
|
||||
assertTrue("Metrics should be recorded!",
|
||||
dummyMetricsStochasticBalancer.getDummyCostsMap() != null && !dummyMetricsStochasticBalancer.getDummyCostsMap().isEmpty());
|
||||
|
||||
String metricRecordKey;
|
||||
if (isByTable) {
|
||||
metricRecordKey = "table1#" + StochasticLoadBalancer.OVERALL_COST_FUNCTION_NAME;
|
||||
} else {
|
||||
metricRecordKey = HConstants.ENSEMBLE_TABLE_NAME + "#" + StochasticLoadBalancer.OVERALL_COST_FUNCTION_NAME;
|
||||
}
|
||||
double curOverallCost = loadBalancer.computeCost(clusterState, Double.MAX_VALUE);
|
||||
double curOverallCostInMetrics =
|
||||
dummyMetricsStochasticBalancer.getDummyCostsMap().get(metricRecordKey);
|
||||
assertEquals(curOverallCost, curOverallCostInMetrics, 0.001);
|
||||
}
|
||||
}finally {
|
||||
conf.unset(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE);
|
||||
loadBalancer.onConfigurationChange(conf);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateStochasticCosts() {
|
||||
float minCost = conf.getFloat("hbase.master.balancer.stochastic.minCostNeedBalance", 0.05f);
|
||||
try {
|
||||
int[] cluster = new int[] { 10, 0 };
|
||||
Map<ServerName, List<RegionInfo>> servers = mockClusterServers(cluster);
|
||||
BalancerClusterState clusterState = mockCluster(cluster);
|
||||
conf.setFloat("hbase.master.balancer.stochastic.minCostNeedBalance", 1.0f);
|
||||
conf.setBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, false);
|
||||
loadBalancer.onConfigurationChange(conf);
|
||||
dummyMetricsStochasticBalancer.clearDummyMetrics();
|
||||
List<RegionPlan> plans = loadBalancer.balanceCluster((Map)mockClusterServersWithTables(servers));
|
||||
|
||||
assertTrue("Balance plan should not be empty!", plans != null && !plans.isEmpty());
|
||||
assertTrue("There should be metrics record in MetricsStochasticBalancer",
|
||||
!dummyMetricsStochasticBalancer.getDummyCostsMap().isEmpty());
|
||||
|
||||
double overallCostOfCluster = loadBalancer.computeCost(clusterState, Double.MAX_VALUE);
|
||||
double overallCostInMetrics = dummyMetricsStochasticBalancer.getDummyCostsMap().get(
|
||||
HConstants.ENSEMBLE_TABLE_NAME + "#" + StochasticLoadBalancer.OVERALL_COST_FUNCTION_NAME);
|
||||
assertEquals(overallCostOfCluster, overallCostInMetrics, 0.001);
|
||||
} finally {
|
||||
//reset config
|
||||
conf.setFloat("hbase.master.balancer.stochastic.minCostNeedBalance", minCost);
|
||||
conf.unset(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE);
|
||||
loadBalancer.onConfigurationChange(conf);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateStochasticCostsIfBalanceNotRan() {
|
||||
float minCost = conf.getFloat("hbase.master.balancer.stochastic.minCostNeedBalance", 0.05f);
|
||||
try {
|
||||
int[] cluster = new int[] { 10, 10 };
|
||||
Map<ServerName, List<RegionInfo>> servers = mockClusterServers(cluster);
|
||||
BalancerClusterState clusterState = mockCluster(cluster);
|
||||
conf.setFloat("hbase.master.balancer.stochastic.minCostNeedBalance", Float.MAX_VALUE);
|
||||
conf.setBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, false);
|
||||
loadBalancer.onConfigurationChange(conf);
|
||||
dummyMetricsStochasticBalancer.clearDummyMetrics();
|
||||
List<RegionPlan> plans = loadBalancer.balanceCluster( (Map) mockClusterServersWithTables(servers));
|
||||
|
||||
assertTrue("Balance plan should be empty!", plans == null || plans.isEmpty());
|
||||
assertTrue("There should be metrics record in MetricsStochasticBalancer!",
|
||||
!dummyMetricsStochasticBalancer.getDummyCostsMap().isEmpty());
|
||||
|
||||
double overallCostOfCluster = loadBalancer.computeCost(clusterState, Double.MAX_VALUE);
|
||||
double overallCostInMetrics = dummyMetricsStochasticBalancer.getDummyCostsMap().get(
|
||||
HConstants.ENSEMBLE_TABLE_NAME + "#" + StochasticLoadBalancer.OVERALL_COST_FUNCTION_NAME);
|
||||
assertEquals(overallCostOfCluster, overallCostInMetrics, 0.001);
|
||||
} finally {
|
||||
//reset config
|
||||
conf.setFloat("hbase.master.balancer.stochastic.minCostNeedBalance", minCost);
|
||||
conf.unset(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE);
|
||||
loadBalancer.onConfigurationChange(conf);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNeedBalance() {
|
||||
float minCost = conf.getFloat("hbase.master.balancer.stochastic.minCostNeedBalance", 0.05f);
|
||||
|
|
|
@ -1785,6 +1785,30 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
|
|||
return balance(BalanceRequest.defaultInstance());
|
||||
}
|
||||
|
||||
/**
|
||||
* Trigger a normal balance, see {@link HMaster#balance()} . If the balance is not executed
|
||||
* this time, the metrics related to the balance will be updated.
|
||||
*
|
||||
* When balance is running, related metrics will be updated at the same time. But if some
|
||||
* checking logic failed and cause the balancer exit early, we lost the chance to update
|
||||
* balancer metrics. This will lead to user missing the latest balancer info.
|
||||
* */
|
||||
public BalanceResponse balanceOrUpdateMetrics() throws IOException{
|
||||
synchronized (this.balancer) {
|
||||
BalanceResponse response = balance();
|
||||
if (!response.isBalancerRan()) {
|
||||
Map<TableName, Map<ServerName, List<RegionInfo>>> assignments =
|
||||
this.assignmentManager.getRegionStates().getAssignmentsForBalancer(this.tableStateManager,
|
||||
this.serverManager.getOnlineServersList());
|
||||
for (Map<ServerName, List<RegionInfo>> serverMap : assignments.values()) {
|
||||
serverMap.keySet().removeAll(this.serverManager.getDrainingServersList());
|
||||
}
|
||||
this.balancer.updateBalancerLoadInfo(assignments);
|
||||
}
|
||||
return response;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks master state before initiating action over region topology.
|
||||
* @param action the name of the action under consideration, for logging.
|
||||
|
|
|
@ -46,7 +46,7 @@ public class BalancerChore extends ScheduledChore {
|
|||
@Override
|
||||
protected void chore() {
|
||||
try {
|
||||
master.balance();
|
||||
master.balanceOrUpdateMetrics();
|
||||
} catch (IOException e) {
|
||||
LOG.error("Failed to balance.", e);
|
||||
}
|
||||
|
|
|
@ -103,6 +103,12 @@ public class RSGroupBasedLoadBalancer implements LoadBalancer {
|
|||
internalBalancer.updateClusterMetrics(sm);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void updateBalancerLoadInfo(Map<TableName, Map<ServerName, List<RegionInfo>>>
|
||||
loadOfAllTable){
|
||||
internalBalancer.updateBalancerLoadInfo(loadOfAllTable);
|
||||
}
|
||||
|
||||
public void setMasterServices(MasterServices masterServices) {
|
||||
this.masterServices = masterServices;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue