HBASE-26337 Optimization for weighted random generators (#3828)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
1302073dfe
commit
cc2e4aec87
|
@ -101,4 +101,14 @@ abstract class CostFunction {
|
|||
|
||||
return Math.max(0d, Math.min(1d, (value - min) / (max - min)));
|
||||
}
|
||||
|
||||
/**
|
||||
* Add the cost of this cost function to the weight of the candidate generator that is optimized
|
||||
* for this cost function. By default it is the RandomCandiateGenerator for a cost function.
|
||||
* Called once per init or after postAction.
|
||||
* @param weights the weights for every generator.
|
||||
*/
|
||||
public void updateWeight(double[] weights) {
|
||||
weights[StochasticLoadBalancer.GeneratorType.RANDOM.ordinal()] += cost();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -86,6 +86,14 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
|
|||
return fnPickers;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return any candidate generator in random
|
||||
*/
|
||||
@Override
|
||||
protected CandidateGenerator getRandomGenerator() {
|
||||
return candidateGenerators.get(ThreadLocalRandom.current().nextInt(candidateGenerators.size()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void setMasterServices(MasterServices masterServices) {
|
||||
super.setMasterServices(masterServices);
|
||||
|
|
|
@ -88,4 +88,8 @@ abstract class LocalityBasedCostFunction extends CostFunction {
|
|||
return cluster.getOrComputeWeightedLocality(region, entity, type);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void updateWeight(double[] weights) {
|
||||
weights[StochasticLoadBalancer.GeneratorType.LOCALITY.ordinal()] += cost();
|
||||
}
|
||||
}
|
|
@ -60,4 +60,9 @@ class RegionCountSkewCostFunction extends CostFunction {
|
|||
costs[newServer] = cluster.regionsPerServer[newServer].length;
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void updateWeight(double[] weights) {
|
||||
weights[StochasticLoadBalancer.GeneratorType.LOAD.ordinal()] += cost();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -93,4 +93,9 @@ abstract class RegionReplicaGroupingCostFunction extends CostFunction {
|
|||
});
|
||||
return cost.longValue();
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void updateWeight(double[] weights) {
|
||||
weights[StochasticLoadBalancer.GeneratorType.RACK.ordinal()] += cost();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -133,7 +133,12 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||
private boolean isBalancerDecisionRecording = false;
|
||||
private boolean isBalancerRejectionRecording = false;
|
||||
|
||||
private List<CandidateGenerator> candidateGenerators;
|
||||
protected List<CandidateGenerator> candidateGenerators;
|
||||
|
||||
public enum GeneratorType {
|
||||
RANDOM, LOAD, LOCALITY, RACK
|
||||
}
|
||||
private double[] weightsOfGenerators;
|
||||
private List<CostFunction> costFunctions; // FindBugs: Wants this protected; IS2_INCONSISTENT_SYNC
|
||||
// To save currently configed sum of multiplier. Defaulted at 1 for cases that carry high cost
|
||||
private float sumMultiplier;
|
||||
|
@ -213,10 +218,11 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||
|
||||
protected List<CandidateGenerator> createCandidateGenerators() {
|
||||
List<CandidateGenerator> candidateGenerators = new ArrayList<CandidateGenerator>(4);
|
||||
candidateGenerators.add(new RandomCandidateGenerator());
|
||||
candidateGenerators.add(new LoadCandidateGenerator());
|
||||
candidateGenerators.add(localityCandidateGenerator);
|
||||
candidateGenerators.add(new RegionReplicaRackCandidateGenerator());
|
||||
candidateGenerators.add(GeneratorType.RANDOM.ordinal(), new RandomCandidateGenerator());
|
||||
candidateGenerators.add(GeneratorType.LOAD.ordinal(), new LoadCandidateGenerator());
|
||||
candidateGenerators.add(GeneratorType.LOCALITY.ordinal(), localityCandidateGenerator);
|
||||
candidateGenerators.add(GeneratorType.RACK.ordinal(),
|
||||
new RegionReplicaRackCandidateGenerator());
|
||||
return candidateGenerators;
|
||||
}
|
||||
|
||||
|
@ -405,8 +411,33 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||
@RestrictedApi(explanation = "Should only be called in tests", link = "",
|
||||
allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
|
||||
BalanceAction nextAction(BalancerClusterState cluster) {
|
||||
return candidateGenerators.get(ThreadLocalRandom.current().nextInt(candidateGenerators.size()))
|
||||
.generate(cluster);
|
||||
return getRandomGenerator().generate(cluster);
|
||||
}
|
||||
|
||||
/**
|
||||
* Select the candidate generator to use based on the cost of cost functions. The chance of
|
||||
* selecting a candidate generator is propotional to the share of cost of all cost functions among
|
||||
* all cost functions that benefit from it.
|
||||
*/
|
||||
protected CandidateGenerator getRandomGenerator() {
|
||||
double sum = 0;
|
||||
for (int i = 0; i < weightsOfGenerators.length; i++) {
|
||||
sum += weightsOfGenerators[i];
|
||||
weightsOfGenerators[i] = sum;
|
||||
}
|
||||
if (sum == 0) {
|
||||
return candidateGenerators.get(0);
|
||||
}
|
||||
for (int i = 0; i < weightsOfGenerators.length; i++) {
|
||||
weightsOfGenerators[i] /= sum;
|
||||
}
|
||||
double rand = ThreadLocalRandom.current().nextDouble();
|
||||
for (int i = 0; i < weightsOfGenerators.length; i++) {
|
||||
if (rand <= weightsOfGenerators[i]) {
|
||||
return candidateGenerators.get(i);
|
||||
}
|
||||
}
|
||||
return candidateGenerators.get(candidateGenerators.size() - 1);
|
||||
}
|
||||
|
||||
@RestrictedApi(explanation = "Should only be called in tests", link = "",
|
||||
|
@ -511,7 +542,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||
}
|
||||
|
||||
cluster.doAction(action);
|
||||
updateCostsWithAction(cluster, action);
|
||||
updateCostsAndWeightsWithAction(cluster, action);
|
||||
|
||||
newCost = computeCost(cluster, currentCost);
|
||||
|
||||
|
@ -527,7 +558,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||
// TODO: undo by remembering old values
|
||||
BalanceAction undoAction = action.undoAction();
|
||||
cluster.doAction(undoAction);
|
||||
updateCostsWithAction(cluster, undoAction);
|
||||
updateCostsAndWeightsWithAction(cluster, undoAction);
|
||||
}
|
||||
|
||||
if (EnvironmentEdgeManager.currentTime() - startTime >
|
||||
|
@ -728,17 +759,25 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||
@RestrictedApi(explanation = "Should only be called in tests", link = "",
|
||||
allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
|
||||
void initCosts(BalancerClusterState cluster) {
|
||||
// Initialize the weights of generator every time
|
||||
weightsOfGenerators = new double[this.candidateGenerators.size()];
|
||||
for (CostFunction c : costFunctions) {
|
||||
c.prepare(cluster);
|
||||
c.updateWeight(weightsOfGenerators);
|
||||
}
|
||||
}
|
||||
|
||||
@RestrictedApi(explanation = "Should only be called in tests", link = "",
|
||||
allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
|
||||
void updateCostsWithAction(BalancerClusterState cluster, BalanceAction action) {
|
||||
void updateCostsAndWeightsWithAction(BalancerClusterState cluster, BalanceAction action) {
|
||||
// Reset all the weights to 0
|
||||
for (int i = 0; i < weightsOfGenerators.length; i++) {
|
||||
weightsOfGenerators[i] = 0;
|
||||
}
|
||||
for (CostFunction c : costFunctions) {
|
||||
if (c.isNeeded()) {
|
||||
c.postAction(action);
|
||||
c.updateWeight(weightsOfGenerators);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -561,6 +561,15 @@ public class BalancerTestBase {
|
|||
testWithCluster(serverMap, null, assertFullyBalanced, assertFullyBalancedForReplicas);
|
||||
}
|
||||
|
||||
protected void testWithClusterWithIteration(int numNodes, int numRegions, int numRegionsPerServer,
|
||||
int replication, int numTables, boolean assertFullyBalanced,
|
||||
boolean assertFullyBalancedForReplicas) {
|
||||
Map<ServerName, List<RegionInfo>> serverMap =
|
||||
createServerMap(numNodes, numRegions, numRegionsPerServer, replication, numTables);
|
||||
testWithClusterWithIteration(serverMap, null, assertFullyBalanced,
|
||||
assertFullyBalancedForReplicas);
|
||||
}
|
||||
|
||||
protected void testWithCluster(Map<ServerName, List<RegionInfo>> serverMap,
|
||||
RackManager rackManager, boolean assertFullyBalanced, boolean assertFullyBalancedForReplicas) {
|
||||
List<ServerAndLoad> list = convertToList(serverMap);
|
||||
|
|
|
@ -401,10 +401,10 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
|
|||
final double expectedCost = loadBalancer.computeCost(cluster, Double.MAX_VALUE);
|
||||
BalanceAction action = loadBalancer.nextAction(cluster);
|
||||
cluster.doAction(action);
|
||||
loadBalancer.updateCostsWithAction(cluster, action);
|
||||
loadBalancer.updateCostsAndWeightsWithAction(cluster, action);
|
||||
BalanceAction undoAction = action.undoAction();
|
||||
cluster.doAction(undoAction);
|
||||
loadBalancer.updateCostsWithAction(cluster, undoAction);
|
||||
loadBalancer.updateCostsAndWeightsWithAction(cluster, undoAction);
|
||||
final double actualCost = loadBalancer.computeCost(cluster, Double.MAX_VALUE);
|
||||
assertEquals(expectedCost, actualCost, 0);
|
||||
}
|
||||
|
|
|
@ -51,7 +51,7 @@ public class TestStochasticLoadBalancerBalanceCluster extends BalancerTestBase {
|
|||
*/
|
||||
@Test
|
||||
public void testBalanceCluster() throws Exception {
|
||||
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 3 * 60 * 1000); // 3 min
|
||||
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 2 * 60 * 1000); // 2 min
|
||||
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 20000000L);
|
||||
loadBalancer.onConfigurationChange(conf);
|
||||
|
||||
|
|
|
@ -76,7 +76,7 @@ public class TestStochasticLoadBalancerHeterogeneousCost extends BalancerTestBas
|
|||
BalancerTestBase.conf.set(
|
||||
HeterogeneousRegionCountCostFunction.HBASE_MASTER_BALANCER_HETEROGENEOUS_RULES_FILE,
|
||||
RULES_FILE);
|
||||
loadBalancer = new StochasticLoadBalancer();
|
||||
loadBalancer = new StochasticLoadTestBalancer();
|
||||
MasterServices services = mock(MasterServices.class);
|
||||
when(services.getConfiguration()).thenReturn(conf);
|
||||
BalancerTestBase.loadBalancer.setMasterServices(services);
|
||||
|
@ -307,4 +307,14 @@ public class TestStochasticLoadBalancerHeterogeneousCost extends BalancerTestBas
|
|||
return super.generate(cluster);
|
||||
}
|
||||
}
|
||||
|
||||
static class StochasticLoadTestBalancer extends StochasticLoadBalancer {
|
||||
private FairRandomCandidateGenerator fairRandomCandidateGenerator =
|
||||
new FairRandomCandidateGenerator();
|
||||
|
||||
@Override
|
||||
protected CandidateGenerator getRandomGenerator() {
|
||||
return fairRandomCandidateGenerator;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -41,6 +41,7 @@ public class TestStochasticLoadBalancerLargeCluster extends BalancerTestBase {
|
|||
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 6 * 60 * 1000);
|
||||
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 20000000L);
|
||||
loadBalancer.onConfigurationChange(conf);
|
||||
testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true);
|
||||
testWithClusterWithIteration(numNodes, numRegions, numRegionsPerServer, replication, numTables,
|
||||
true, true);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue