HBASE-23073 Add an optional costFunction to balance regions according to a capacity rule (#677)
Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
(cherry picked from commit 42d535a57a
)
This commit is contained in:
parent
f19b8096f1
commit
d9c36e0dcf
|
@ -0,0 +1,282 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
|
||||
* agreements. See the NOTICE file distributed with this work for additional information regarding
|
||||
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License. You may obtain a
|
||||
* copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
* or implied. See the License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.balancer;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.FileReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.regex.PatternSyntaxException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* This is an optional Cost function designed to allow region count skew across RegionServers. A
|
||||
* rule file is loaded from the local FS or HDFS before balancing. It contains lines of rules. A
|
||||
* rule is composed of a regexp for hostname, and a limit. For example, we could have:
|
||||
* <p>
|
||||
* * rs[0-9] 200 * rs1[0-9] 50
|
||||
* </p>
|
||||
* RegionServers with hostname matching the first rules will have a limit of 200, and the others 50.
|
||||
* If there's no match, a default is set. The costFunction is trying to fill all RegionServers
|
||||
* linearly, meaning that if the global usage is at 50%, then all RegionServers should hold half of
|
||||
* their capacity in terms of regions. In order to use this CostFunction, you need to set the
|
||||
* following options:
|
||||
* <ul>
|
||||
* <li>hbase.master.balancer.stochastic.additionalCostFunctions</li>
|
||||
* <li>hbase.master.balancer.stochastic.heterogeneousRegionCountRulesFile</li>
|
||||
* <li>hbase.master.balancer.stochastic.heterogeneousRegionCountDefault</li>
|
||||
* </ul>
|
||||
* The rule file can be located on local FS or HDFS, depending on the prefix (file//: or hdfs://).
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class HeterogeneousRegionCountCostFunction extends StochasticLoadBalancer.CostFunction {
|
||||
|
||||
/**
|
||||
* configuration used for the path where the rule file is stored.
|
||||
*/
|
||||
static final String HBASE_MASTER_BALANCER_HETEROGENEOUS_RULES_FILE =
|
||||
"hbase.master.balancer.heterogeneousRegionCountRulesFile";
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(HeterogeneousRegionCountCostFunction.class);
|
||||
/**
|
||||
* Default rule to apply when the rule file is not found. Default to 200.
|
||||
*/
|
||||
private static final String HBASE_MASTER_BALANCER_HETEROGENEOUS_RULES_DEFAULT =
|
||||
"hbase.master.balancer.heterogeneousRegionCountDefault";
|
||||
/**
|
||||
* Cost for the function. Default to 500, can be changed.
|
||||
*/
|
||||
private static final String REGION_COUNT_SKEW_COST_KEY =
|
||||
"hbase.master.balancer.stochastic.heterogeneousRegionCountCost";
|
||||
private static final float DEFAULT_REGION_COUNT_SKEW_COST = 500;
|
||||
private final String rulesPath;
|
||||
|
||||
/**
|
||||
* Contains the rules, key is the regexp for ServerName, value is the limit
|
||||
*/
|
||||
private final Map<Pattern, Integer> limitPerRule;
|
||||
|
||||
/**
|
||||
* This is a cache, used to not go through all the limitPerRule map when searching for limit
|
||||
*/
|
||||
private final Map<ServerName, Integer> limitPerRS;
|
||||
private final Configuration conf;
|
||||
private int defaultNumberOfRegions;
|
||||
|
||||
/**
|
||||
* Total capacity of regions for the cluster, based on the online RS and their associated rules
|
||||
*/
|
||||
private int totalCapacity = 0;
|
||||
double overallUsage;
|
||||
|
||||
HeterogeneousRegionCountCostFunction(final Configuration conf) {
|
||||
super(conf);
|
||||
this.conf = conf;
|
||||
this.limitPerRS = new HashMap<>();
|
||||
this.limitPerRule = new HashMap<>();
|
||||
this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST));
|
||||
this.rulesPath = conf.get(HBASE_MASTER_BALANCER_HETEROGENEOUS_RULES_FILE);
|
||||
this.defaultNumberOfRegions =
|
||||
conf.getInt(HBASE_MASTER_BALANCER_HETEROGENEOUS_RULES_DEFAULT, 200);
|
||||
|
||||
if (this.defaultNumberOfRegions < 0) {
|
||||
LOG.warn("invalid configuration '" + HBASE_MASTER_BALANCER_HETEROGENEOUS_RULES_DEFAULT
|
||||
+ "'. Setting default to 200");
|
||||
this.defaultNumberOfRegions = 200;
|
||||
}
|
||||
if (conf.getFloat(StochasticLoadBalancer.RegionCountSkewCostFunction.REGION_COUNT_SKEW_COST_KEY,
|
||||
StochasticLoadBalancer.RegionCountSkewCostFunction.DEFAULT_REGION_COUNT_SKEW_COST) > 0) {
|
||||
LOG.warn("regionCountCost is not set to 0, "
|
||||
+ " this will interfere with the HeterogeneousRegionCountCostFunction!");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Called once per LB invocation to give the cost function to initialize it's state, and perform
|
||||
* any costly calculation.
|
||||
*/
|
||||
@Override
|
||||
void init(final BaseLoadBalancer.Cluster cluster) {
|
||||
this.cluster = cluster;
|
||||
this.loadRules();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected double cost() {
|
||||
double cost = 0;
|
||||
final double targetUsage = ((double) this.cluster.numRegions / (double) this.totalCapacity);
|
||||
|
||||
for (int i = 0; i < this.cluster.numServers; i++) {
|
||||
// retrieve capacity for each RS
|
||||
final ServerName sn = this.cluster.servers[i];
|
||||
final double limit = this.limitPerRS.getOrDefault(sn, defaultNumberOfRegions);
|
||||
final double nbrRegions = this.cluster.regionsPerServer[i].length;
|
||||
final double usage = nbrRegions / limit;
|
||||
if (usage > targetUsage) {
|
||||
// cost is the number of regions above the local limit
|
||||
final double localCost = (nbrRegions - Math.round(limit * targetUsage)) / limit;
|
||||
cost += localCost;
|
||||
}
|
||||
}
|
||||
|
||||
return cost / (double) this.cluster.numServers;
|
||||
}
|
||||
|
||||
/**
|
||||
* used to load the rule files.
|
||||
*/
|
||||
void loadRules() {
|
||||
final List<String> lines = readFile(this.rulesPath);
|
||||
if (null == lines) {
|
||||
LOG.warn("cannot load rules file, keeping latest rules file which has "
|
||||
+ this.limitPerRule.size() + " rules");
|
||||
return;
|
||||
}
|
||||
|
||||
LOG.info("loading rules file '" + this.rulesPath + "'");
|
||||
this.limitPerRule.clear();
|
||||
for (final String line : lines) {
|
||||
try {
|
||||
if (line.length() == 0) {
|
||||
continue;
|
||||
}
|
||||
if (line.startsWith("#")) {
|
||||
continue;
|
||||
}
|
||||
final String[] splits = line.split(" ");
|
||||
if (splits.length != 2) {
|
||||
throw new IOException(
|
||||
"line '" + line + "' is malformated, " + "expected [regexp] [limit]. Skipping line");
|
||||
}
|
||||
|
||||
final Pattern pattern = Pattern.compile(splits[0]);
|
||||
final Integer limit = Integer.parseInt(splits[1]);
|
||||
this.limitPerRule.put(pattern, limit);
|
||||
} catch (final IOException | NumberFormatException | PatternSyntaxException e) {
|
||||
LOG.error("error on line: " + e);
|
||||
}
|
||||
}
|
||||
this.rebuildCache();
|
||||
}
|
||||
|
||||
/**
|
||||
* used to read the rule files from either HDFS or local FS
|
||||
*/
|
||||
private List<String> readFile(final String filename) {
|
||||
if (null == filename) {
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
if (filename.startsWith("file:")) {
|
||||
return readFileFromLocalFS(filename);
|
||||
}
|
||||
return readFileFromHDFS(filename);
|
||||
} catch (IOException e) {
|
||||
LOG.error("cannot read rules file located at ' " + filename + " ':" + e.getMessage());
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* used to read the rule files from HDFS
|
||||
*/
|
||||
private List<String> readFileFromHDFS(final String filename) throws IOException {
|
||||
final Path path = new Path(filename);
|
||||
final FileSystem fs = FileSystem.get(this.conf);
|
||||
final BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(path)));
|
||||
return readLines(reader);
|
||||
}
|
||||
|
||||
/**
|
||||
* used to read the rule files from local FS
|
||||
*/
|
||||
private List<String> readFileFromLocalFS(final String filename) throws IOException {
|
||||
BufferedReader reader = new BufferedReader(new FileReader(filename));
|
||||
return readLines(reader);
|
||||
}
|
||||
|
||||
private List<String> readLines(BufferedReader reader) throws IOException {
|
||||
final List<String> records = new ArrayList<>();
|
||||
String line;
|
||||
while ((line = reader.readLine()) != null) {
|
||||
records.add(line);
|
||||
}
|
||||
reader.close();
|
||||
return records;
|
||||
}
|
||||
|
||||
/**
|
||||
* Rebuild cache matching ServerNames and their capacity.
|
||||
*/
|
||||
private void rebuildCache() {
|
||||
LOG.debug("Rebuilding cache of capacity for each RS");
|
||||
this.limitPerRS.clear();
|
||||
this.totalCapacity = 0;
|
||||
if (null == this.cluster) {
|
||||
return;
|
||||
}
|
||||
for (int i = 0; i < this.cluster.numServers; i++) {
|
||||
final ServerName sn = this.cluster.servers[i];
|
||||
final int capacity = this.findLimitForRS(sn);
|
||||
LOG.debug(sn.getHostname() + " can hold " + capacity + " regions");
|
||||
this.totalCapacity += capacity;
|
||||
}
|
||||
overallUsage = (double) this.cluster.numRegions / (double) this.totalCapacity;
|
||||
LOG.info("Cluster can hold " + this.cluster.numRegions + "/" + this.totalCapacity + " regions ("
|
||||
+ Math.round(overallUsage * 100) + "%)");
|
||||
if (overallUsage >= 1) {
|
||||
LOG.warn("Cluster is overused");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Find the limit for a ServerName. If not found then return the default value
|
||||
* @param serverName the server we are looking for
|
||||
* @return the limit
|
||||
*/
|
||||
int findLimitForRS(final ServerName serverName) {
|
||||
boolean matched = false;
|
||||
int limit = -1;
|
||||
for (final Map.Entry<Pattern, Integer> entry : this.limitPerRule.entrySet()) {
|
||||
if (entry.getKey().matcher(serverName.getHostname()).matches()) {
|
||||
matched = true;
|
||||
limit = entry.getValue();
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!matched) {
|
||||
limit = this.defaultNumberOfRegions;
|
||||
}
|
||||
// Feeding cache
|
||||
this.limitPerRS.put(serverName, limit);
|
||||
return limit;
|
||||
}
|
||||
|
||||
int getNumberOfRulesLoaded() {
|
||||
return this.limitPerRule.size();
|
||||
}
|
||||
}
|
|
@ -1191,9 +1191,9 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||
* regions on a cluster.
|
||||
*/
|
||||
static class RegionCountSkewCostFunction extends CostFunction {
|
||||
private static final String REGION_COUNT_SKEW_COST_KEY =
|
||||
static final String REGION_COUNT_SKEW_COST_KEY =
|
||||
"hbase.master.balancer.stochastic.regionCountCost";
|
||||
private static final float DEFAULT_REGION_COUNT_SKEW_COST = 500;
|
||||
static final float DEFAULT_REGION_COUNT_SKEW_COST = 500;
|
||||
|
||||
private double[] stats = null;
|
||||
|
||||
|
|
|
@ -0,0 +1,275 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
|
||||
* agreements. See the NOTICE file distributed with this work for additional information regarding
|
||||
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License. You may obtain a
|
||||
* copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
* or implied. See the License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.balancer;
|
||||
|
||||
import static junit.framework.TestCase.assertNotNull;
|
||||
import static junit.framework.TestCase.assertTrue;
|
||||
import static org.junit.Assert.assertNull;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Queue;
|
||||
import java.util.Random;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
|
||||
import org.apache.hadoop.hbase.master.RackManager;
|
||||
import org.apache.hadoop.hbase.master.RegionPlan;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@Category({ MasterTests.class, MediumTests.class })
|
||||
public class TestStochasticLoadBalancerHeterogeneousCost extends BalancerTestBase {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestStochasticLoadBalancerHeterogeneousCost.class);
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(TestStochasticLoadBalancerHeterogeneousCost.class);
|
||||
private static final double allowedWindow = 1.20;
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeAllTests() {
|
||||
BalancerTestBase.conf = HBaseConfiguration.create();
|
||||
BalancerTestBase.conf.setFloat("hbase.master.balancer.stochastic.regionCountCost", 0);
|
||||
BalancerTestBase.conf.setFloat("hbase.master.balancer.stochastic.primaryRegionCountCost", 0);
|
||||
BalancerTestBase.conf.setFloat("hbase.master.balancer.stochastic.tableSkewCost", 0);
|
||||
BalancerTestBase.conf.setBoolean("hbase.master.balancer.stochastic.runMaxSteps", true);
|
||||
BalancerTestBase.conf.set(StochasticLoadBalancer.COST_FUNCTIONS_COST_FUNCTIONS_KEY,
|
||||
HeterogeneousRegionCountCostFunction.class.getName());
|
||||
|
||||
BalancerTestBase.conf.set(
|
||||
HeterogeneousRegionCountCostFunction.HBASE_MASTER_BALANCER_HETEROGENEOUS_RULES_FILE,
|
||||
TestStochasticLoadBalancerHeterogeneousCostRules.DEFAULT_RULES_TMP_LOCATION);
|
||||
|
||||
BalancerTestBase.loadBalancer = new StochasticLoadBalancer();
|
||||
BalancerTestBase.loadBalancer.setConf(BalancerTestBase.conf);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDefault() throws IOException {
|
||||
final List<String> rules = Collections.emptyList();
|
||||
|
||||
final int numNodes = 2;
|
||||
final int numRegions = 300;
|
||||
final int numRegionsPerServer = 250;
|
||||
|
||||
// Initial state: { rs1:50 , rs0:250 }
|
||||
// Cluster can hold 300/400 regions (75%)
|
||||
// Expected balanced Cluster: { rs0:150 , rs1:150 }
|
||||
this.testHeterogeneousWithCluster(numNodes, numRegions, numRegionsPerServer, rules);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOneGroup() throws IOException {
|
||||
final List<String> rules = Collections.singletonList("rs.* 100");
|
||||
|
||||
final int numNodes = 4;
|
||||
final int numRegions = 300;
|
||||
final int numRegionsPerServer = 30;
|
||||
|
||||
// Initial state: { rs0:30 , rs1:30 , rs2:30 , rs3:210 }.
|
||||
// The cluster can hold 300/400 regions (75%)
|
||||
// Expected balanced Cluster: { rs0:75 , rs1:75 , rs2:75 , rs3:75 }
|
||||
this.testHeterogeneousWithCluster(numNodes, numRegions, numRegionsPerServer, rules);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTwoGroups() throws IOException {
|
||||
final List<String> rules = Arrays.asList("rs[0-4] 200", "rs[5-9] 50");
|
||||
|
||||
final int numNodes = 10;
|
||||
final int numRegions = 500;
|
||||
final int numRegionsPerServer = 50;
|
||||
|
||||
// Initial state: { rs0:50 , rs1:50 , rs2:50 , rs3:50 , rs4:50 , rs5:50 , rs6:50 , rs7:50 ,
|
||||
// rs8:50 , rs9:50 }
|
||||
// the cluster can hold 500/1250 regions (40%)
|
||||
// Expected balanced Cluster: { rs5:20 , rs6:20 , rs7:20 , rs8:20 , rs9:20 , rs0:80 , rs1:80 ,
|
||||
// rs2:80 , rs3:80 , rs4:80 }
|
||||
this.testHeterogeneousWithCluster(numNodes, numRegions, numRegionsPerServer, rules);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFourGroups() throws IOException {
|
||||
final List<String> rules = Arrays.asList("rs[1-3] 200", "rs[4-7] 250", "rs[8-9] 100");
|
||||
|
||||
final int numNodes = 10;
|
||||
final int numRegions = 800;
|
||||
final int numRegionsPerServer = 80;
|
||||
|
||||
// Initial state: { rs0:80 , rs1:80 , rs2:80 , rs3:80 , rs4:80 , rs5:80 , rs6:80 , rs7:80 ,
|
||||
// rs8:80 , rs9:80 }
|
||||
// Cluster can hold 800/2000 regions (40%)
|
||||
// Expected balanced Cluster: { rs8:40 , rs9:40 , rs2:80 , rs3:80 , rs1:82 , rs0:94 , rs4:96 ,
|
||||
// rs5:96 , rs6:96 , rs7:96 }
|
||||
this.testHeterogeneousWithCluster(numNodes, numRegions, numRegionsPerServer, rules);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOverloaded() throws IOException {
|
||||
final List<String> rules = Collections.singletonList("rs[0-1] 50");
|
||||
|
||||
final int numNodes = 2;
|
||||
final int numRegions = 120;
|
||||
final int numRegionsPerServer = 60;
|
||||
|
||||
TestStochasticLoadBalancerHeterogeneousCostRules.createSimpleRulesFile(rules);
|
||||
final Map<ServerName, List<RegionInfo>> serverMap =
|
||||
this.createServerMap(numNodes, numRegions, numRegionsPerServer, 1, 1);
|
||||
final List<RegionPlan> plans = BalancerTestBase.loadBalancer.balanceCluster(serverMap);
|
||||
// As we disabled all the other cost functions, balancing only according to
|
||||
// the heterogeneous cost function should return nothing.
|
||||
assertNull(plans);
|
||||
}
|
||||
|
||||
private void testHeterogeneousWithCluster(final int numNodes, final int numRegions,
|
||||
final int numRegionsPerServer, final List<String> rules) throws IOException {
|
||||
|
||||
TestStochasticLoadBalancerHeterogeneousCostRules.createSimpleRulesFile(rules);
|
||||
final Map<ServerName, List<RegionInfo>> serverMap =
|
||||
this.createServerMap(numNodes, numRegions, numRegionsPerServer, 1, 1);
|
||||
this.testWithCluster(serverMap, null, true, false);
|
||||
}
|
||||
|
||||
protected void testWithCluster(final Map<ServerName, List<RegionInfo>> serverMap,
|
||||
final RackManager rackManager, final boolean assertFullyBalanced,
|
||||
final boolean assertFullyBalancedForReplicas) {
|
||||
final List<ServerAndLoad> list = this.convertToList(serverMap);
|
||||
LOG.info("Mock Cluster : " + this.printMock(list) + " " + this.printStats(list));
|
||||
|
||||
BalancerTestBase.loadBalancer.setRackManager(rackManager);
|
||||
|
||||
// Run the balancer.
|
||||
final List<RegionPlan> plans = BalancerTestBase.loadBalancer.balanceCluster(serverMap);
|
||||
assertNotNull(plans);
|
||||
|
||||
// Check to see that this actually got to a stable place.
|
||||
if (assertFullyBalanced || assertFullyBalancedForReplicas) {
|
||||
// Apply the plan to the mock cluster.
|
||||
final List<ServerAndLoad> balancedCluster = this.reconcile(list, plans, serverMap);
|
||||
|
||||
// Print out the cluster loads to make debugging easier.
|
||||
LOG.info("Mock Balanced cluster : " + this.printMock(balancedCluster));
|
||||
|
||||
if (assertFullyBalanced) {
|
||||
final List<RegionPlan> secondPlans =
|
||||
BalancerTestBase.loadBalancer.balanceCluster(serverMap);
|
||||
assertNull(secondPlans);
|
||||
|
||||
// create external cost function to retrieve limit
|
||||
// for each RS
|
||||
final HeterogeneousRegionCountCostFunction cf =
|
||||
new HeterogeneousRegionCountCostFunction(conf);
|
||||
assertNotNull(cf);
|
||||
BaseLoadBalancer.Cluster cluster =
|
||||
new BaseLoadBalancer.Cluster(serverMap, null, null, null);
|
||||
cf.init(cluster);
|
||||
|
||||
// checking that we all hosts have a number of regions below their limit
|
||||
for (final ServerAndLoad serverAndLoad : balancedCluster) {
|
||||
final ServerName sn = serverAndLoad.getServerName();
|
||||
final int numberRegions = serverAndLoad.getLoad();
|
||||
final int limit = cf.findLimitForRS(sn);
|
||||
|
||||
double usage = (double) numberRegions / (double) limit;
|
||||
LOG.debug(
|
||||
sn.getHostname() + ":" + numberRegions + "/" + limit + "(" + (usage * 100) + "%)");
|
||||
|
||||
// as the balancer is stochastic, we cannot check exactly the result of the balancing,
|
||||
// hence the allowedWindow parameter
|
||||
assertTrue("Host " + sn.getHostname() + " should be below "
|
||||
+ cf.overallUsage * allowedWindow * 100 + "%",
|
||||
usage <= cf.overallUsage * allowedWindow);
|
||||
}
|
||||
}
|
||||
|
||||
if (assertFullyBalancedForReplicas) {
|
||||
this.assertRegionReplicaPlacement(serverMap, rackManager);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Map<ServerName, List<RegionInfo>> createServerMap(int numNodes, int numRegions,
|
||||
int numRegionsPerServer, int replication, int numTables) {
|
||||
// construct a cluster of numNodes, having a total of numRegions. Each RS will hold
|
||||
// numRegionsPerServer many regions except for the last one, which will host all the
|
||||
// remaining regions
|
||||
int[] cluster = new int[numNodes];
|
||||
for (int i = 0; i < numNodes; i++) {
|
||||
cluster[i] = numRegionsPerServer;
|
||||
}
|
||||
cluster[cluster.length - 1] = numRegions - ((cluster.length - 1) * numRegionsPerServer);
|
||||
Map<ServerName, List<RegionInfo>> clusterState = mockClusterServers(cluster, numTables);
|
||||
if (replication > 0) {
|
||||
// replicate the regions to the same servers
|
||||
for (List<RegionInfo> regions : clusterState.values()) {
|
||||
int length = regions.size();
|
||||
for (int i = 0; i < length; i++) {
|
||||
for (int r = 1; r < replication; r++) {
|
||||
regions.add(RegionReplicaUtil.getRegionInfoForReplica(regions.get(i), r));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return clusterState;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected TreeMap<ServerName, List<RegionInfo>> mockClusterServers(int[] mockCluster,
|
||||
int numTables) {
|
||||
int numServers = mockCluster.length;
|
||||
TreeMap<ServerName, List<RegionInfo>> servers = new TreeMap<>();
|
||||
for (int i = 0; i < numServers; i++) {
|
||||
int numRegions = mockCluster[i];
|
||||
ServerAndLoad sal = createServer("rs" + i);
|
||||
List<RegionInfo> regions = randomRegions(numRegions, numTables);
|
||||
servers.put(sal.getServerName(), regions);
|
||||
}
|
||||
return servers;
|
||||
}
|
||||
|
||||
private Queue<ServerName> serverQueue = new LinkedList<>();
|
||||
|
||||
private ServerAndLoad createServer(final String host) {
|
||||
if (!this.serverQueue.isEmpty()) {
|
||||
ServerName sn = this.serverQueue.poll();
|
||||
return new ServerAndLoad(sn, 0);
|
||||
}
|
||||
Random rand = ThreadLocalRandom.current();
|
||||
int port = rand.nextInt(60000);
|
||||
long startCode = rand.nextLong();
|
||||
ServerName sn = ServerName.valueOf(host, port, startCode);
|
||||
return new ServerAndLoad(sn, 0);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,161 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
|
||||
* agreements. See the NOTICE file distributed with this work for additional information regarding
|
||||
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License. You may obtain a
|
||||
* copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
* or implied. See the License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.balancer;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category({ MasterTests.class, SmallTests.class })
|
||||
public class TestStochasticLoadBalancerHeterogeneousCostRules extends BalancerTestBase {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestStochasticLoadBalancerHeterogeneousCostRules.class);
|
||||
|
||||
static final String DEFAULT_RULES_TMP_LOCATION = "/tmp/hbase-balancer.rules";
|
||||
static Configuration conf;
|
||||
private HeterogeneousRegionCountCostFunction costFunction;
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeAllTests() throws Exception {
|
||||
createSimpleRulesFile(new ArrayList<>());
|
||||
conf = new Configuration();
|
||||
conf.set(HeterogeneousRegionCountCostFunction.HBASE_MASTER_BALANCER_HETEROGENEOUS_RULES_FILE,
|
||||
DEFAULT_RULES_TMP_LOCATION);
|
||||
}
|
||||
|
||||
static void createSimpleRulesFile(final List<String> lines) throws IOException {
|
||||
cleanup();
|
||||
final Path file = Paths.get(DEFAULT_RULES_TMP_LOCATION);
|
||||
Files.write(file, lines, Charset.forName("UTF-8"));
|
||||
}
|
||||
|
||||
protected static void cleanup() {
|
||||
final File file = new File(DEFAULT_RULES_TMP_LOCATION);
|
||||
file.delete();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void afterAllTests() {
|
||||
cleanup();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNoRules() {
|
||||
cleanup();
|
||||
this.costFunction = new HeterogeneousRegionCountCostFunction(conf);
|
||||
this.costFunction.loadRules();
|
||||
Assert.assertEquals(0, this.costFunction.getNumberOfRulesLoaded());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBadFormatInRules() throws IOException {
|
||||
createSimpleRulesFile(new ArrayList<>());
|
||||
this.costFunction = new HeterogeneousRegionCountCostFunction(conf);
|
||||
this.costFunction.loadRules();
|
||||
Assert.assertEquals(0, this.costFunction.getNumberOfRulesLoaded());
|
||||
|
||||
createSimpleRulesFile(Collections.singletonList("bad rules format"));
|
||||
this.costFunction = new HeterogeneousRegionCountCostFunction(conf);
|
||||
this.costFunction.loadRules();
|
||||
Assert.assertEquals(0, this.costFunction.getNumberOfRulesLoaded());
|
||||
|
||||
createSimpleRulesFile(Arrays.asList("srv[1-2] 10", "bad_rules format", "a"));
|
||||
this.costFunction = new HeterogeneousRegionCountCostFunction(conf);
|
||||
this.costFunction.loadRules();
|
||||
Assert.assertEquals(1, this.costFunction.getNumberOfRulesLoaded());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTwoRules() throws IOException {
|
||||
createSimpleRulesFile(Arrays.asList("^server1$ 10", "^server2 21"));
|
||||
this.costFunction = new HeterogeneousRegionCountCostFunction(conf);
|
||||
this.costFunction.loadRules();
|
||||
Assert.assertEquals(2, this.costFunction.getNumberOfRulesLoaded());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBadRegexp() throws IOException {
|
||||
createSimpleRulesFile(Collections.singletonList("server[ 1"));
|
||||
this.costFunction = new HeterogeneousRegionCountCostFunction(conf);
|
||||
this.costFunction.loadRules();
|
||||
Assert.assertEquals(0, this.costFunction.getNumberOfRulesLoaded());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNoOverride() throws IOException {
|
||||
createSimpleRulesFile(Arrays.asList("^server1$ 10", "^server2 21"));
|
||||
this.costFunction = new HeterogeneousRegionCountCostFunction(conf);
|
||||
this.costFunction.loadRules();
|
||||
Assert.assertEquals(2, this.costFunction.getNumberOfRulesLoaded());
|
||||
|
||||
// loading malformed configuration does not overload current
|
||||
cleanup();
|
||||
this.costFunction.loadRules();
|
||||
Assert.assertEquals(2, this.costFunction.getNumberOfRulesLoaded());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLoadingFomHDFS() throws Exception {
|
||||
|
||||
HBaseTestingUtility hBaseTestingUtility = new HBaseTestingUtility();
|
||||
hBaseTestingUtility.startMiniDFSCluster(3);
|
||||
|
||||
MiniDFSCluster cluster = hBaseTestingUtility.getDFSCluster();
|
||||
DistributedFileSystem fs = cluster.getFileSystem();
|
||||
|
||||
String path = cluster.getURI() + DEFAULT_RULES_TMP_LOCATION;
|
||||
|
||||
// writing file
|
||||
FSDataOutputStream stream = fs.create(new org.apache.hadoop.fs.Path(path));
|
||||
stream.write("server1 10".getBytes());
|
||||
stream.flush();
|
||||
stream.close();
|
||||
|
||||
Configuration configuration = hBaseTestingUtility.getConfiguration();
|
||||
|
||||
// start costFunction
|
||||
configuration.set(
|
||||
HeterogeneousRegionCountCostFunction.HBASE_MASTER_BALANCER_HETEROGENEOUS_RULES_FILE, path);
|
||||
this.costFunction = new HeterogeneousRegionCountCostFunction(configuration);
|
||||
this.costFunction.loadRules();
|
||||
Assert.assertEquals(1, this.costFunction.getNumberOfRulesLoaded());
|
||||
|
||||
hBaseTestingUtility.shutdownMiniCluster();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue