HBASE-25739 TableSkewCostFunction need to use aggregated deviation - … (#3481)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
    Signed-off-by: stack <stack@duboce.net>
    Reviewed-by: Nick Dimiduk <ndimiduk@apache.org>
This commit is contained in:
clarax 2021-07-16 10:53:41 -07:00 committed by GitHub
parent 5ee1447b55
commit dd2ae3605d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 109 additions and 78 deletions

View File

@ -165,8 +165,12 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
int[] regionIndexToServerIndex; //regionIndex -> serverIndex
int[] initialRegionIndexToServerIndex; //regionIndex -> serverIndex (initial cluster state)
int[] regionIndexToTableIndex; //regionIndex -> tableIndex
int[][] numRegionsPerServerPerTable; //serverIndex -> tableIndex -> # regions
int[] numMaxRegionsPerTable; //tableIndex -> max number of regions in a single RS
int[][] numRegionsPerServerPerTable; // serverIndex -> tableIndex -> # regions
int[] numRegionsPerTable; // tableIndex -> region count
double[] meanRegionsPerTable; // mean region count per table
double[] regionSkewByTable; // skew on RS per by table
double[] minRegionSkewByTable; // min skew on RS per by table
double[] maxRegionSkewByTable; // max skew on RS per by table
int[] regionIndexToPrimaryIndex; //regionIndex -> regionIndex of the primary
boolean hasRegionReplicas = false; //whether there is regions with replicas
@ -373,7 +377,9 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
}
numTables = tables.size();
LOG.debug("Number of tables={}", numTables);
numRegionsPerServerPerTable = new int[numServers][numTables];
numRegionsPerTable = new int[numTables];
for (int i = 0; i < numServers; i++) {
for (int j = 0; j < numTables; j++) {
@ -384,15 +390,26 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
for (int i=0; i < regionIndexToServerIndex.length; i++) {
if (regionIndexToServerIndex[i] >= 0) {
numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++;
numRegionsPerTable[regionIndexToTableIndex[i]]++;
}
}
numMaxRegionsPerTable = new int[numTables];
// Avoid repeated computation for planning
meanRegionsPerTable = new double[numTables];
regionSkewByTable = new double[numTables];
maxRegionSkewByTable = new double[numTables];
minRegionSkewByTable = new double[numTables];
for (int i = 0; i < numTables; i++) {
meanRegionsPerTable[i] = Double.valueOf(numRegionsPerTable[i]) / numServers;
minRegionSkewByTable[i] += DoubleArrayCost.getMinSkew(numRegionsPerTable[i], numServers);
maxRegionSkewByTable[i] += DoubleArrayCost.getMaxSkew(numRegionsPerTable[i], numServers);
}
for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
for (tableIndex = 0; tableIndex < aNumRegionsPerServerPerTable.length; tableIndex++) {
if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) {
numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex];
}
for (int tableIdx = 0; tableIdx < aNumRegionsPerServerPerTable.length; tableIdx++) {
regionSkewByTable[tableIdx] +=
Math.abs(aNumRegionsPerServerPerTable[tableIdx] - meanRegionsPerTable[tableIdx]);
}
}
@ -832,22 +849,13 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
int tableIndex = regionIndexToTableIndex[region];
if (oldServer >= 0) {
numRegionsPerServerPerTable[oldServer][tableIndex]--;
// update regionSkewPerTable for the move from old server
regionSkewByTable[tableIndex] += getSkewChangeFor(oldServer, tableIndex, -1);
}
numRegionsPerServerPerTable[newServer][tableIndex]++;
//check whether this caused maxRegionsPerTable in the new Server to be updated
if (numRegionsPerServerPerTable[newServer][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
numMaxRegionsPerTable[tableIndex] = numRegionsPerServerPerTable[newServer][tableIndex];
} else if (oldServer >= 0 && (numRegionsPerServerPerTable[oldServer][tableIndex] + 1)
== numMaxRegionsPerTable[tableIndex]) {
//recompute maxRegionsPerTable since the previous value was coming from the old server
numMaxRegionsPerTable[tableIndex] = 0;
for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) {
numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex];
}
}
}
// update regionSkewPerTable for the move to new server
regionSkewByTable[tableIndex] += getSkewChangeFor(newServer, tableIndex, 1);
// update for servers
int primary = regionIndexToPrimaryIndex[region];
@ -1017,12 +1025,20 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
.append(Arrays.toString(serverIndicesSortedByRegionCount))
.append(", regionsPerServer=").append(Arrays.deepToString(regionsPerServer));
desc.append(", numMaxRegionsPerTable=").append(Arrays.toString(numMaxRegionsPerTable))
desc.append(", regionSkewByTable=").append(Arrays.toString(regionSkewByTable))
.append(", numRegions=").append(numRegions).append(", numServers=").append(numServers)
.append(", numTables=").append(numTables).append(", numMovedRegions=")
.append(numMovedRegions).append('}');
return desc.toString();
}
private double getSkewChangeFor(int serverIndex, int tableIndex, double regionCountChange) {
double curSkew = Math.abs(numRegionsPerServerPerTable[serverIndex][tableIndex] -
meanRegionsPerTable[tableIndex]);
double oldSkew = Math.abs(numRegionsPerServerPerTable[serverIndex][tableIndex] -
regionCountChange - meanRegionsPerTable[tableIndex]);
return curSkew - oldSkew;
}
}
// slop for regions

