HBASE-25739 TableSkewCostFunction need to use aggregated deviation (#3415)
Signed-off-by: Duo Zhang <zhangduo@apache.org> Signed-off-by: stack <stack@duboce.net> Reviewed-by: Nick Dimiduk <ndimiduk@apache.org>
This commit is contained in:
parent
9f21eec777
commit
68aaf1ff02
|
@ -81,7 +81,11 @@ class BalancerClusterState {
|
|||
int[] initialRegionIndexToServerIndex; // regionIndex -> serverIndex (initial cluster state)
|
||||
int[] regionIndexToTableIndex; // regionIndex -> tableIndex
|
||||
int[][] numRegionsPerServerPerTable; // serverIndex -> tableIndex -> # regions
|
||||
int[] numMaxRegionsPerTable; // tableIndex -> max number of regions in a single RS
|
||||
int[] numRegionsPerTable; // tableIndex -> region count
|
||||
double[] meanRegionsPerTable; // mean region count per table
|
||||
double[] regionSkewByTable; // skew on RS per by table
|
||||
double[] minRegionSkewByTable; // min skew on RS per by table
|
||||
double[] maxRegionSkewByTable; // max skew on RS per by table
|
||||
int[] regionIndexToPrimaryIndex; // regionIndex -> regionIndex of the primary
|
||||
boolean hasRegionReplicas = false; // whether there is regions with replicas
|
||||
|
||||
|
@ -290,7 +294,9 @@ class BalancerClusterState {
|
|||
}
|
||||
|
||||
numTables = tables.size();
|
||||
LOG.debug("Number of tables={}", numTables);
|
||||
numRegionsPerServerPerTable = new int[numServers][numTables];
|
||||
numRegionsPerTable = new int[numTables];
|
||||
|
||||
for (int i = 0; i < numServers; i++) {
|
||||
for (int j = 0; j < numTables; j++) {
|
||||
|
@ -301,15 +307,26 @@ class BalancerClusterState {
|
|||
for (int i = 0; i < regionIndexToServerIndex.length; i++) {
|
||||
if (regionIndexToServerIndex[i] >= 0) {
|
||||
numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++;
|
||||
numRegionsPerTable[regionIndexToTableIndex[i]]++;
|
||||
}
|
||||
}
|
||||
|
||||
numMaxRegionsPerTable = new int[numTables];
|
||||
for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
|
||||
for (tableIndex = 0; tableIndex < aNumRegionsPerServerPerTable.length; tableIndex++) {
|
||||
if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) {
|
||||
numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex];
|
||||
// Avoid repeated computation for planning
|
||||
meanRegionsPerTable = new double[numTables];
|
||||
regionSkewByTable = new double[numTables];
|
||||
maxRegionSkewByTable = new double[numTables];
|
||||
minRegionSkewByTable = new double[numTables];
|
||||
|
||||
for (int i = 0; i < numTables; i++) {
|
||||
meanRegionsPerTable[i] = Double.valueOf(numRegionsPerTable[i]) / numServers;
|
||||
minRegionSkewByTable[i] += DoubleArrayCost.getMinSkew(numRegionsPerTable[i], numServers);
|
||||
maxRegionSkewByTable[i] += DoubleArrayCost.getMaxSkew(numRegionsPerTable[i], numServers);
|
||||
}
|
||||
|
||||
for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
|
||||
for (int tableIdx = 0; tableIdx < aNumRegionsPerServerPerTable.length; tableIdx++) {
|
||||
regionSkewByTable[tableIdx] += Math.abs(aNumRegionsPerServerPerTable[tableIdx] -
|
||||
meanRegionsPerTable[tableIdx]);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -671,22 +688,13 @@ class BalancerClusterState {
|
|||
int tableIndex = regionIndexToTableIndex[region];
|
||||
if (oldServer >= 0) {
|
||||
numRegionsPerServerPerTable[oldServer][tableIndex]--;
|
||||
// update regionSkewPerTable for the move from old server
|
||||
regionSkewByTable[tableIndex] += getSkewChangeFor(oldServer, tableIndex, -1);
|
||||
}
|
||||
numRegionsPerServerPerTable[newServer][tableIndex]++;
|
||||
|
||||
// check whether this caused maxRegionsPerTable in the new Server to be updated
|
||||
if (numRegionsPerServerPerTable[newServer][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
|
||||
numMaxRegionsPerTable[tableIndex] = numRegionsPerServerPerTable[newServer][tableIndex];
|
||||
} else if (oldServer >= 0 && (numRegionsPerServerPerTable[oldServer][tableIndex]
|
||||
+ 1) == numMaxRegionsPerTable[tableIndex]) {
|
||||
// recompute maxRegionsPerTable since the previous value was coming from the old server
|
||||
numMaxRegionsPerTable[tableIndex] = 0;
|
||||
for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
|
||||
if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) {
|
||||
numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex];
|
||||
}
|
||||
}
|
||||
}
|
||||
// update regionSkewPerTable for the move to new server
|
||||
regionSkewByTable[tableIndex] += getSkewChangeFor(newServer, tableIndex, 1);
|
||||
|
||||
// update for servers
|
||||
int primary = regionIndexToPrimaryIndex[region];
|
||||
|
@ -856,10 +864,18 @@ class BalancerClusterState {
|
|||
.append(Arrays.toString(serverIndicesSortedByRegionCount)).append(", regionsPerServer=")
|
||||
.append(Arrays.deepToString(regionsPerServer));
|
||||
|
||||
desc.append(", numMaxRegionsPerTable=").append(Arrays.toString(numMaxRegionsPerTable))
|
||||
desc.append(", regionSkewByTable=").append(Arrays.toString(regionSkewByTable))
|
||||
.append(", numRegions=").append(numRegions).append(", numServers=").append(numServers)
|
||||
.append(", numTables=").append(numTables).append(", numMovedRegions=").append(numMovedRegions)
|
||||
.append('}');
|
||||
return desc.toString();
|
||||
}
|
||||
|
||||
private double getSkewChangeFor(int serverIndex, int tableIndex, int regionCountChange) {
|
||||
double curSkew = Math.abs(numRegionsPerServerPerTable[serverIndex][tableIndex] -
|
||||
meanRegionsPerTable[tableIndex]);
|
||||
double oldSkew = Math.abs(numRegionsPerServerPerTable[serverIndex][tableIndex] -
|
||||
regionCountChange - meanRegionsPerTable[tableIndex]);
|
||||
return curSkew - oldSkew;
|
||||
}
|
||||
}
|
|
@ -25,6 +25,8 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
@InterfaceAudience.Private
|
||||
abstract class CostFunction {
|
||||
|
||||
public static final double COST_EPSILON = 0.0001;
|
||||
|
||||
private float multiplier = 0;
|
||||
|
||||
protected BalancerClusterState cluster;
|
||||
|
@ -89,10 +91,11 @@ abstract class CostFunction {
|
|||
* @return The scaled value.
|
||||
*/
|
||||
protected static double scale(double min, double max, double value) {
|
||||
if (max <= min || value <= min) {
|
||||
if (max <= min || value <= min
|
||||
|| Math.abs(max - min) <= COST_EPSILON || Math.abs(value - min) <= COST_EPSILON) {
|
||||
return 0;
|
||||
}
|
||||
if ((max - min) == 0) {
|
||||
if (max <= min || Math.abs(max - min) <= COST_EPSILON) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -72,31 +72,13 @@ final class DoubleArrayCost {
|
|||
double count = stats.length;
|
||||
double mean = total / count;
|
||||
|
||||
// Compute max as if all region servers had 0 and one had the sum of all costs. This must be
|
||||
// a zero sum cost for this to make sense.
|
||||
double max = ((count - 1) * mean) + (total - mean);
|
||||
|
||||
// It's possible that there aren't enough regions to go around
|
||||
double min;
|
||||
if (count > total) {
|
||||
min = ((count - total) * mean) + ((1 - mean) * total);
|
||||
} else {
|
||||
// Some will have 1 more than everything else.
|
||||
int numHigh = (int) (total - (Math.floor(mean) * count));
|
||||
int numLow = (int) (count - numHigh);
|
||||
|
||||
min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean)));
|
||||
|
||||
}
|
||||
min = Math.max(0, min);
|
||||
for (int i = 0; i < stats.length; i++) {
|
||||
double n = stats[i];
|
||||
double diff = Math.abs(mean - n);
|
||||
totalCost += diff;
|
||||
}
|
||||
|
||||
double scaled = CostFunction.scale(min, max, totalCost);
|
||||
return scaled;
|
||||
return CostFunction.scale(getMinSkew(total, count),
|
||||
getMaxSkew(total, count), totalCost);
|
||||
}
|
||||
|
||||
private static double getSum(double[] stats) {
|
||||
|
@ -106,4 +88,33 @@ final class DoubleArrayCost {
|
|||
}
|
||||
return total;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the min skew of distribution
|
||||
* @param total is total number of regions
|
||||
*/
|
||||
public static double getMinSkew(double total, double numServers) {
|
||||
double mean = total / numServers;
|
||||
// It's possible that there aren't enough regions to go around
|
||||
double min;
|
||||
if (numServers > total) {
|
||||
min = ((numServers - total) * mean + (1 - mean) * total) ;
|
||||
} else {
|
||||
// Some will have 1 more than everything else.
|
||||
int numHigh = (int) (total - (Math.floor(mean) * numServers));
|
||||
int numLow = (int) (numServers - numHigh);
|
||||
min = numHigh * (Math.ceil(mean) - mean) + numLow * (mean - Math.floor(mean));
|
||||
}
|
||||
return min;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the max deviation of distribution
|
||||
* Compute max as if all region servers had 0 and one had the sum of all costs. This must be
|
||||
* a zero sum cost for this to make sense.
|
||||
*/
|
||||
public static double getMaxSkew(double total, double numServers) {
|
||||
double mean = total / numServers;
|
||||
return (total - mean) + (numServers - 1) * mean;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -125,7 +125,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||
private int stepsPerRegion = 800;
|
||||
private long maxRunningTime = 30 * 1000 * 1; // 30 seconds.
|
||||
private int numRegionLoadsToRemember = 15;
|
||||
private float minCostNeedBalance = 0.05f;
|
||||
private float minCostNeedBalance = 0.025f;
|
||||
|
||||
private List<CandidateGenerator> candidateGenerators;
|
||||
private List<CostFunction> costFunctions; // FindBugs: Wants this protected; IS2_INCONSISTENT_SYNC
|
||||
|
@ -240,7 +240,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||
curFunctionCosts = new double[costFunctions.size()];
|
||||
tempFunctionCosts = new double[costFunctions.size()];
|
||||
|
||||
LOG.info("Loaded config; maxSteps=" + maxSteps + ", stepsPerRegion=" + stepsPerRegion +
|
||||
LOG.info("Loaded config; maxSteps=" + maxSteps + ", runMaxSteps=" + runMaxSteps,
|
||||
", stepsPerRegion=" + stepsPerRegion +
|
||||
", maxRunningTime=" + maxRunningTime + ", isByTable=" + isByTable + ", CostFunctions=" +
|
||||
Arrays.toString(getCostFunctionNames()) + " etc.");
|
||||
}
|
||||
|
|
|
@ -26,7 +26,6 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
*/
|
||||
@InterfaceAudience.Private
|
||||
class TableSkewCostFunction extends CostFunction {
|
||||
|
||||
private static final String TABLE_SKEW_COST_KEY =
|
||||
"hbase.master.balancer.stochastic.tableSkewCost";
|
||||
private static final float DEFAULT_TABLE_SKEW_COST = 35;
|
||||
|
@ -37,14 +36,11 @@ class TableSkewCostFunction extends CostFunction {
|
|||
|
||||
@Override
|
||||
protected double cost() {
|
||||
double max = cluster.numRegions;
|
||||
double min = ((double) cluster.numRegions) / cluster.numServers;
|
||||
double value = 0;
|
||||
|
||||
for (int i = 0; i < cluster.numMaxRegionsPerTable.length; i++) {
|
||||
value += cluster.numMaxRegionsPerTable[i];
|
||||
double cost = 0;
|
||||
for (int tableIdx = 0; tableIdx < cluster.numTables; tableIdx++) {
|
||||
cost += scale(cluster.minRegionSkewByTable[tableIdx],
|
||||
cluster.maxRegionSkewByTable[tableIdx], cluster.regionSkewByTable[tableIdx]);
|
||||
}
|
||||
|
||||
return scale(min, max, value);
|
||||
return cost;
|
||||
}
|
||||
}
|
|
@ -46,7 +46,7 @@ public class StochasticBalancerTestBase extends BalancerTestBase {
|
|||
conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 0.75f);
|
||||
conf.setFloat("hbase.regions.slop", 0.0f);
|
||||
conf.setFloat("hbase.master.balancer.stochastic.localityCost", 0);
|
||||
conf.setLong(StochasticLoadBalancer.MAX_RUNNING_TIME_KEY, 3 * 60 * 1000L);
|
||||
conf.setBoolean("hbase.master.balancer.stochastic.runMaxSteps", true);
|
||||
loadBalancer = new StochasticLoadBalancer();
|
||||
loadBalancer.setClusterInfoProvider(new DummyClusterInfoProvider(conf));
|
||||
loadBalancer.initialize();
|
||||
|
|
|
@ -355,8 +355,8 @@ public class TestBaseLoadBalancer extends BalancerTestBase {
|
|||
|
||||
// now move region1 from servers[0] to servers[2]
|
||||
cluster.doAction(new MoveRegionAction(0, 0, 2));
|
||||
// check that the numMaxRegionsPerTable for "table" has increased to 2
|
||||
assertEquals(2, cluster.numMaxRegionsPerTable[0]);
|
||||
// check that the regionSkewByTable for "table" has increased to 2
|
||||
assertEquals(2, cluster.regionSkewByTable[0], 0.01);
|
||||
// now repeat check whether moving region1 from servers[1] to servers[2]
|
||||
// would lower availability
|
||||
assertTrue(cluster.wouldLowerAvailability(hri1, servers[2]));
|
||||
|
|
|
@ -51,15 +51,12 @@ public class TestStochasticLoadBalancerBalanceCluster extends StochasticBalancer
|
|||
*/
|
||||
@Test
|
||||
public void testBalanceCluster() throws Exception {
|
||||
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 2000000L);
|
||||
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 90 * 1000); // 90 sec
|
||||
conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
|
||||
loadBalancer.onConfigurationChange(conf);
|
||||
for (int[] mockCluster : clusterStateMocks) {
|
||||
Map<ServerName, List<RegionInfo>> servers = mockClusterServers(mockCluster);
|
||||
List<ServerAndLoad> list = convertToList(servers);
|
||||
LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list));
|
||||
|
||||
Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
|
||||
(Map) mockClusterServersWithTables(servers);
|
||||
List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
|
||||
|
|
|
@ -70,7 +70,6 @@ public class TestStochasticLoadBalancerHeterogeneousCost extends StochasticBalan
|
|||
conf.setFloat("hbase.master.balancer.stochastic.regionCountCost", 0);
|
||||
conf.setFloat("hbase.master.balancer.stochastic.primaryRegionCountCost", 0);
|
||||
conf.setFloat("hbase.master.balancer.stochastic.tableSkewCost", 0);
|
||||
conf.setBoolean("hbase.master.balancer.stochastic.runMaxSteps", true);
|
||||
conf.set(StochasticLoadBalancer.COST_FUNCTIONS_COST_FUNCTIONS_KEY,
|
||||
HeterogeneousRegionCountCostFunction.class.getName());
|
||||
// Need to ensure test dir has been created.
|
||||
|
|
|
@ -38,6 +38,9 @@ public class TestStochasticLoadBalancerLargeCluster extends StochasticBalancerTe
|
|||
int numRegionsPerServer = 80; // all servers except one
|
||||
int numTables = 100;
|
||||
int replication = 1;
|
||||
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 6 * 60 * 1000);
|
||||
conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
|
||||
loadBalancer.onConfigurationChange(conf);
|
||||
testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue