HBASE-26337 Optimization for weighted random generators (#3732)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
clarax 2021-11-08 16:17:21 -08:00 committed by GitHub
parent 601467f1ae
commit 62cd2b688e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 117 additions and 20 deletions

View File

@ -79,4 +79,4 @@ abstract class CostFromRegionLoadFunction extends CostFunction {
}
protected abstract double getCostFromRl(BalancerRegionLoad rl);
}
}

View File

@ -83,6 +83,16 @@ abstract class CostFunction {
protected abstract double cost();
/**
* Add the cost of this cost function to the weight of the candidate generator that is optimized
* for this cost function. By default it is the RandomCandiateGenerator for a cost function.
* Called once per init or after postAction.
* @param weights the weights for every generator.
*/
public void updateWeight(double[] weights) {
weights[StochasticLoadBalancer.GeneratorType.RANDOM.ordinal()] += cost();
}
/**
* Scale the value between 0 and 1.
* @param min Min value

View File

@ -90,6 +90,14 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
return fnPickers;
}
/**
* @return any candidate generator in random
*/
@Override
protected CandidateGenerator getRandomGenerator() {
return candidateGenerators.get(ThreadLocalRandom.current().nextInt(candidateGenerators.size()));
}
/**
* Round robin assignment: Segregate the regions into two types:
*

View File

@ -88,4 +88,8 @@ abstract class LocalityBasedCostFunction extends CostFunction {
return cluster.getOrComputeWeightedLocality(region, entity, type);
}
}
@Override
public final void updateWeight(double[] weights) {
weights[StochasticLoadBalancer.GeneratorType.LOCALITY.ordinal()] += cost();
}
}

View File

@ -59,4 +59,9 @@ class RegionCountSkewCostFunction extends CostFunction {
costs[newServer] = cluster.regionsPerServer[newServer].length;
});
}
@Override
public final void updateWeight(double[] weights) {
weights[StochasticLoadBalancer.GeneratorType.LOAD.ordinal()] += cost();
}
}

View File

@ -74,6 +74,11 @@ abstract class RegionReplicaGroupingCostFunction extends CostFunction {
return scale(0, maxCost, totalCost);
}
@Override
public final void updateWeight(double[] weights) {
weights[StochasticLoadBalancer.GeneratorType.RACK.ordinal()] += cost();
}
/**
* For each primary region, it computes the total number of replicas in the array (numReplicas)
* and returns a sum of numReplicas-1 squared. For example, if the server hosts regions a, b, c,

View File

@ -129,7 +129,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
private int numRegionLoadsToRemember = 15;
private float minCostNeedBalance = 0.025f;
private List<CandidateGenerator> candidateGenerators;
private List<CostFunction> costFunctions; // FindBugs: Wants this protected; IS2_INCONSISTENT_SYNC
// To save currently configed sum of multiplier. Defaulted at 1 for cases that carry high cost
private float sumMultiplier;
@ -137,6 +136,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
private double curOverallCost = 0d;
private double[] tempFunctionCosts;
private double[] curFunctionCosts;
private double[] weightsOfGenerators;
// Keep locality based picker and cost function to alert them
// when new services are offered
@ -146,6 +146,12 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
private RegionReplicaHostCostFunction regionReplicaHostCostFunction;
private RegionReplicaRackCostFunction regionReplicaRackCostFunction;
protected List<CandidateGenerator> candidateGenerators;
public enum GeneratorType {
RANDOM, LOAD, LOCALITY, RACK
}
/**
* The constructor that pass a MetricsStochasticBalancer to BaseLoadBalancer to replace its
* default MetricsBalancer
@ -204,10 +210,11 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
protected List<CandidateGenerator> createCandidateGenerators() {
List<CandidateGenerator> candidateGenerators = new ArrayList<CandidateGenerator>(4);
candidateGenerators.add(new RandomCandidateGenerator());
candidateGenerators.add(new LoadCandidateGenerator());
candidateGenerators.add(localityCandidateGenerator);
candidateGenerators.add(new RegionReplicaRackCandidateGenerator());
candidateGenerators.add(GeneratorType.RANDOM.ordinal(), new RandomCandidateGenerator());
candidateGenerators.add(GeneratorType.LOAD.ordinal(), new LoadCandidateGenerator());
candidateGenerators.add(GeneratorType.LOCALITY.ordinal(), localityCandidateGenerator);
candidateGenerators.add(GeneratorType.RACK.ordinal(),
new RegionReplicaRackCandidateGenerator());
return candidateGenerators;
}
@ -380,8 +387,33 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
BalanceAction nextAction(BalancerClusterState cluster) {
return candidateGenerators.get(ThreadLocalRandom.current().nextInt(candidateGenerators.size()))
.generate(cluster);
return getRandomGenerator().generate(cluster);
}
/**
* Select the candidate generator to use based on the cost of cost functions. The chance of
* selecting a candidate generator is propotional to the share of cost of all cost functions among
* all cost functions that benefit from it.
*/
protected CandidateGenerator getRandomGenerator() {
double sum = 0;
for (int i = 0; i < weightsOfGenerators.length; i++) {
sum += weightsOfGenerators[i];
weightsOfGenerators[i] = sum;
}
if (sum == 0) {
return candidateGenerators.get(0);
}
for (int i = 0; i < weightsOfGenerators.length; i++) {
weightsOfGenerators[i] /= sum;
}
double rand = ThreadLocalRandom.current().nextDouble();
for (int i = 0; i < weightsOfGenerators.length; i++) {
if (rand <= weightsOfGenerators[i]) {
return candidateGenerators.get(i);
}
}
return candidateGenerators.get(candidateGenerators.size() - 1);
}
@RestrictedApi(explanation = "Should only be called in tests", link = "",
@ -474,7 +506,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
}
cluster.doAction(action);
updateCostsWithAction(cluster, action);
updateCostsAndWeightsWithAction(cluster, action);
newCost = computeCost(cluster, currentCost);
@ -490,7 +522,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
// TODO: undo by remembering old values
BalanceAction undoAction = action.undoAction();
cluster.doAction(undoAction);
updateCostsWithAction(cluster, undoAction);
updateCostsAndWeightsWithAction(cluster, undoAction);
}
if (EnvironmentEdgeManager.currentTime() - startTime >
@ -685,17 +717,28 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
void initCosts(BalancerClusterState cluster) {
// Initialize the weights of generator every time
weightsOfGenerators = new double[this.candidateGenerators.size()];
for (CostFunction c : costFunctions) {
c.prepare(cluster);
c.updateWeight(weightsOfGenerators);
}
}
/**
* Update both the costs of costfunctions and the weights of candidate generators
*/
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
void updateCostsWithAction(BalancerClusterState cluster, BalanceAction action) {
void updateCostsAndWeightsWithAction(BalancerClusterState cluster, BalanceAction action) {
// Reset all the weights to 0
for (int i = 0; i < weightsOfGenerators.length; i++) {
weightsOfGenerators[i] = 0;
}
for (CostFunction c : costFunctions) {
if (c.isNeeded()) {
c.postAction(action);
c.updateWeight(weightsOfGenerators);
}
}
}

View File

@ -39,7 +39,8 @@ public class StochasticBalancerTestBase extends BalancerTestBase {
protected static StochasticLoadBalancer loadBalancer;
protected static DummyMetricsStochasticBalancer dummyMetricsStochasticBalancer = new DummyMetricsStochasticBalancer();
protected static DummyMetricsStochasticBalancer dummyMetricsStochasticBalancer = new
DummyMetricsStochasticBalancer();
@BeforeClass
public static void beforeAllTests() throws Exception {
@ -58,7 +59,17 @@ public class StochasticBalancerTestBase extends BalancerTestBase {
boolean assertFullyBalancedForReplicas) {
Map<ServerName, List<RegionInfo>> serverMap =
createServerMap(numNodes, numRegions, numRegionsPerServer, replication, numTables);
testWithCluster(serverMap, null, assertFullyBalanced, assertFullyBalancedForReplicas);
testWithCluster(serverMap, null, assertFullyBalanced,
assertFullyBalancedForReplicas);
}
protected void testWithClusterWithIteration(int numNodes, int numRegions, int numRegionsPerServer,
int replication, int numTables, boolean assertFullyBalanced,
boolean assertFullyBalancedForReplicas) {
Map<ServerName, List<RegionInfo>> serverMap =
createServerMap(numNodes, numRegions, numRegionsPerServer, replication, numTables);
testWithClusterWithIteration(serverMap, null, assertFullyBalanced,
assertFullyBalancedForReplicas);
}
protected void testWithCluster(Map<ServerName, List<RegionInfo>> serverMap,
@ -102,7 +113,8 @@ public class StochasticBalancerTestBase extends BalancerTestBase {
loadBalancer.setRackManager(rackManager);
// Run the balancer.
Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable = (Map) mockClusterServersWithTables(serverMap);
Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
(Map) mockClusterServersWithTables(serverMap);
List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
assertNotNull("Initial cluster balance should produce plans.", plans);

View File

@ -464,10 +464,10 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase {
final double expectedCost = loadBalancer.computeCost(cluster, Double.MAX_VALUE);
BalanceAction action = loadBalancer.nextAction(cluster);
cluster.doAction(action);
loadBalancer.updateCostsWithAction(cluster, action);
loadBalancer.updateCostsAndWeightsWithAction(cluster, action);
BalanceAction undoAction = action.undoAction();
cluster.doAction(undoAction);
loadBalancer.updateCostsWithAction(cluster, undoAction);
loadBalancer.updateCostsAndWeightsWithAction(cluster, undoAction);
final double actualCost = loadBalancer.computeCost(cluster, Double.MAX_VALUE);
assertEquals(expectedCost, actualCost, 0);
}

View File

@ -77,10 +77,9 @@ public class TestStochasticLoadBalancerHeterogeneousCost extends StochasticBalan
RULES_FILE = HTU.getDataTestDir(DEFAULT_RULES_FILE_NAME).toString();
conf.set(HeterogeneousRegionCountCostFunction.HBASE_MASTER_BALANCER_HETEROGENEOUS_RULES_FILE,
RULES_FILE);
loadBalancer = new StochasticLoadBalancer();
loadBalancer = new StochasticLoadTestBalancer();
loadBalancer.setClusterInfoProvider(new DummyClusterInfoProvider(conf));
loadBalancer.initialize();
loadBalancer.getCandidateGenerators().add(new FairRandomCandidateGenerator());
}
@Test
@ -302,4 +301,14 @@ public class TestStochasticLoadBalancerHeterogeneousCost extends StochasticBalan
return super.generate(cluster);
}
}
static class StochasticLoadTestBalancer extends StochasticLoadBalancer {
private FairRandomCandidateGenerator fairRandomCandidateGenerator =
new FairRandomCandidateGenerator();
@Override
protected CandidateGenerator getRandomGenerator() {
return fairRandomCandidateGenerator;
}
}
}

View File

@ -40,6 +40,7 @@ public class TestStochasticLoadBalancerLargeCluster extends StochasticBalancerTe
int replication = 1;
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 6 * 60 * 1000);
loadBalancer.onConfigurationChange(conf);
testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true);
testWithClusterWithIteration(numNodes, numRegions, numRegionsPerServer, replication, numTables,
true, true);
}
}