View File

@ -72,31 +72,14 @@ final class DoubleArrayCost {
double count = stats.length;
double mean = total / count;
// Compute max as if all region servers had 0 and one had the sum of all costs. This must be
// a zero sum cost for this to make sense.
double max = ((count - 1) * mean) + (total - mean);
// It's possible that there aren't enough regions to go around
double min;
if (count > total) {
min = ((count - total) * mean) + ((1 - mean) * total);
} else {
// Some will have 1 more than everything else.
int numHigh = (int) (total - (Math.floor(mean) * count));
int numLow = (int) (count - numHigh);
min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean)));
}
min = Math.max(0, min);
for (int i = 0; i < stats.length; i++) {
double n = stats[i];
double diff = Math.abs(mean - n);
totalCost += diff;
}
double scaled = StochasticLoadBalancer.scale(min, max, totalCost);
return scaled;
return StochasticLoadBalancer.scale(getMinSkew(total, count),
getMaxSkew(total, count), totalCost);
}
private static double getSum(double[] stats) {
@ -106,4 +89,34 @@ final class DoubleArrayCost {
}
return total;
}
}
/**
* Return the min skew of distribution
* @param total is total number of regions
*/
public static double getMinSkew(double total, double numServers) {
double mean = total / numServers;
// It's possible that there aren't enough regions to go around
double min;
if (numServers > total) {
min = ((numServers - total) * mean + (1 - mean) * total) ;
} else {
// Some will have 1 more than everything else.
int numHigh = (int) (total - (Math.floor(mean) * numServers));
int numLow = (int) (numServers - numHigh);
min = numHigh * (Math.ceil(mean) - mean) + numLow * (mean - Math.floor(mean));
}
return min;
}
/**
* Return the max deviation of distribution
* Compute max as if all region servers had 0 and one had the sum of all costs. This must be
* a zero sum cost for this to make sense.
* @param total is total number of regions
*/
public static double getMaxSkew(double total, double numServers) {
double mean = total / numServers;
return (total - mean) + (numServers - 1) * mean;
}
}

View File

@ -133,6 +133,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
protected static final Random RANDOM = new Random(System.currentTimeMillis());
private static final Logger LOG = LoggerFactory.getLogger(StochasticLoadBalancer.class);
public static final double COST_EPSILON = 0.0001;
Map<String, Deque<BalancerRegionLoad>> loads = new HashMap<>();
@ -142,7 +143,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
private int stepsPerRegion = 800;
private long maxRunningTime = 30 * 1000 * 1; // 30 seconds.
private int numRegionLoadsToRemember = 15;
private float minCostNeedBalance = 0.05f;
private float minCostNeedBalance = 0.025f;
private boolean isBalancerDecisionRecording = false;
private boolean isBalancerRejectionRecording = false;
@ -236,9 +237,11 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
this.namedQueueRecorder = NamedQueueRecorder.getInstance(getConf());
}
LOG.info("Loaded config; maxSteps=" + maxSteps + ", stepsPerRegion=" + stepsPerRegion +
", maxRunningTime=" + maxRunningTime + ", isByTable=" + isByTable + ", CostFunctions=" +
Arrays.toString(getCostFunctionNames()) + " etc.");
LOG.info(
"Loaded config; maxSteps=" + maxSteps + ", runMaxSteps=" + runMaxSteps +
", stepsPerRegion=" + stepsPerRegion +
", maxRunningTime=" + maxRunningTime + ", isByTable=" + isByTable
+ ", CostFunctions=" + Arrays.toString(getCostFunctionNames()) + " etc.");
}
private void loadCustomCostFunctions(Configuration conf) {
@ -801,7 +804,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
* Base class of StochasticLoadBalancer's Cost Functions.
*/
public abstract static class CostFunction {
private float multiplier = 0;
protected Cluster cluster;
@ -858,24 +860,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
protected abstract double cost();
}
/**
* Scale the value between 0 and 1.
* @param min Min value
* @param max The Max value
* @param value The value to be scaled.
* @return The scaled value.
*/
static double scale(double min, double max, double value) {
if (max <= min || value <= min) {
return 0;
}
if ((max - min) == 0) {
return 0;
}
return Math.max(0d, Math.min(1d, (value - min) / (max - min)));
}
/**
* Given the starting state of the regions and a potential ending state
* compute cost based upon the number of regions that have moved.
@ -1065,15 +1049,12 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
@Override
protected double cost() {
double max = cluster.numRegions;
double min = ((double) cluster.numRegions) / cluster.numServers;
double value = 0;
for (int i = 0; i < cluster.numMaxRegionsPerTable.length; i++) {
value += cluster.numMaxRegionsPerTable[i];
double cost = 0;
for (int tableIdx = 0; tableIdx < cluster.numTables; tableIdx++) {
cost += scale(cluster.minRegionSkewByTable[tableIdx],
cluster.maxRegionSkewByTable[tableIdx], cluster.regionSkewByTable[tableIdx]);
}
return scale(min, max, value);
return cost;
}
}
@ -1520,4 +1501,23 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
public static String composeAttributeName(String tableName, String costFunctionName) {
return tableName + TABLE_FUNCTION_SEP + costFunctionName;
}
/**
* Scale the value between 0 and 1.
* @param min Min value
* @param max The Max value
* @param value The value to be scaled.
* @return The scaled value.
*/
static double scale(double min, double max, double value) {
if (max <= min || value <= min
|| Math.abs(max - min) <= COST_EPSILON || Math.abs(value - min) <= COST_EPSILON) {
return 0;
}
if (max <= min || Math.abs(max - min) <= COST_EPSILON) {
return 0;
}
return Math.max(0d, Math.min(1d, (value - min) / (max - min)));
}
}

