HBASE-17707 New More Accurate Table Skew cost function/generator - revert due to missing JIRA number
This commit is contained in:
parent
0d3e986f7e
commit
dfc6cf3076
|
@ -53,7 +53,6 @@ import org.apache.hadoop.hbase.master.RackManager;
|
||||||
import org.apache.hadoop.hbase.master.RegionPlan;
|
import org.apache.hadoop.hbase.master.RegionPlan;
|
||||||
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
|
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
|
||||||
import org.apache.hadoop.hbase.security.access.AccessControlLists;
|
import org.apache.hadoop.hbase.security.access.AccessControlLists;
|
||||||
import org.apache.hadoop.hbase.util.Pair;
|
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
@ -141,7 +140,6 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
||||||
int[] initialRegionIndexToServerIndex; //regionIndex -> serverIndex (initial cluster state)
|
int[] initialRegionIndexToServerIndex; //regionIndex -> serverIndex (initial cluster state)
|
||||||
int[] regionIndexToTableIndex; //regionIndex -> tableIndex
|
int[] regionIndexToTableIndex; //regionIndex -> tableIndex
|
||||||
int[][] numRegionsPerServerPerTable; //serverIndex -> tableIndex -> # regions
|
int[][] numRegionsPerServerPerTable; //serverIndex -> tableIndex -> # regions
|
||||||
int[] numRegionsPerTable; // tableIndex -> number of regions that table has
|
|
||||||
int[] numMaxRegionsPerTable; //tableIndex -> max number of regions in a single RS
|
int[] numMaxRegionsPerTable; //tableIndex -> max number of regions in a single RS
|
||||||
int[] regionIndexToPrimaryIndex; //regionIndex -> regionIndex of the primary
|
int[] regionIndexToPrimaryIndex; //regionIndex -> regionIndex of the primary
|
||||||
boolean hasRegionReplicas = false; //whether there is regions with replicas
|
boolean hasRegionReplicas = false; //whether there is regions with replicas
|
||||||
|
@ -332,7 +330,6 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
||||||
|
|
||||||
numTables = tables.size();
|
numTables = tables.size();
|
||||||
numRegionsPerServerPerTable = new int[numServers][numTables];
|
numRegionsPerServerPerTable = new int[numServers][numTables];
|
||||||
numRegionsPerTable = new int[numTables];
|
|
||||||
|
|
||||||
for (int i = 0; i < numServers; i++) {
|
for (int i = 0; i < numServers; i++) {
|
||||||
for (int j = 0; j < numTables; j++) {
|
for (int j = 0; j < numTables; j++) {
|
||||||
|
@ -342,7 +339,6 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
||||||
|
|
||||||
for (int i=0; i < regionIndexToServerIndex.length; i++) {
|
for (int i=0; i < regionIndexToServerIndex.length; i++) {
|
||||||
if (regionIndexToServerIndex[i] >= 0) {
|
if (regionIndexToServerIndex[i] >= 0) {
|
||||||
numRegionsPerTable[regionIndexToTableIndex[i]]++;
|
|
||||||
numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++;
|
numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -474,76 +470,6 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns the minimum number of regions of a table T each server would store if T were
|
|
||||||
* perfectly distributed (i.e. round-robin-ed) across the cluster
|
|
||||||
*/
|
|
||||||
public int minRegionsIfEvenlyDistributed(int table) {
|
|
||||||
return numRegionsPerTable[table] / numServers;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns the maximum number of regions of a table T each server would store if T were
|
|
||||||
* perfectly distributed (i.e. round-robin-ed) across the cluster
|
|
||||||
*/
|
|
||||||
public int maxRegionsIfEvenlyDistributed(int table) {
|
|
||||||
int min = minRegionsIfEvenlyDistributed(table);
|
|
||||||
return numRegionsPerTable[table] % numServers == 0 ? min : min + 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns the number of servers that should hold maxRegionsIfEvenlyDistributed for a given
|
|
||||||
* table. A special case here is if maxRegionsIfEvenlyDistributed == minRegionsIfEvenlyDistributed,
|
|
||||||
* in which case all servers should hold the max
|
|
||||||
*/
|
|
||||||
public int numServersWithMaxRegionsIfEvenlyDistributed(int table) {
|
|
||||||
int numWithMax = numRegionsPerTable[table] % numServers;
|
|
||||||
if (numWithMax == 0) {
|
|
||||||
return numServers;
|
|
||||||
} else {
|
|
||||||
return numWithMax;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns true iff at least one server in the cluster stores either more than the min/max load
|
|
||||||
* per server when all regions are evenly distributed across the cluster
|
|
||||||
*/
|
|
||||||
public boolean hasUnevenRegionDistribution() {
|
|
||||||
int minLoad = numRegions / numServers;
|
|
||||||
int maxLoad = numRegions % numServers == 0 ? minLoad : minLoad + 1;
|
|
||||||
for (int server = 0; server < numServers; server++) {
|
|
||||||
int numRegions = getNumRegions(server);
|
|
||||||
if (numRegions > maxLoad || numRegions < minLoad) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns a pair where the first server is that with the least number of regions across the
|
|
||||||
* cluster and the second server is that with the most number of regions across the cluster
|
|
||||||
*/
|
|
||||||
public Pair<Integer, Integer> findLeastAndMostLoadedServers() {
|
|
||||||
int minServer = 0;
|
|
||||||
int maxServer = 0;
|
|
||||||
int minLoad = getNumRegions(minServer);
|
|
||||||
int maxLoad = minLoad;
|
|
||||||
for (int server = 1; server < numServers; server++) {
|
|
||||||
int numRegions = getNumRegions(server);
|
|
||||||
if (numRegions < minLoad) {
|
|
||||||
minServer = server;
|
|
||||||
minLoad = numRegions;
|
|
||||||
}
|
|
||||||
if (numRegions > maxLoad) {
|
|
||||||
maxServer = server;
|
|
||||||
maxLoad = numRegions;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return Pair.newPair(minServer, maxServer);
|
|
||||||
}
|
|
||||||
|
|
||||||
/** An action to move or swap a region */
|
/** An action to move or swap a region */
|
||||||
public static class Action {
|
public static class Action {
|
||||||
public static enum Type {
|
public static enum Type {
|
||||||
|
|
|
@ -18,14 +18,10 @@
|
||||||
package org.apache.hadoop.hbase.master.balancer;
|
package org.apache.hadoop.hbase.master.balancer;
|
||||||
|
|
||||||
import java.util.ArrayDeque;
|
import java.util.ArrayDeque;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.Comparator;
|
|
||||||
import java.util.Deque;
|
import java.util.Deque;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -34,6 +30,7 @@ import java.util.Random;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.ClusterStatus;
|
import org.apache.hadoop.hbase.ClusterStatus;
|
||||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||||
|
@ -43,7 +40,6 @@ import org.apache.hadoop.hbase.RegionLoad;
|
||||||
import org.apache.hadoop.hbase.ServerLoad;
|
import org.apache.hadoop.hbase.ServerLoad;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|
||||||
import org.apache.hadoop.hbase.master.MasterServices;
|
import org.apache.hadoop.hbase.master.MasterServices;
|
||||||
import org.apache.hadoop.hbase.master.RegionPlan;
|
import org.apache.hadoop.hbase.master.RegionPlan;
|
||||||
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action;
|
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action;
|
||||||
|
@ -53,10 +49,6 @@ import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegi
|
||||||
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.SwapRegionsAction;
|
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.SwapRegionsAction;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
import org.apache.hadoop.hbase.util.Pair;
|
|
||||||
|
|
||||||
import com.google.common.base.Optional;
|
|
||||||
import com.google.common.base.Preconditions;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>This is a best effort load balancer. Given a Cost function F(C) => x It will
|
* <p>This is a best effort load balancer. Given a Cost function F(C) => x It will
|
||||||
|
@ -927,225 +919,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Generates candidate actions to minimize the TableSkew cost function.
|
|
||||||
*
|
|
||||||
* For efficiency reasons, the cluster must be passed in when this generator is
|
|
||||||
* constructed. Every move generated is applied to the cost function
|
|
||||||
* (i.e. it is assumed that every action we generate is applied to the cluster).
|
|
||||||
* This means we can adjust our cost incrementally for the cluster, rather than
|
|
||||||
* recomputing at each iteration.
|
|
||||||
*/
|
|
||||||
static class TableSkewCandidateGenerator extends CandidateGenerator {
|
|
||||||
|
|
||||||
// Mapping of table -> true iff too many servers in the cluster store at least
|
|
||||||
// cluster.maxRegionsIfEvenlydistributed(table)
|
|
||||||
boolean[] tablesWithEnoughServersWithMaxRegions = null;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
Action generate(Cluster cluster) {
|
|
||||||
if (tablesWithEnoughServersWithMaxRegions == null || tablesWithEnoughServersWithMaxRegions.length != cluster.numTables) {
|
|
||||||
tablesWithEnoughServersWithMaxRegions = new boolean[cluster.numTables];
|
|
||||||
}
|
|
||||||
if (cluster.hasUnevenRegionDistribution()) {
|
|
||||||
Pair<Integer, Integer> leastAndMostLoadedServers = cluster.findLeastAndMostLoadedServers();
|
|
||||||
return moveFromTableWithEnoughRegions(cluster, leastAndMostLoadedServers.getSecond(), leastAndMostLoadedServers.getFirst());
|
|
||||||
} else {
|
|
||||||
Optional<TableAndServer> tableServer = findSkewedTableServer(cluster);
|
|
||||||
if (!tableServer.isPresent()) {
|
|
||||||
return Cluster.NullAction;
|
|
||||||
}
|
|
||||||
return findBestActionForTableServer(cluster, tableServer.get());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns a move fromServer -> toServer such that after the move fromServer will still have at least
|
|
||||||
* the min # regions in terms of table skew calculation
|
|
||||||
*/
|
|
||||||
private Action moveFromTableWithEnoughRegions(Cluster cluster, int fromServer, int toServer) {
|
|
||||||
for (int table : getShuffledRangeOfInts(0, cluster.numTables)) {
|
|
||||||
int min = cluster.minRegionsIfEvenlyDistributed(table);
|
|
||||||
if (cluster.numRegionsPerServerPerTable[fromServer][table] > min) {
|
|
||||||
return getAction(fromServer, pickRandomRegionFromTableOnServer(cluster, fromServer, table), toServer, -1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return Cluster.NullAction;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Picks a random subset of tables, then for each table T checks across cluster and returns first
|
|
||||||
* server (if any) which holds too many regions from T. Returns Optional.absent() if no servers
|
|
||||||
* are found that hold too many regions.
|
|
||||||
*/
|
|
||||||
private Optional<TableAndServer> findSkewedTableServer(Cluster cluster) {
|
|
||||||
Optional<TableAndServer> tableServer = Optional.absent();
|
|
||||||
List<Integer> servers = getShuffledRangeOfInts(0, cluster.numServers);
|
|
||||||
Iterator<Integer> tableIter = getShuffledRangeOfInts(0, cluster.numTables).iterator();
|
|
||||||
while (tableIter.hasNext() && !tableServer.isPresent()) {
|
|
||||||
int table = tableIter.next();
|
|
||||||
int maxRegions = cluster.maxRegionsIfEvenlyDistributed(table);
|
|
||||||
int numShouldHaveMaxRegions = cluster.numServersWithMaxRegionsIfEvenlyDistributed(table);
|
|
||||||
int numWithMaxRegions = 0;
|
|
||||||
for (int server : servers) {
|
|
||||||
int numRegions = cluster.numRegionsPerServerPerTable[server][table];
|
|
||||||
// if more than max, server clearly has too many regions
|
|
||||||
if (numRegions > maxRegions) {
|
|
||||||
tableServer = Optional.of(new TableAndServer(table, server));
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
// if equal to max, check to see if we are within acceptable limit
|
|
||||||
if (numRegions == maxRegions) {
|
|
||||||
numWithMaxRegions++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
tablesWithEnoughServersWithMaxRegions[table] = numWithMaxRegions >= numShouldHaveMaxRegions;
|
|
||||||
// If we have found a table with more than max, we are done
|
|
||||||
if (tableServer.isPresent()) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Otherwise, check to see if there are too many servers with maxRegions
|
|
||||||
if (numWithMaxRegions > numShouldHaveMaxRegions) {
|
|
||||||
for (int server : servers) {
|
|
||||||
int numRegions = cluster.numRegionsPerServerPerTable[server][table];
|
|
||||||
if (numRegions == maxRegions) {
|
|
||||||
tableServer = Optional.of(new TableAndServer(table, server));
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return tableServer;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns an list of integers that stores [upper - lower] unique integers in random order
|
|
||||||
* s.t. for each integer i lower <= i < upper
|
|
||||||
*/
|
|
||||||
private List<Integer> getShuffledRangeOfInts(int lower, int upper) {
|
|
||||||
Preconditions.checkArgument(lower < upper);
|
|
||||||
ArrayList<Integer> arr = new ArrayList<Integer>(upper - lower);
|
|
||||||
for (int i = lower; i < upper; i++) {
|
|
||||||
arr.add(i);
|
|
||||||
}
|
|
||||||
Collections.shuffle(arr);
|
|
||||||
return arr;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Pick a random region from the specified server and table. Returns -1 if no regions from
|
|
||||||
* the given table lie on the given server
|
|
||||||
*/
|
|
||||||
protected int pickRandomRegionFromTableOnServer(Cluster cluster, int server, int table) {
|
|
||||||
if (server < 0 || table < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
List<Integer> regionsFromTable = new ArrayList<>();
|
|
||||||
for (int region : cluster.regionsPerServer[server]) {
|
|
||||||
if (cluster.regionIndexToTableIndex[region] == table) {
|
|
||||||
regionsFromTable.add(region);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return regionsFromTable.get(RANDOM.nextInt(regionsFromTable.size()));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns servers in the cluster that store fewer than k regions for the given table (sorted by
|
|
||||||
* servers with the fewest regions from givenTable first)
|
|
||||||
*/
|
|
||||||
public List<Integer> getServersWithFewerThanKRegionsFromTable(final Cluster cluster, final int givenTable, int k) {
|
|
||||||
List<Integer> serversWithFewerThanK = new ArrayList<>();
|
|
||||||
for (int server = 0; server < cluster.numServers; server++) {
|
|
||||||
if (cluster.numRegionsPerServerPerTable[server][givenTable] < k) {
|
|
||||||
serversWithFewerThanK.add(server);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Collections.sort(serversWithFewerThanK, new Comparator<Integer>() {
|
|
||||||
@Override
|
|
||||||
public int compare(Integer o1, Integer o2) {
|
|
||||||
return cluster.numRegionsPerServerPerTable[o1.intValue()][givenTable] - cluster.numRegionsPerServerPerTable[o2.intValue()][givenTable];
|
|
||||||
}
|
|
||||||
});
|
|
||||||
return serversWithFewerThanK;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Given a table T for which server S stores too many regions, attempts to find a
|
|
||||||
* SWAP operation that will better balance the cluster
|
|
||||||
*/
|
|
||||||
public Action findBestActionForTableServer(Cluster cluster, TableAndServer tableServer) {
|
|
||||||
int fromTable = tableServer.getTable();
|
|
||||||
int fromServer = tableServer.getServer();
|
|
||||||
|
|
||||||
int minNumRegions = cluster.minRegionsIfEvenlyDistributed(fromTable);
|
|
||||||
int maxNumRegions = cluster.maxRegionsIfEvenlyDistributed(fromTable);
|
|
||||||
List<Integer> servers;
|
|
||||||
if (tablesWithEnoughServersWithMaxRegions[fromTable]) {
|
|
||||||
servers = getServersWithFewerThanKRegionsFromTable(cluster, fromTable, minNumRegions);
|
|
||||||
} else {
|
|
||||||
servers = getServersWithFewerThanKRegionsFromTable(cluster, fromTable, maxNumRegions);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (servers.isEmpty()) {
|
|
||||||
return Cluster.NullAction;
|
|
||||||
}
|
|
||||||
|
|
||||||
Optional<Action> swap = trySwap(cluster, fromServer, fromTable, servers);
|
|
||||||
if (swap.isPresent()) {
|
|
||||||
return swap.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
// If we cannot perform a swap, we should do nothing
|
|
||||||
return Cluster.NullAction;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Given server1, table1, we try to find server2 and table2 such that
|
|
||||||
* at least 3 of the following 4 criteria are met
|
|
||||||
*
|
|
||||||
* 1) server1 has too many regions of table1
|
|
||||||
* 2) server1 has too few regions of table2
|
|
||||||
* 3) server2 has too many regions of table2
|
|
||||||
* 4) server2 has too few regions of table1
|
|
||||||
*
|
|
||||||
* We consider N regions from table T
|
|
||||||
* too few if: N < cluster.minRegionsIfEvenlyDistributed(T)
|
|
||||||
* too many if: N > cluster.maxRegionsIfEvenlyDistributed(T)
|
|
||||||
*
|
|
||||||
* Because (1) and (4) are true apriori, we only need to check for (2) and (3).
|
|
||||||
*
|
|
||||||
* If 3 of the 4 criteria are met, we return a swap operation between
|
|
||||||
* randomly selected regions from table1 on server1 and from table2 on server2.
|
|
||||||
*
|
|
||||||
* Optional.absent() is returned if we could not find such a SWAP.
|
|
||||||
*/
|
|
||||||
private Optional<Action> trySwap(Cluster cluster, int server1, int table1, List<Integer> candidateServers) {
|
|
||||||
// Because conditions (1) and (4) are true apriori, we only need to meet one of conditions (2) or (3)
|
|
||||||
List<Integer> tables = getShuffledRangeOfInts(0, cluster.numTables);
|
|
||||||
for (int table2 : tables) {
|
|
||||||
int minRegions = cluster.minRegionsIfEvenlyDistributed(table2);
|
|
||||||
int maxRegions = cluster.maxRegionsIfEvenlyDistributed(table2);
|
|
||||||
for (int server2 : candidateServers) {
|
|
||||||
int numRegions1 = cluster.numRegionsPerServerPerTable[server1][table2];
|
|
||||||
int numRegions2 = cluster.numRegionsPerServerPerTable[server2][table2];
|
|
||||||
if (numRegions2 == 0) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if ((numRegions1 < minRegions || numRegions2 > maxRegions) ||
|
|
||||||
(minRegions != maxRegions && numRegions1 == minRegions && numRegions2 == maxRegions)) {
|
|
||||||
int region1 = pickRandomRegionFromTableOnServer(cluster, server1, table1);
|
|
||||||
int region2 = pickRandomRegionFromTableOnServer(cluster, server2, table2);
|
|
||||||
return Optional.of(getAction(server1, region1, server2, region2));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return Optional.absent();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Base class of StochasticLoadBalancer's Cost Functions.
|
* Base class of StochasticLoadBalancer's Cost Functions.
|
||||||
*/
|
*/
|
||||||
|
@ -1193,7 +966,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||||
break;
|
break;
|
||||||
case SWAP_REGIONS:
|
case SWAP_REGIONS:
|
||||||
SwapRegionsAction a = (SwapRegionsAction) action;
|
SwapRegionsAction a = (SwapRegionsAction) action;
|
||||||
regionSwapped(a.fromRegion, a.fromServer, a.toRegion, a.toServer);
|
regionMoved(a.fromRegion, a.fromServer, a.toServer);
|
||||||
|
regionMoved(a.toRegion, a.toServer, a.fromServer);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
throw new RuntimeException("Uknown action:" + action.type);
|
throw new RuntimeException("Uknown action:" + action.type);
|
||||||
|
@ -1203,11 +977,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||||
protected void regionMoved(int region, int oldServer, int newServer) {
|
protected void regionMoved(int region, int oldServer, int newServer) {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void regionSwapped(int region1, int server1, int region2, int server2) {
|
|
||||||
regionMoved(region1, server1, server2);
|
|
||||||
regionMoved(region2, server2, server1);
|
|
||||||
}
|
|
||||||
|
|
||||||
abstract double cost();
|
abstract double cost();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1401,188 +1170,9 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||||
"hbase.master.balancer.stochastic.tableSkewCost";
|
"hbase.master.balancer.stochastic.tableSkewCost";
|
||||||
private static final float DEFAULT_TABLE_SKEW_COST = 35;
|
private static final float DEFAULT_TABLE_SKEW_COST = 35;
|
||||||
|
|
||||||
/**
|
|
||||||
* Ranges from 0.0 to 1.0 and is the proportion of how much the most skewed table
|
|
||||||
* (as opposed to the average skew across all tables) should affect TableSkew cost
|
|
||||||
*/
|
|
||||||
private static final String MAX_TABLE_SKEW_WEIGHT_KEY =
|
|
||||||
"hbase.master.balancer.stochastic.maxTableSkewWeight";
|
|
||||||
private float DEFAULT_MAX_TABLE_SKEW_WEIGHT = 0.0f;
|
|
||||||
|
|
||||||
private final float maxTableSkewWeight;
|
|
||||||
private final float avgTableSkewWeight;
|
|
||||||
|
|
||||||
// Number of moves for each table required to bring the cluster to a perfectly balanced
|
|
||||||
// state (i.e. as if you had round-robin-ed regions across cluster)
|
|
||||||
private int[] numMovesPerTable;
|
|
||||||
|
|
||||||
TableSkewCostFunction(Configuration conf) {
|
TableSkewCostFunction(Configuration conf) {
|
||||||
super(conf);
|
super(conf);
|
||||||
this.setMultiplier(conf.getFloat(TABLE_SKEW_COST_KEY, DEFAULT_TABLE_SKEW_COST));
|
this.setMultiplier(conf.getFloat(TABLE_SKEW_COST_KEY, DEFAULT_TABLE_SKEW_COST));
|
||||||
maxTableSkewWeight = conf.getFloat(MAX_TABLE_SKEW_WEIGHT_KEY, DEFAULT_MAX_TABLE_SKEW_WEIGHT);
|
|
||||||
Preconditions.checkArgument(0.0 <= maxTableSkewWeight && maxTableSkewWeight <= 1.0);
|
|
||||||
avgTableSkewWeight = 1 - maxTableSkewWeight;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Computes cost by:
|
|
||||||
*
|
|
||||||
* 1) Computing a skew score for each table (based on the number of regions
|
|
||||||
* from that table that would have to be moved to reach an evenly balanced state)
|
|
||||||
*
|
|
||||||
* 2) Taking a weighted average of the highest skew score with the average skew score
|
|
||||||
*
|
|
||||||
* 3) Square rooting that value to more evenly distribute the values between 0-1
|
|
||||||
* (since we have observed they are generally very small).
|
|
||||||
*
|
|
||||||
* @return the table skew cost for the cluster
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
double cost() {
|
|
||||||
double[] skewPerTable = computeSkewPerTable();
|
|
||||||
if (skewPerTable.length == 0) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
double maxTableSkew = max(skewPerTable);
|
|
||||||
double avgTableSkew = average(skewPerTable);
|
|
||||||
|
|
||||||
return Math.sqrt(maxTableSkewWeight * maxTableSkew + avgTableSkewWeight * avgTableSkew);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
void init(Cluster cluster) {
|
|
||||||
super.init(cluster);
|
|
||||||
numMovesPerTable = computeNumMovesPerTable();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Adjusts computed number of moves after two regions have been swapped
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
protected void regionSwapped(int region1, int server1, int region2, int server2) {
|
|
||||||
// If different tables, simply perform two moves
|
|
||||||
if (cluster.regionIndexToTableIndex[region1] != cluster.regionIndexToTableIndex[region2]) {
|
|
||||||
super.regionSwapped(region1, server1, region2, server2);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
// If same table, do nothing
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Adjusts computed number of moves per table after a region has been moved
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
protected void regionMoved(int region, int oldServer, int newServer) {
|
|
||||||
int table = cluster.regionIndexToTableIndex[region];
|
|
||||||
numMovesPerTable[table] = computeNumMovesForTable(table);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns a mapping of table -> numMoves, where numMoves is the number of regions required to bring
|
|
||||||
* each table to a fully balanced state (i.e. as if its regions had been round-robin-ed across the cluster).
|
|
||||||
*/
|
|
||||||
private int[] computeNumMovesPerTable() {
|
|
||||||
// Determine # region moves required for each table to have regions perfectly distributed across cluster
|
|
||||||
int[] numMovesPerTable = new int[cluster.numTables];
|
|
||||||
for (int table = 0; table < cluster.numTables; table++) {
|
|
||||||
numMovesPerTable[table] = computeNumMovesForTable(table);
|
|
||||||
}
|
|
||||||
return numMovesPerTable;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Computes the number of moves required across all servers to bring the given table to a balanced state
|
|
||||||
* (i.e. as if its regions had been round-robin-ed across the cluster). We only consider moves as # of regions
|
|
||||||
* that need to be sent, not received, so that we do not double count region moves.
|
|
||||||
*/
|
|
||||||
private int computeNumMovesForTable(int table) {
|
|
||||||
int numMinRegions = cluster.minRegionsIfEvenlyDistributed(table);
|
|
||||||
int numMaxRegions = cluster.maxRegionsIfEvenlyDistributed(table);
|
|
||||||
int numMaxServersRemaining = cluster.numServersWithMaxRegionsIfEvenlyDistributed(table);
|
|
||||||
int numMoves = 0;
|
|
||||||
|
|
||||||
for (int server = 0; server < cluster.numServers; server++) {
|
|
||||||
int numRegions = cluster.numRegionsPerServerPerTable[server][table];
|
|
||||||
if (numRegions >= numMaxRegions && numMaxServersRemaining > 0) {
|
|
||||||
numMoves += numRegions - numMaxRegions;
|
|
||||||
numMaxServersRemaining--;
|
|
||||||
} else if (numRegions > numMinRegions) {
|
|
||||||
numMoves += numRegions - numMinRegions;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return numMoves;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns mapping of tableIndex -> tableSkewScore, where tableSkewScore is a double between 0 to 1 with
|
|
||||||
* 0 indicating no table skew (i.e. perfect distribution of regions among servers), and 1 representing
|
|
||||||
* pathological table skew (i.e. all of a servers regions belonging to one table).
|
|
||||||
*/
|
|
||||||
private double[] computeSkewPerTable() {
|
|
||||||
if (numMovesPerTable == null) {
|
|
||||||
numMovesPerTable = computeNumMovesPerTable();
|
|
||||||
}
|
|
||||||
double[] scaledSkewPerTable = new double[numMovesPerTable.length];
|
|
||||||
for (int table = 0; table < numMovesPerTable.length; table++) {
|
|
||||||
int numTotalRegions = cluster.numRegionsPerTable[table];
|
|
||||||
int maxRegions = cluster.maxRegionsIfEvenlyDistributed(table);
|
|
||||||
int pathologicalNumMoves = numTotalRegions - maxRegions;
|
|
||||||
scaledSkewPerTable[table] = pathologicalNumMoves == 0 ? 0 : (double) numMovesPerTable[table] / pathologicalNumMoves;
|
|
||||||
}
|
|
||||||
return scaledSkewPerTable;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns the max of the values in the passed array
|
|
||||||
*/
|
|
||||||
private double max(double[] arr) {
|
|
||||||
double max = arr[0];
|
|
||||||
for (double d : arr) {
|
|
||||||
if (d > max) {
|
|
||||||
max = d;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return max;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns the average of the values in the passed array
|
|
||||||
*/
|
|
||||||
private double average(double[] arr) {
|
|
||||||
double sum = 0;
|
|
||||||
for (double d : arr) {
|
|
||||||
sum += d;
|
|
||||||
}
|
|
||||||
return sum / arr.length;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Compute the cost of a potential cluster configuration based upon how evenly
|
|
||||||
* distributed tables are.
|
|
||||||
*
|
|
||||||
* @deprecated replaced by TableSkewCostFunction
|
|
||||||
* This function only considers the maximum # of regions of each table stored
|
|
||||||
* on any one server. This, however, neglects a number of cases. Consider the case
|
|
||||||
* where N servers store 1 more region than as if the regions had been round robin-ed
|
|
||||||
* across the cluster, but then K servers stored 0 regions of the table. The maximum
|
|
||||||
* # regions stored would not properly reflect the table-skew of the cluster.
|
|
||||||
*
|
|
||||||
* Furthermore, this relies upon the cluster.numMaxRegionsPerTable field, which is not
|
|
||||||
* properly updated. The values per table only increase as the cluster shifts (i.e.
|
|
||||||
* as new maxima are found), but they do not go down when the maximum skew decreases
|
|
||||||
* for a particular table.
|
|
||||||
*/
|
|
||||||
@Deprecated
|
|
||||||
static class OldTableSkewCostFunction extends CostFunction {
|
|
||||||
|
|
||||||
private static final String TABLE_SKEW_COST_KEY =
|
|
||||||
"hbase.master.balancer.stochastic.tableSkewCost";
|
|
||||||
private static final float DEFAULT_TABLE_SKEW_COST = 35;
|
|
||||||
|
|
||||||
OldTableSkewCostFunction(Configuration conf) {
|
|
||||||
super(conf);
|
|
||||||
this.setMultiplier(conf.getFloat(TABLE_SKEW_COST_KEY, DEFAULT_TABLE_SKEW_COST));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1998,32 +1588,10 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Data structure that holds table and server indexes
|
|
||||||
*/
|
|
||||||
static class TableAndServer {
|
|
||||||
private final int table;
|
|
||||||
private final int server;
|
|
||||||
|
|
||||||
public TableAndServer(int table, int server) {
|
|
||||||
this.table = table;
|
|
||||||
this.server = server;
|
|
||||||
}
|
|
||||||
|
|
||||||
public int getTable() {
|
|
||||||
return table;
|
|
||||||
}
|
|
||||||
|
|
||||||
public int getServer() {
|
|
||||||
return server;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A helper function to compose the attribute name from tablename and costfunction name
|
* A helper function to compose the attribute name from tablename and costfunction name
|
||||||
*/
|
*/
|
||||||
public static String composeAttributeName(String tableName, String costFunctionName) {
|
public static String composeAttributeName(String tableName, String costFunctionName) {
|
||||||
return tableName + TABLE_FUNCTION_SEP + costFunctionName;
|
return tableName + TABLE_FUNCTION_SEP + costFunctionName;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -48,8 +48,6 @@ import org.apache.hadoop.hbase.master.MockNoopMasterServices;
|
||||||
import org.apache.hadoop.hbase.master.RackManager;
|
import org.apache.hadoop.hbase.master.RackManager;
|
||||||
import org.apache.hadoop.hbase.master.RegionPlan;
|
import org.apache.hadoop.hbase.master.RegionPlan;
|
||||||
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster;
|
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster;
|
||||||
import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer.CandidateGenerator;
|
|
||||||
import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer.TableSkewCandidateGenerator;
|
|
||||||
import org.apache.hadoop.hbase.testclassification.FlakeyTests;
|
import org.apache.hadoop.hbase.testclassification.FlakeyTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
@ -121,9 +119,7 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testBalanceCluster() throws Exception {
|
public void testBalanceCluster() throws Exception {
|
||||||
float oldMinCostNeedBalance = conf.getFloat(StochasticLoadBalancer.MIN_COST_NEED_BALANCE_KEY, 0.05f);
|
|
||||||
conf.setFloat(StochasticLoadBalancer.MIN_COST_NEED_BALANCE_KEY, 0.02f);
|
|
||||||
loadBalancer.setConf(conf);
|
|
||||||
for (int[] mockCluster : clusterStateMocks) {
|
for (int[] mockCluster : clusterStateMocks) {
|
||||||
Map<ServerName, List<HRegionInfo>> servers = mockClusterServers(mockCluster);
|
Map<ServerName, List<HRegionInfo>> servers = mockClusterServers(mockCluster);
|
||||||
List<ServerAndLoad> list = convertToList(servers);
|
List<ServerAndLoad> list = convertToList(servers);
|
||||||
|
@ -139,9 +135,6 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
|
||||||
returnServer(entry.getKey());
|
returnServer(entry.getKey());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// reset config
|
|
||||||
conf.setFloat(StochasticLoadBalancer.MIN_COST_NEED_BALANCE_KEY, oldMinCostNeedBalance);
|
|
||||||
loadBalancer.setConf(conf);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -262,32 +255,6 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
|
||||||
assertEquals(2.5, result, 0.01);
|
assertEquals(2.5, result, 0.01);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test (timeout=60000)
|
|
||||||
public void testTableSkewCandidateGeneratorConvergesToZero() {
|
|
||||||
int replication = 1;
|
|
||||||
StochasticLoadBalancer.CostFunction
|
|
||||||
costFunction = new StochasticLoadBalancer.TableSkewCostFunction(conf);
|
|
||||||
CandidateGenerator generator = new TableSkewCandidateGenerator();
|
|
||||||
for (int i = 0; i < 100; i++) {
|
|
||||||
int numNodes = rand.nextInt(500) + 1; // num nodes between 1 - 500
|
|
||||||
int numTables = rand.nextInt(500) + 1; // num tables between 1 and 1000
|
|
||||||
int numRegions = rand.nextInt(numTables * 99) + Math.max(numTables, numNodes); // num regions between max(numTables, numNodes) - numTables*100
|
|
||||||
int numRegionsPerServer = rand.nextInt(numRegions / numNodes) + 1; // num regions per server (except one) between 1 and numRegions / numNodes
|
|
||||||
|
|
||||||
Map<ServerName, List<HRegionInfo>> serverMap = createServerMap(numNodes, numRegions, numRegionsPerServer, replication, numTables);
|
|
||||||
BaseLoadBalancer.Cluster cluster = new Cluster(serverMap, null, null, null);
|
|
||||||
costFunction.init(cluster);
|
|
||||||
double cost = costFunction.cost();
|
|
||||||
while (cost > 0) {
|
|
||||||
Cluster.Action action = generator.generate(cluster);
|
|
||||||
cluster.doAction(action);
|
|
||||||
costFunction.postAction(action);
|
|
||||||
cost = costFunction.cost();
|
|
||||||
}
|
|
||||||
assertEquals(0, cost, .000000000001);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCostFromArray() {
|
public void testCostFromArray() {
|
||||||
Configuration conf = HBaseConfiguration.create();
|
Configuration conf = HBaseConfiguration.create();
|
||||||
|
|
|
@ -35,7 +35,6 @@ public class TestStochasticLoadBalancer2 extends BalancerTestBase {
|
||||||
conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
|
conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
|
||||||
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 2000000L);
|
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 2000000L);
|
||||||
conf.setFloat("hbase.master.balancer.stochastic.localityCost", 0);
|
conf.setFloat("hbase.master.balancer.stochastic.localityCost", 0);
|
||||||
|
|
||||||
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 90 * 1000); // 90 sec
|
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 90 * 1000); // 90 sec
|
||||||
conf.setFloat("hbase.master.balancer.stochastic.minCostNeedBalance", 0.05f);
|
conf.setFloat("hbase.master.balancer.stochastic.minCostNeedBalance", 0.05f);
|
||||||
loadBalancer.setConf(conf);
|
loadBalancer.setConf(conf);
|
||||||
|
@ -71,7 +70,6 @@ public class TestStochasticLoadBalancer2 extends BalancerTestBase {
|
||||||
public void testRegionReplicasOnMidClusterHighReplication() {
|
public void testRegionReplicasOnMidClusterHighReplication() {
|
||||||
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 4000000L);
|
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 4000000L);
|
||||||
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 120 * 1000); // 120 sec
|
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 120 * 1000); // 120 sec
|
||||||
conf.setFloat("hbase.master.balancer.stochastic.tableSkewCost", 4);
|
|
||||||
loadBalancer.setConf(conf);
|
loadBalancer.setConf(conf);
|
||||||
int numNodes = 80;
|
int numNodes = 80;
|
||||||
int numRegions = 6 * numNodes;
|
int numRegions = 6 * numNodes;
|
||||||
|
@ -79,8 +77,6 @@ public class TestStochasticLoadBalancer2 extends BalancerTestBase {
|
||||||
int numRegionsPerServer = 5;
|
int numRegionsPerServer = 5;
|
||||||
int numTables = 10;
|
int numTables = 10;
|
||||||
testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, false, true);
|
testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, false, true);
|
||||||
// reset config
|
|
||||||
conf.setFloat("hbase.master.balancer.stochastic.tableSkewCost", 35);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test (timeout = 800000)
|
@Test (timeout = 800000)
|
||||||
|
|
Loading…
Reference in New Issue