Revert "HBASE-25739 TableSkewCostFunction need to use aggregated deviation (#3067)"
This reverts commit 533c84d330
.
This commit is contained in:
parent
94f4479e8f
commit
b65890da1d
|
@ -163,12 +163,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
||||||
int[] initialRegionIndexToServerIndex; //regionIndex -> serverIndex (initial cluster state)
|
int[] initialRegionIndexToServerIndex; //regionIndex -> serverIndex (initial cluster state)
|
||||||
int[] regionIndexToTableIndex; //regionIndex -> tableIndex
|
int[] regionIndexToTableIndex; //regionIndex -> tableIndex
|
||||||
int[][] numRegionsPerServerPerTable; //serverIndex -> tableIndex -> # regions
|
int[][] numRegionsPerServerPerTable; //serverIndex -> tableIndex -> # regions
|
||||||
int[] numRegionsPerTable; // tableIndex -> region count
|
int[] numMaxRegionsPerTable; //tableIndex -> max number of regions in a single RS
|
||||||
double[] meanRegionsPerTable; // mean region count per table
|
|
||||||
double regionSkewByTable; // skew on RS per by table
|
|
||||||
double minRegionSkewByTable; // min skew on RS per by table
|
|
||||||
double maxRegionSkewByTable; // max skew on RS per by table
|
|
||||||
|
|
||||||
int[] regionIndexToPrimaryIndex; //regionIndex -> regionIndex of the primary
|
int[] regionIndexToPrimaryIndex; //regionIndex -> regionIndex of the primary
|
||||||
boolean hasRegionReplicas = false; //whether there is regions with replicas
|
boolean hasRegionReplicas = false; //whether there is regions with replicas
|
||||||
|
|
||||||
|
@ -376,7 +371,6 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
||||||
|
|
||||||
numTables = tables.size();
|
numTables = tables.size();
|
||||||
numRegionsPerServerPerTable = new int[numServers][numTables];
|
numRegionsPerServerPerTable = new int[numServers][numTables];
|
||||||
numRegionsPerTable = new int[numTables];
|
|
||||||
|
|
||||||
for (int i = 0; i < numServers; i++) {
|
for (int i = 0; i < numServers; i++) {
|
||||||
for (int j = 0; j < numTables; j++) {
|
for (int j = 0; j < numTables; j++) {
|
||||||
|
@ -384,29 +378,21 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 0; i < numTables; i++) {
|
|
||||||
numRegionsPerTable[i] = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int i=0; i < regionIndexToServerIndex.length; i++) {
|
for (int i=0; i < regionIndexToServerIndex.length; i++) {
|
||||||
if (regionIndexToServerIndex[i] >= 0) {
|
if (regionIndexToServerIndex[i] >= 0) {
|
||||||
numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++;
|
numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++;
|
||||||
numRegionsPerTable[regionIndexToTableIndex[i]]++;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Avoid repeated computation for planning
|
numMaxRegionsPerTable = new int[numTables];
|
||||||
meanRegionsPerTable = new double[numTables];
|
for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
|
||||||
maxRegionSkewByTable = 0;
|
for (tableIndex = 0; tableIndex < aNumRegionsPerServerPerTable.length; tableIndex++) {
|
||||||
minRegionSkewByTable = 0;
|
if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) {
|
||||||
for (int i = 0; i < numTables; i++) {
|
numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex];
|
||||||
meanRegionsPerTable[i] = Double.valueOf(numRegionsPerTable[i]) / numServers;
|
}
|
||||||
minRegionSkewByTable += Cluster.getMinSkew(numRegionsPerTable[i], numServers);
|
}
|
||||||
maxRegionSkewByTable += Cluster.getMaxSkew(numRegionsPerTable[i], numServers);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
computeRegionSkewPerTable();
|
|
||||||
|
|
||||||
for (int i = 0; i < regions.length; i ++) {
|
for (int i = 0; i < regions.length; i ++) {
|
||||||
RegionInfo info = regions[i];
|
RegionInfo info = regions[i];
|
||||||
if (RegionReplicaUtil.isDefaultReplica(info)) {
|
if (RegionReplicaUtil.isDefaultReplica(info)) {
|
||||||
|
@ -531,53 +517,6 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
||||||
return numRegions < minLoad;
|
return numRegions < minLoad;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Return the min skew of distribution
|
|
||||||
*/
|
|
||||||
public static double getMinSkew(double total, double numServers) {
|
|
||||||
double mean = total / numServers;
|
|
||||||
// It's possible that there aren't enough regions to go around
|
|
||||||
double min;
|
|
||||||
if (numServers > total) {
|
|
||||||
min = ((numServers - total) * mean + (1 - mean) * total) ;
|
|
||||||
} else {
|
|
||||||
// Some will have 1 more than everything else.
|
|
||||||
int numHigh = (int) (total - (Math.floor(mean) * numServers));
|
|
||||||
int numLow = (int) (numServers - numHigh);
|
|
||||||
min = numHigh * (Math.ceil(mean) - mean) + numLow * (mean - Math.floor(mean));
|
|
||||||
}
|
|
||||||
return min;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Return the max deviation of distribution
|
|
||||||
* Compute max as if all region servers had 0 and one had the sum of all costs. This must be
|
|
||||||
* a zero sum cost for this to make sense.
|
|
||||||
*/
|
|
||||||
public static double getMaxSkew(double total, double numServers) {
|
|
||||||
double mean = total / numServers;
|
|
||||||
return (total - mean) + (numServers - 1) * mean;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Scale the value between 0 and 1.
|
|
||||||
*
|
|
||||||
* @param min Min value
|
|
||||||
* @param max The Max value
|
|
||||||
* @param value The value to be scaled.
|
|
||||||
* @return The scaled value.
|
|
||||||
*/
|
|
||||||
public static double scale(double min, double max, double value) {
|
|
||||||
if (max <= min || value <= min) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
if ((max - min) == 0) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
return Math.max(0d, Math.min(1d, (value - min) / (max - min)));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Retrieves and lazily initializes a field storing the locality of
|
* Retrieves and lazily initializes a field storing the locality of
|
||||||
* every region/server combination
|
* every region/server combination
|
||||||
|
@ -635,21 +574,6 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
||||||
return regionLoads[region].getLast().getStorefileSizeMB();
|
return regionLoads[region].getLast().getStorefileSizeMB();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Recompute the region skew during init or plan of moves.
|
|
||||||
*/
|
|
||||||
private void computeRegionSkewPerTable() {
|
|
||||||
// reinitialize for recomputation
|
|
||||||
regionSkewByTable = 0;
|
|
||||||
|
|
||||||
for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
|
|
||||||
for (int tableIndex = 0; tableIndex < aNumRegionsPerServerPerTable.length; tableIndex++) {
|
|
||||||
regionSkewByTable += Math.abs(aNumRegionsPerServerPerTable[tableIndex]
|
|
||||||
- meanRegionsPerTable[tableIndex]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Computes and caches the locality for each region/rack combinations,
|
* Computes and caches the locality for each region/rack combinations,
|
||||||
* as well as storing a mapping of region -> server and region -> rack such that server
|
* as well as storing a mapping of region -> server and region -> rack such that server
|
||||||
|
@ -905,20 +829,22 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
||||||
int tableIndex = regionIndexToTableIndex[region];
|
int tableIndex = regionIndexToTableIndex[region];
|
||||||
if (oldServer >= 0) {
|
if (oldServer >= 0) {
|
||||||
numRegionsPerServerPerTable[oldServer][tableIndex]--;
|
numRegionsPerServerPerTable[oldServer][tableIndex]--;
|
||||||
// update regionSkewPerTable for the move from old server
|
|
||||||
regionSkewByTable +=
|
|
||||||
Math.abs(numRegionsPerServerPerTable[oldServer][tableIndex]
|
|
||||||
- meanRegionsPerTable[tableIndex])
|
|
||||||
- Math.abs(numRegionsPerServerPerTable[oldServer][tableIndex] + 1
|
|
||||||
- meanRegionsPerTable[tableIndex]);
|
|
||||||
}
|
}
|
||||||
numRegionsPerServerPerTable[newServer][tableIndex]++;
|
numRegionsPerServerPerTable[newServer][tableIndex]++;
|
||||||
// update regionSkewPerTable for the move to new server
|
|
||||||
regionSkewByTable +=
|
//check whether this caused maxRegionsPerTable in the new Server to be updated
|
||||||
Math.abs(numRegionsPerServerPerTable[newServer][tableIndex]
|
if (numRegionsPerServerPerTable[newServer][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
|
||||||
- meanRegionsPerTable[tableIndex])
|
numMaxRegionsPerTable[tableIndex] = numRegionsPerServerPerTable[newServer][tableIndex];
|
||||||
- Math.abs(numRegionsPerServerPerTable[newServer][tableIndex] - 1
|
} else if (oldServer >= 0 && (numRegionsPerServerPerTable[oldServer][tableIndex] + 1)
|
||||||
- meanRegionsPerTable[tableIndex]);
|
== numMaxRegionsPerTable[tableIndex]) {
|
||||||
|
//recompute maxRegionsPerTable since the previous value was coming from the old server
|
||||||
|
numMaxRegionsPerTable[tableIndex] = 0;
|
||||||
|
for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
|
||||||
|
if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) {
|
||||||
|
numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// update for servers
|
// update for servers
|
||||||
int primary = regionIndexToPrimaryIndex[region];
|
int primary = regionIndexToPrimaryIndex[region];
|
||||||
|
@ -1088,7 +1014,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
||||||
.append(Arrays.toString(serverIndicesSortedByRegionCount))
|
.append(Arrays.toString(serverIndicesSortedByRegionCount))
|
||||||
.append(", regionsPerServer=").append(Arrays.deepToString(regionsPerServer));
|
.append(", regionsPerServer=").append(Arrays.deepToString(regionsPerServer));
|
||||||
|
|
||||||
desc.append(", regionSkewByTable=").append(regionSkewByTable)
|
desc.append(", numMaxRegionsPerTable=").append(Arrays.toString(numMaxRegionsPerTable))
|
||||||
.append(", numRegions=").append(numRegions).append(", numServers=").append(numServers)
|
.append(", numRegions=").append(numRegions).append(", numServers=").append(numServers)
|
||||||
.append(", numTables=").append(numTables).append(", numMovedRegions=")
|
.append(", numTables=").append(numTables).append(", numMovedRegions=")
|
||||||
.append(numMovedRegions).append('}');
|
.append(numMovedRegions).append('}');
|
||||||
|
|
|
@ -762,7 +762,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||||
boolean isNeeded() {
|
boolean isNeeded() {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
float getMultiplier() {
|
float getMultiplier() {
|
||||||
return multiplier;
|
return multiplier;
|
||||||
}
|
}
|
||||||
|
@ -771,39 +770,35 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||||
this.multiplier = m;
|
this.multiplier = m;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/** Called once per LB invocation to give the cost function
|
||||||
* Called once per LB invocation to give the cost function
|
|
||||||
* to initialize it's state, and perform any costly calculation.
|
* to initialize it's state, and perform any costly calculation.
|
||||||
*/
|
*/
|
||||||
void init(Cluster cluster) {
|
void init(Cluster cluster) {
|
||||||
this.cluster = cluster;
|
this.cluster = cluster;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/** Called once per cluster Action to give the cost function
|
||||||
* Called once per cluster Action to give the cost function
|
|
||||||
* an opportunity to update it's state. postAction() is always
|
* an opportunity to update it's state. postAction() is always
|
||||||
* called at least once before cost() is called with the cluster
|
* called at least once before cost() is called with the cluster
|
||||||
* that this action is performed on.
|
* that this action is performed on. */
|
||||||
*/
|
|
||||||
void postAction(Action action) {
|
void postAction(Action action) {
|
||||||
switch (action.type) {
|
switch (action.type) {
|
||||||
case NULL:
|
case NULL: break;
|
||||||
break;
|
case ASSIGN_REGION:
|
||||||
case ASSIGN_REGION:
|
AssignRegionAction ar = (AssignRegionAction) action;
|
||||||
AssignRegionAction ar = (AssignRegionAction) action;
|
regionMoved(ar.region, -1, ar.server);
|
||||||
regionMoved(ar.region, -1, ar.server);
|
break;
|
||||||
break;
|
case MOVE_REGION:
|
||||||
case MOVE_REGION:
|
MoveRegionAction mra = (MoveRegionAction) action;
|
||||||
MoveRegionAction mra = (MoveRegionAction) action;
|
regionMoved(mra.region, mra.fromServer, mra.toServer);
|
||||||
regionMoved(mra.region, mra.fromServer, mra.toServer);
|
break;
|
||||||
break;
|
case SWAP_REGIONS:
|
||||||
case SWAP_REGIONS:
|
SwapRegionsAction a = (SwapRegionsAction) action;
|
||||||
SwapRegionsAction a = (SwapRegionsAction) action;
|
regionMoved(a.fromRegion, a.fromServer, a.toServer);
|
||||||
regionMoved(a.fromRegion, a.fromServer, a.toServer);
|
regionMoved(a.toRegion, a.toServer, a.fromServer);
|
||||||
regionMoved(a.toRegion, a.toServer, a.fromServer);
|
break;
|
||||||
break;
|
default:
|
||||||
default:
|
throw new RuntimeException("Uknown action:" + action.type);
|
||||||
throw new RuntimeException("Uknown action:" + action.type);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -827,25 +822,59 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||||
double total = getSum(stats);
|
double total = getSum(stats);
|
||||||
|
|
||||||
double count = stats.length;
|
double count = stats.length;
|
||||||
double mean = total / count;
|
double mean = total/count;
|
||||||
|
|
||||||
for (int i = 0; i < stats.length; i++) {
|
// Compute max as if all region servers had 0 and one had the sum of all costs. This must be
|
||||||
|
// a zero sum cost for this to make sense.
|
||||||
|
double max = ((count - 1) * mean) + (total - mean);
|
||||||
|
|
||||||
|
// It's possible that there aren't enough regions to go around
|
||||||
|
double min;
|
||||||
|
if (count > total) {
|
||||||
|
min = ((count - total) * mean) + ((1 - mean) * total);
|
||||||
|
} else {
|
||||||
|
// Some will have 1 more than everything else.
|
||||||
|
int numHigh = (int) (total - (Math.floor(mean) * count));
|
||||||
|
int numLow = (int) (count - numHigh);
|
||||||
|
|
||||||
|
min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean)));
|
||||||
|
|
||||||
|
}
|
||||||
|
min = Math.max(0, min);
|
||||||
|
for (int i=0; i<stats.length; i++) {
|
||||||
double n = stats[i];
|
double n = stats[i];
|
||||||
double diff = Math.abs(mean - n);
|
double diff = Math.abs(mean - n);
|
||||||
totalCost += diff;
|
totalCost += diff;
|
||||||
}
|
}
|
||||||
|
|
||||||
return Cluster
|
double scaled = scale(min, max, totalCost);
|
||||||
.scale(Cluster.getMinSkew(total, count), Cluster.getMaxSkew(total, count), totalCost);
|
return scaled;
|
||||||
}
|
}
|
||||||
|
|
||||||
private double getSum(double[] stats) {
|
private double getSum(double[] stats) {
|
||||||
double total = 0;
|
double total = 0;
|
||||||
for (double s : stats) {
|
for(double s:stats) {
|
||||||
total += s;
|
total += s;
|
||||||
}
|
}
|
||||||
return total;
|
return total;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Scale the value between 0 and 1.
|
||||||
|
*
|
||||||
|
* @param min Min value
|
||||||
|
* @param max The Max value
|
||||||
|
* @param value The value to be scaled.
|
||||||
|
* @return The scaled value.
|
||||||
|
*/
|
||||||
|
protected double scale(double min, double max, double value) {
|
||||||
|
if (max <= min || value <= min) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
if ((max - min) == 0) return 0;
|
||||||
|
|
||||||
|
return Math.max(0d, Math.min(1d, (value - min) / (max - min)));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -898,7 +927,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||||
return 1000000; // return a number much greater than any of the other cost
|
return 1000000; // return a number much greater than any of the other cost
|
||||||
}
|
}
|
||||||
|
|
||||||
return Cluster.scale(0, Math.min(cluster.numRegions, maxMoves), moveCost);
|
return scale(0, Math.min(cluster.numRegions, maxMoves), moveCost);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1006,7 +1035,15 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected double cost() {
|
protected double cost() {
|
||||||
return Cluster.scale(cluster.minRegionSkewByTable, cluster.maxRegionSkewByTable, cluster.regionSkewByTable);
|
double max = cluster.numRegions;
|
||||||
|
double min = ((double) cluster.numRegions) / cluster.numServers;
|
||||||
|
double value = 0;
|
||||||
|
|
||||||
|
for (int i = 0; i < cluster.numMaxRegionsPerTable.length; i++) {
|
||||||
|
value += cluster.numMaxRegionsPerTable[i];
|
||||||
|
}
|
||||||
|
|
||||||
|
return scale(min, max, value);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1329,7 +1366,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||||
for (int i = 0 ; i < costsPerGroup.length; i++) {
|
for (int i = 0 ; i < costsPerGroup.length; i++) {
|
||||||
totalCost += costsPerGroup[i];
|
totalCost += costsPerGroup[i];
|
||||||
}
|
}
|
||||||
return Cluster.scale(0, maxCost, totalCost);
|
return scale(0, maxCost, totalCost);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -390,8 +390,8 @@ public class TestBaseLoadBalancer extends BalancerTestBase {
|
||||||
|
|
||||||
// now move region1 from servers[0] to servers[2]
|
// now move region1 from servers[0] to servers[2]
|
||||||
cluster.doAction(new MoveRegionAction(0, 0, 2));
|
cluster.doAction(new MoveRegionAction(0, 0, 2));
|
||||||
// check that the regionSkewByTable for "table" has increased to 2
|
// check that the numMaxRegionsPerTable for "table" has increased to 2
|
||||||
assertEquals(2, cluster.regionSkewByTable, 0.01);
|
assertEquals(2, cluster.numMaxRegionsPerTable[0]);
|
||||||
// now repeat check whether moving region1 from servers[1] to servers[2]
|
// now repeat check whether moving region1 from servers[1] to servers[2]
|
||||||
// would lower availability
|
// would lower availability
|
||||||
assertTrue(cluster.wouldLowerAvailability(hri1, servers[2]));
|
assertTrue(cluster.wouldLowerAvailability(hri1, servers[2]));
|
||||||
|
|
Loading…
Reference in New Issue