HBASE-26337 Optimization for weighted random generators (#3829)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
clarax 2021-11-10 17:57:25 -08:00 committed by GitHub
parent 230ce3e6b5
commit 455f7bfd7b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 109 additions and 18 deletions

View File

@ -32,7 +32,6 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
@ -90,6 +89,14 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
setCandidateGenerators(fnPickers);
}
/**
* @return any candidate generator in random
*/
@Override
protected CandidateGenerator getRandomGenerator() {
return candidateGenerators.get(RANDOM.nextInt(candidateGenerators.size()));
}
@Override
public synchronized void setMasterServices(MasterServices masterServices) {
super.setMasterServices(masterServices);

View File

@ -147,7 +147,12 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
private boolean isBalancerDecisionRecording = false;
private boolean isBalancerRejectionRecording = false;
private List<CandidateGenerator> candidateGenerators;
protected List<CandidateGenerator> candidateGenerators;
public enum GeneratorType {
RANDOM, LOAD, LOCALITY, RACK
}
private double[] weightsOfGenerators;
private List<CostFunction> costFunctions; // FindBugs: Wants this protected; IS2_INCONSISTENT_SYNC
private float sumMultiplier;
// to save and report costs to JMX
@ -199,10 +204,11 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
if (this.candidateGenerators == null) {
candidateGenerators = Lists.newArrayList();
candidateGenerators.add(new RandomCandidateGenerator());
candidateGenerators.add(new LoadCandidateGenerator());
candidateGenerators.add(localityCandidateGenerator);
candidateGenerators.add(new RegionReplicaRackCandidateGenerator());
candidateGenerators.add(GeneratorType.RANDOM.ordinal(), new RandomCandidateGenerator());
candidateGenerators.add(GeneratorType.LOAD.ordinal(), new LoadCandidateGenerator());
candidateGenerators.add(GeneratorType.LOCALITY.ordinal(), localityCandidateGenerator);
candidateGenerators.add(GeneratorType.RACK.ordinal(),
new RegionReplicaRackCandidateGenerator());
}
regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf);
regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf);
@ -381,8 +387,33 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
@InterfaceAudience.Private
Cluster.Action nextAction(Cluster cluster) {
return candidateGenerators.get(RANDOM.nextInt(candidateGenerators.size()))
.generate(cluster);
return getRandomGenerator().generate(cluster);
}
/**
* Select the candidate generator to use based on the cost of cost functions. The chance of
* selecting a candidate generator is propotional to the share of cost of all cost functions among
* all cost functions that benefit from it.
*/
protected CandidateGenerator getRandomGenerator() {
double sum = 0;
for (int i = 0; i < weightsOfGenerators.length; i++) {
sum += weightsOfGenerators[i];
weightsOfGenerators[i] = sum;
}
if (sum == 0) {
return candidateGenerators.get(0);
}
for (int i = 0; i < weightsOfGenerators.length; i++) {
weightsOfGenerators[i] /= sum;
}
double rand = RANDOM.nextDouble();
for (int i = 0; i < weightsOfGenerators.length; i++) {
if (rand <= weightsOfGenerators[i]) {
return candidateGenerators.get(i);
}
}
return candidateGenerators.get(candidateGenerators.size() - 1);
}
/**
@ -477,7 +508,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
}
cluster.doAction(action);
updateCostsWithAction(cluster, action);
updateCostsAndWeightsWithAction(cluster, action);
newCost = computeCost(cluster, currentCost);
@ -493,7 +524,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
// TODO: undo by remembering old values
Action undoAction = action.undoAction();
cluster.doAction(undoAction);
updateCostsWithAction(cluster, undoAction);
updateCostsAndWeightsWithAction(cluster, undoAction);
}
if (EnvironmentEdgeManager.currentTime() - startTime > maxRunningTime) {
@ -695,17 +726,25 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
void initCosts(Cluster cluster) {
for (CostFunction c:costFunctions) {
// Initialize the weights of generator every time
weightsOfGenerators = new double[this.candidateGenerators.size()];
for (CostFunction c : costFunctions) {
c.init(cluster);
c.updateWeight(weightsOfGenerators);
}
}
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
void updateCostsWithAction(Cluster cluster, Action action) {
void updateCostsAndWeightsWithAction(Cluster cluster, Action action) {
// Reset all the weights to 0
for (int i = 0; i < weightsOfGenerators.length; i++) {
weightsOfGenerators[i] = 0;
}
for (CostFunction c : costFunctions) {
if (c.isNeeded()) {
c.postAction(action);
c.updateWeight(weightsOfGenerators);
}
}
}
@ -869,6 +908,16 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
}
protected abstract double cost();
/**
* Add the cost of this cost function to the weight of the candidate generator that is optimized
* for this cost function. By default it is the RandomCandiateGenerator for a cost function.
* Called once per init or after postAction.
* @param weights the weights for every generator.
*/
public void updateWeight(double[] weights) {
weights[StochasticLoadBalancer.GeneratorType.RANDOM.ordinal()] += cost();
}
}
/**
@ -973,6 +1022,11 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
costs[newServer] = cluster.regionsPerServer[newServer].length;
});
}
@Override
public final void updateWeight(double[] weights) {
weights[StochasticLoadBalancer.GeneratorType.LOAD.ordinal()] += cost();
}
}
/**
@ -1150,6 +1204,10 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
return cluster.getOrComputeWeightedLocality(region, entity, type);
}
@Override
public final void updateWeight(double[] weights) {
weights[StochasticLoadBalancer.GeneratorType.LOCALITY.ordinal()] += cost();
}
}
static class ServerLocalityCostFunction extends LocalityBasedCostFunction {
@ -1435,6 +1493,11 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
costsPerGroup[newServer] = costPerGroup(cluster.primariesOfRegionsPerServer[newServer]);
}
}
@Override
public final void updateWeight(double[] weights) {
weights[StochasticLoadBalancer.GeneratorType.RACK.ordinal()] += cost();
}
}
/**

View File

@ -552,6 +552,15 @@ public class BalancerTestBase {
testWithCluster(serverMap, null, assertFullyBalanced, assertFullyBalancedForReplicas);
}
protected void testWithClusterWithIteration(int numNodes, int numRegions, int numRegionsPerServer,
int replication, int numTables, boolean assertFullyBalanced,
boolean assertFullyBalancedForReplicas) {
Map<ServerName, List<RegionInfo>> serverMap =
createServerMap(numNodes, numRegions, numRegionsPerServer, replication, numTables);
testWithClusterWithIteration(serverMap, null, assertFullyBalanced,
assertFullyBalancedForReplicas);
}
protected void testWithCluster(Map<ServerName, List<RegionInfo>> serverMap,
RackManager rackManager, boolean assertFullyBalanced, boolean assertFullyBalancedForReplicas) {
List<ServerAndLoad> list = convertToList(serverMap);

View File

@ -310,10 +310,10 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
final double expectedCost = loadBalancer.computeCost(cluster, Double.MAX_VALUE);
Cluster.Action action = loadBalancer.nextAction(cluster);
cluster.doAction(action);
loadBalancer.updateCostsWithAction(cluster, action);
loadBalancer.updateCostsAndWeightsWithAction(cluster, action);
Cluster.Action undoAction = action.undoAction();
cluster.doAction(undoAction);
loadBalancer.updateCostsWithAction(cluster, undoAction);
loadBalancer.updateCostsAndWeightsWithAction(cluster, undoAction);
final double actualCost = loadBalancer.computeCost(cluster, Double.MAX_VALUE);
assertEquals(expectedCost, actualCost, 0);
}

View File

@ -51,7 +51,7 @@ public class TestStochasticLoadBalancerBalanceCluster extends BalancerTestBase {
*/
@Test
public void testBalanceCluster() throws Exception {
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 3 * 60 * 1000); // 300 sec
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 2 * 60 * 1000); // 300 sec
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 20000000L);
loadBalancer.setConf(conf);
for (int[] mockCluster : clusterStateMocks) {

View File

@ -73,7 +73,7 @@ public class TestStochasticLoadBalancerHeterogeneousCost extends BalancerTestBas
BalancerTestBase.conf.set(
HeterogeneousRegionCountCostFunction.HBASE_MASTER_BALANCER_HETEROGENEOUS_RULES_FILE,
RULES_FILE);
BalancerTestBase.loadBalancer = new StochasticLoadBalancer();
BalancerTestBase.loadBalancer = new StochasticLoadTestBalancer();
BalancerTestBase.loadBalancer.setConf(BalancerTestBase.conf);
BalancerTestBase.loadBalancer.getCandidateGenerators().add(new FairRandomCandidateGenerator());
}
@ -301,4 +301,14 @@ public class TestStochasticLoadBalancerHeterogeneousCost extends BalancerTestBas
return super.generate(cluster);
}
}
static class StochasticLoadTestBalancer extends StochasticLoadBalancer {
private FairRandomCandidateGenerator fairRandomCandidateGenerator =
new FairRandomCandidateGenerator();
@Override
protected CandidateGenerator getRandomGenerator() {
return fairRandomCandidateGenerator;
}
}
}

View File

@ -41,6 +41,7 @@ public class TestStochasticLoadBalancerLargeCluster extends BalancerTestBase {
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 6 * 60 * 1000);
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 20000000L);
loadBalancer.onConfigurationChange(conf);
testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true);
testWithClusterWithIteration(numNodes, numRegions, numRegionsPerServer, replication, numTables,
true, true);
}
}

View File

@ -39,6 +39,7 @@ public class TestStochasticLoadBalancerRegionReplicaLargeCluster extends Balance
int numTables = 100;
int replication = 3;
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 20000000L);
testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true);
testWithClusterWithIteration(numNodes, numRegions, numRegionsPerServer, replication, numTables,
true, true);
}
}