View File

@ -72,7 +72,6 @@ public class BalancerTestBase {
conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 0.75f);
conf.setFloat("hbase.regions.slop", 0.0f);
conf.setFloat("hbase.master.balancer.stochastic.localityCost", 0);
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 3 * 60 * 1000);
loadBalancer = new StochasticLoadBalancer();
loadBalancer.setConf(conf);
}

View File

@ -389,8 +389,8 @@ public class TestBaseLoadBalancer extends BalancerTestBase {
// now move region1 from servers[0] to servers[2]
cluster.doAction(new MoveRegionAction(0, 0, 2));
// check that the numMaxRegionsPerTable for "table" has increased to 2
assertEquals(2, cluster.numMaxRegionsPerTable[0]);
// check that the regionSkewByTable for "table" has increased to 2
assertEquals(2, cluster.regionSkewByTable[0], 0.01);
// now repeat check whether moving region1 from servers[1] to servers[2]
// would lower availability
assertTrue(cluster.wouldLowerAvailability(hri1, servers[2]));

View File

@ -51,9 +51,9 @@ public class TestStochasticLoadBalancerBalanceCluster extends BalancerTestBase {
*/
@Test
public void testBalanceCluster() throws Exception {
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 2000000L);
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 90 * 1000); // 90 sec
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 3 * 60 * 1000); // 300 sec
conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 20000000L);
loadBalancer.setConf(conf);
for (int[] mockCluster : clusterStateMocks) {
Map<ServerName, List<RegionInfo>> servers = mockClusterServers(mockCluster);

View File

@ -64,7 +64,6 @@ public class TestStochasticLoadBalancerHeterogeneousCost extends BalancerTestBas
BalancerTestBase.conf.setFloat("hbase.master.balancer.stochastic.regionCountCost", 0);
BalancerTestBase.conf.setFloat("hbase.master.balancer.stochastic.primaryRegionCountCost", 0);
BalancerTestBase.conf.setFloat("hbase.master.balancer.stochastic.tableSkewCost", 0);
BalancerTestBase.conf.setBoolean("hbase.master.balancer.stochastic.runMaxSteps", true);
BalancerTestBase.conf.set(StochasticLoadBalancer.COST_FUNCTIONS_COST_FUNCTIONS_KEY,
HeterogeneousRegionCountCostFunction.class.getName());
// Need to ensure test dir has been created.

View File

@ -38,6 +38,9 @@ public class TestStochasticLoadBalancerLargeCluster extends BalancerTestBase {
int numRegionsPerServer = 80; // all servers except one
int numTables = 100;
int replication = 1;
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 6 * 60 * 1000);
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 20000000L);
loadBalancer.onConfigurationChange(conf);
testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true);
}
}

View File

@ -38,6 +38,7 @@ public class TestStochasticLoadBalancerRegionReplicaLargeCluster extends Balance
int numRegionsPerServer = 19; // all servers except one
int numTables = 100;
int replication = 3;
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 20000000L);
testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true);
}
}