diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java index 0f1b1a2da93..b0e088c0400 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java @@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.master.RackManager; import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type; import org.apache.hadoop.hbase.security.access.AccessControlLists; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.util.StringUtils; import com.google.common.annotations.VisibleForTesting; @@ -140,6 +141,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { int[] initialRegionIndexToServerIndex; //regionIndex -> serverIndex (initial cluster state) int[] regionIndexToTableIndex; //regionIndex -> tableIndex int[][] numRegionsPerServerPerTable; //serverIndex -> tableIndex -> # regions + int[] numRegionsPerTable; // tableIndex -> number of regions that table has int[] numMaxRegionsPerTable; //tableIndex -> max number of regions in a single RS int[] regionIndexToPrimaryIndex; //regionIndex -> regionIndex of the primary boolean hasRegionReplicas = false; //whether there is regions with replicas @@ -330,6 +332,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { numTables = tables.size(); numRegionsPerServerPerTable = new int[numServers][numTables]; + numRegionsPerTable = new int[numTables]; for (int i = 0; i < numServers; i++) { for (int j = 0; j < numTables; j++) { @@ -339,6 +342,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { for (int i=0; i < regionIndexToServerIndex.length; i++) { if (regionIndexToServerIndex[i] >= 0) { + numRegionsPerTable[regionIndexToTableIndex[i]]++; numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++; } } @@ -470,6 +474,76 @@ public abstract class BaseLoadBalancer implements LoadBalancer { } } + /** + * Returns the minimum number of regions of a table T each server would store if T were + * perfectly distributed (i.e. round-robin-ed) across the cluster + */ + public int minRegionsIfEvenlyDistributed(int table) { + return numRegionsPerTable[table] / numServers; + } + + /** + * Returns the maximum number of regions of a table T any single server would store if T were + * perfectly distributed (i.e. round-robin-ed) across the cluster + */ + public int maxRegionsIfEvenlyDistributed(int table) { + int min = minRegionsIfEvenlyDistributed(table); + return numRegionsPerTable[table] % numServers == 0 ? min : min + 1; + } + + /** + * Returns the number of servers that should hold maxRegionsIfEvenlyDistributed for a given + * table. 
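For example, 11 regions across 4 servers yield min = 11 / 4 = 2, max = 2 + 1 = 3, and 11 % 4 = 3 servers that should hold the max. + *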
A special case here is if maxRegionsIfEvenlyDistributed == minRegionsIfEvenlyDistributed, + * in which case all servers should hold the max + */ + public int numServersWithMaxRegionsIfEvenlyDistributed(int table) { + int numWithMax = numRegionsPerTable[table] % numServers; + if (numWithMax == 0) { + return numServers; + } else { + return numWithMax; + } + } + + /** + * Returns true iff at least one server in the cluster stores more than the max (or fewer than + * the min) load per server that would result if all regions were evenly distributed across the + * cluster + */ + public boolean hasUnevenRegionDistribution() { + int minLoad = numRegions / numServers; + int maxLoad = numRegions % numServers == 0 ? minLoad : minLoad + 1; + for (int server = 0; server < numServers; server++) { + int numRegionsOnServer = getNumRegions(server); + if (numRegionsOnServer > maxLoad || numRegionsOnServer < minLoad) { + return true; + } + } + return false; + } + + /** + * Returns a pair whose first element is the server with the fewest regions across the + * cluster and whose second element is the server with the most regions across the cluster + */ + public Pair<Integer, Integer> findLeastAndMostLoadedServers() { + int minServer = 0; + int maxServer = 0; + int minLoad = getNumRegions(minServer); + int maxLoad = minLoad; + for (int server = 1; server < numServers; server++) { + int numRegionsOnServer = getNumRegions(server); + if (numRegionsOnServer < minLoad) { + minServer = server; + minLoad = numRegionsOnServer; + } + if (numRegionsOnServer > maxLoad) { + maxServer = server; + maxLoad = numRegionsOnServer; + } + } + return Pair.newPair(minServer, maxServer); + } + /** An action to move or swap a region */ public static class Action { public static enum Type { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java index 5c92973c509..8cbdd1e9c67 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java @@ -18,10 +18,14 @@ package org.apache.hadoop.hbase.master.balancer; import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; import java.util.Deque; import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -30,7 +34,6 @@ import java.util.Random; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ClusterStatus; import org.apache.hadoop.hbase.HBaseInterfaceAudience; @@ -40,6 +43,7 @@ import org.apache.hadoop.hbase.RegionLoad; import org.apache.hadoop.hbase.ServerLoad; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action; @@ -49,6 +53,10 @@ import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegi import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.SwapRegionsAction; import org.apache.hadoop.hbase.util.Bytes; import 
org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Pair; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; /** *

This is a best effort load balancer. Given a Cost function F(C) => x It will @@ -919,6 +927,225 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { } } + /** + * Generates candidate actions to minimize the TableSkew cost function. + * + * For efficiency, every action this generator produces is assumed to be applied to the + * cluster, and the cost function is notified of each one. This means the table skew cost + * can be adjusted incrementally, rather than recomputed from scratch at each iteration. + */ + static class TableSkewCandidateGenerator extends CandidateGenerator { + + // Mapping of table -> true iff enough servers in the cluster already store at least + // cluster.maxRegionsIfEvenlyDistributed(table) regions of that table + boolean[] tablesWithEnoughServersWithMaxRegions = null; + + @Override + Action generate(Cluster cluster) { + if (tablesWithEnoughServersWithMaxRegions == null || tablesWithEnoughServersWithMaxRegions.length != cluster.numTables) { + tablesWithEnoughServersWithMaxRegions = new boolean[cluster.numTables]; + } + if (cluster.hasUnevenRegionDistribution()) { + Pair<Integer, Integer> leastAndMostLoadedServers = cluster.findLeastAndMostLoadedServers(); + return moveFromTableWithEnoughRegions(cluster, leastAndMostLoadedServers.getSecond(), leastAndMostLoadedServers.getFirst()); + } else { + Optional<TableAndServer> tableServer = findSkewedTableServer(cluster); + if (!tableServer.isPresent()) { + return Cluster.NullAction; + } + return findBestActionForTableServer(cluster, tableServer.get()); + } + } + + /** + * Returns a move fromServer -> toServer of a region from some table such that, after the move, + * fromServer still holds at least the table-skew minimum number of regions for that table + */ + private Action moveFromTableWithEnoughRegions(Cluster cluster, int fromServer, int toServer) { + for (int table : getShuffledRangeOfInts(0, cluster.numTables)) { + int min = cluster.minRegionsIfEvenlyDistributed(table); + if (cluster.numRegionsPerServerPerTable[fromServer][table] > min) { + return getAction(fromServer, pickRandomRegionFromTableOnServer(cluster, fromServer, table), toServer, -1); + } + } + return Cluster.NullAction; + } + + /** + * Iterates over the tables in random order; for each table T, scans the cluster and returns the + * first server (if any) which holds too many regions of T. Returns Optional.absent() if no servers + * are found that hold too many regions. 
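+ * For example, for a table with 7 regions on a 3-server cluster, maxRegionsIfEvenlyDistributed is 3, so a server holding 4 or more of that table's regions is returned immediately. 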
+ */ + private Optional<TableAndServer> findSkewedTableServer(Cluster cluster) { + Optional<TableAndServer> tableServer = Optional.absent(); + List<Integer> servers = getShuffledRangeOfInts(0, cluster.numServers); + Iterator<Integer> tableIter = getShuffledRangeOfInts(0, cluster.numTables).iterator(); + while (tableIter.hasNext() && !tableServer.isPresent()) { + int table = tableIter.next(); + int maxRegions = cluster.maxRegionsIfEvenlyDistributed(table); + int numShouldHaveMaxRegions = cluster.numServersWithMaxRegionsIfEvenlyDistributed(table); + int numWithMaxRegions = 0; + for (int server : servers) { + int numRegions = cluster.numRegionsPerServerPerTable[server][table]; + // if more than max, server clearly has too many regions + if (numRegions > maxRegions) { + tableServer = Optional.of(new TableAndServer(table, server)); + break; + } + // if equal to max, check to see if we are within acceptable limit + if (numRegions == maxRegions) { + numWithMaxRegions++; + } + } + + tablesWithEnoughServersWithMaxRegions[table] = numWithMaxRegions >= numShouldHaveMaxRegions; + // If we have found a table with more than max, we are done + if (tableServer.isPresent()) { + break; + } + + // Otherwise, check to see if there are too many servers with maxRegions + if (numWithMaxRegions > numShouldHaveMaxRegions) { + for (int server : servers) { + int numRegions = cluster.numRegionsPerServerPerTable[server][table]; + if (numRegions == maxRegions) { + tableServer = Optional.of(new TableAndServer(table, server)); + break; + } + } + } + } + + return tableServer; + } + + /** + * Returns a list of (upper - lower) unique integers in random order, + * such that for each integer i in the list, lower <= i < upper + */ + private List<Integer> getShuffledRangeOfInts(int lower, int upper) { + Preconditions.checkArgument(lower < upper); + ArrayList<Integer> arr = new ArrayList<>(upper - lower); + for (int i = lower; i < upper; i++) { + arr.add(i); + } + Collections.shuffle(arr); + return arr; + } + + /** + * Pick a random region from the specified server and table. 
Returns -1 if no regions from + * the given table lie on the given server + */ + protected int pickRandomRegionFromTableOnServer(Cluster cluster, int server, int table) { + if (server < 0 || table < 0) { + return -1; + } + List<Integer> regionsFromTable = new ArrayList<>(); + for (int region : cluster.regionsPerServer[server]) { + if (cluster.regionIndexToTableIndex[region] == table) { + regionsFromTable.add(region); + } + } + // Guard against an empty list; RANDOM.nextInt(0) would throw + if (regionsFromTable.isEmpty()) { + return -1; + } + return regionsFromTable.get(RANDOM.nextInt(regionsFromTable.size())); + } + + /** + * Returns servers in the cluster that store fewer than k regions of the given table (sorted so + * that servers with the fewest regions from givenTable come first) + */ + public List<Integer> getServersWithFewerThanKRegionsFromTable(final Cluster cluster, final int givenTable, int k) { + List<Integer> serversWithFewerThanK = new ArrayList<>(); + for (int server = 0; server < cluster.numServers; server++) { + if (cluster.numRegionsPerServerPerTable[server][givenTable] < k) { + serversWithFewerThanK.add(server); + } + } + Collections.sort(serversWithFewerThanK, new Comparator<Integer>() { + @Override + public int compare(Integer o1, Integer o2) { + return cluster.numRegionsPerServerPerTable[o1.intValue()][givenTable] - cluster.numRegionsPerServerPerTable[o2.intValue()][givenTable]; + } + }); + return serversWithFewerThanK; + } + + /** + * Given a table T for which server S stores too many regions, attempts to find a + * SWAP operation that will better balance the cluster + */ + public Action findBestActionForTableServer(Cluster cluster, TableAndServer tableServer) { + int fromTable = tableServer.getTable(); + int fromServer = tableServer.getServer(); + + int minNumRegions = cluster.minRegionsIfEvenlyDistributed(fromTable); + int maxNumRegions = cluster.maxRegionsIfEvenlyDistributed(fromTable); + List<Integer> servers; + if (tablesWithEnoughServersWithMaxRegions[fromTable]) { + servers = getServersWithFewerThanKRegionsFromTable(cluster, fromTable, minNumRegions); + } else { + servers = getServersWithFewerThanKRegionsFromTable(cluster, fromTable, maxNumRegions); + } + + if (servers.isEmpty()) { + return Cluster.NullAction; + } + + Optional<Action> swap = trySwap(cluster, fromServer, fromTable, servers); + if (swap.isPresent()) { + return swap.get(); + } + + // If we cannot perform a swap, we should do nothing + return Cluster.NullAction; + } + + /** + * Given server1, table1, we try to find server2 and table2 such that + * at least 3 of the following 4 criteria are met + * + * 1) server1 has too many regions of table1 + * 2) server1 has too few regions of table2 + * 3) server2 has too many regions of table2 + * 4) server2 has too few regions of table1 + * + * We consider N regions from table T + * too few if: N < cluster.minRegionsIfEvenlyDistributed(T) + * too many if: N > cluster.maxRegionsIfEvenlyDistributed(T) + * + * Because (1) and (4) are true a priori, we only need one of (2) or (3) to also hold. + * + * If 3 of the 4 criteria are met, we return a swap operation between + * randomly selected regions from table1 on server1 and from table2 on server2. + * + * Optional.absent() is returned if we could not find such a SWAP. 
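+ * + * For example (hypothetical numbers): if table2 divides evenly so that min = max = 2, then server1 holding 1 region of table2 satisfies (2), and server2 holding 3 regions of table2 satisfies (3); either condition alone justifies the swap. 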
+ */ + private Optional<Action> trySwap(Cluster cluster, int server1, int table1, List<Integer> candidateServers) { + // Because conditions (1) and (4) are true a priori, we only need to meet one of conditions (2) or (3) + List<Integer> tables = getShuffledRangeOfInts(0, cluster.numTables); + for (int table2 : tables) { + int minRegions = cluster.minRegionsIfEvenlyDistributed(table2); + int maxRegions = cluster.maxRegionsIfEvenlyDistributed(table2); + for (int server2 : candidateServers) { + int numRegions1 = cluster.numRegionsPerServerPerTable[server1][table2]; + int numRegions2 = cluster.numRegionsPerServerPerTable[server2][table2]; + if (numRegions2 == 0) { + continue; + } + if ((numRegions1 < minRegions || numRegions2 > maxRegions) || + (minRegions != maxRegions && numRegions1 == minRegions && numRegions2 == maxRegions)) { + int region1 = pickRandomRegionFromTableOnServer(cluster, server1, table1); + int region2 = pickRandomRegionFromTableOnServer(cluster, server2, table2); + return Optional.of(getAction(server1, region1, server2, region2)); + } + } + } + return Optional.absent(); + } + } + /** * Base class of StochasticLoadBalancer's Cost Functions. */ @@ -966,8 +1193,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { break; case SWAP_REGIONS: SwapRegionsAction a = (SwapRegionsAction) action; - regionMoved(a.fromRegion, a.fromServer, a.toServer); - regionMoved(a.toRegion, a.toServer, a.fromServer); + regionSwapped(a.fromRegion, a.fromServer, a.toRegion, a.toServer); break; default: throw new RuntimeException("Uknown action:" + action.type); @@ -977,6 +1203,11 @@ protected void regionMoved(int region, int oldServer, int newServer) { } + protected void regionSwapped(int region1, int server1, int region2, int server2) { + regionMoved(region1, server1, server2); + regionMoved(region2, server2, server1); + } + abstract double cost(); /** @@ -1170,9 +1401,188 @@ "hbase.master.balancer.stochastic.tableSkewCost"; private static final float DEFAULT_TABLE_SKEW_COST = 35; + /** + * Ranges from 0.0 to 1.0: the weight given to the most skewed table (as opposed to the + * average skew across all tables) when computing the TableSkew cost + */ + private static final String MAX_TABLE_SKEW_WEIGHT_KEY = + "hbase.master.balancer.stochastic.maxTableSkewWeight"; + private static final float DEFAULT_MAX_TABLE_SKEW_WEIGHT = 0.0f; + + private final float maxTableSkewWeight; + private final float avgTableSkewWeight; + + // Number of moves for each table required to bring the cluster to a perfectly balanced + // state (i.e. as if you had round-robin-ed regions across the cluster) + private int[] numMovesPerTable; + TableSkewCostFunction(Configuration conf) { super(conf); this.setMultiplier(conf.getFloat(TABLE_SKEW_COST_KEY, DEFAULT_TABLE_SKEW_COST)); + maxTableSkewWeight = conf.getFloat(MAX_TABLE_SKEW_WEIGHT_KEY, DEFAULT_MAX_TABLE_SKEW_WEIGHT); + Preconditions.checkArgument(0.0 <= maxTableSkewWeight && maxTableSkewWeight <= 1.0); + avgTableSkewWeight = 1 - maxTableSkewWeight; + } + + /** + * Computes cost by: + * + * 1) Computing a skew score for each table (based on the number of regions + * from that table that would have to be moved to reach an evenly balanced state) + * + * 2) Taking a weighted average of the highest skew score with the average skew score + * + * 3) Square rooting that value to more evenly distribute the values between 0 and 1 + * (since we have observed they are generally very small). 
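+ * + * For example, with maxTableSkewWeight = 0.5 (the shipped default is 0.0) and per-table skews of 0.04 and 0.16 (average 0.10), the cost is sqrt(0.5 * 0.16 + 0.5 * 0.10) = sqrt(0.13), roughly 0.36. 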
+ * + * @return the table skew cost for the cluster + */ + @Override + double cost() { + double[] skewPerTable = computeSkewPerTable(); + if (skewPerTable.length == 0) { + return 0; + } + double maxTableSkew = max(skewPerTable); + double avgTableSkew = average(skewPerTable); + + return Math.sqrt(maxTableSkewWeight * maxTableSkew + avgTableSkewWeight * avgTableSkew); + } + + @Override + void init(Cluster cluster) { + super.init(cluster); + numMovesPerTable = computeNumMovesPerTable(); + } + + /** + * Adjusts computed number of moves after two regions have been swapped + */ + @Override + protected void regionSwapped(int region1, int server1, int region2, int server2) { + // If the regions belong to different tables, simply perform two moves + if (cluster.regionIndexToTableIndex[region1] != cluster.regionIndexToTableIndex[region2]) { + super.regionSwapped(region1, server1, region2, server2); + return; + } + // If they belong to the same table, the swap does not change that table's distribution, so do nothing + } + + /** + * Adjusts computed number of moves per table after a region has been moved + */ + @Override + protected void regionMoved(int region, int oldServer, int newServer) { + int table = cluster.regionIndexToTableIndex[region]; + numMovesPerTable[table] = computeNumMovesForTable(table); + } + + /** + * Returns a mapping of table -> numMoves, where numMoves is the number of region moves required + * to bring each table to a fully balanced state (i.e. as if its regions had been round-robin-ed + * across the cluster). + */ + private int[] computeNumMovesPerTable() { + // Determine # region moves required for each table to have regions perfectly distributed across cluster + int[] numMovesPerTable = new int[cluster.numTables]; + for (int table = 0; table < cluster.numTables; table++) { + numMovesPerTable[table] = computeNumMovesForTable(table); + } + return numMovesPerTable; + } + + /** + * Computes the number of moves required across all servers to bring the given table to a balanced state + * (i.e. as if its regions had been round-robin-ed across the cluster). We count only regions that need + * to be sent, not received, so that moves are not double counted. + */ + private int computeNumMovesForTable(int table) { + int numMinRegions = cluster.minRegionsIfEvenlyDistributed(table); + int numMaxRegions = cluster.maxRegionsIfEvenlyDistributed(table); + int numMaxServersRemaining = cluster.numServersWithMaxRegionsIfEvenlyDistributed(table); + int numMoves = 0; + + for (int server = 0; server < cluster.numServers; server++) { + int numRegions = cluster.numRegionsPerServerPerTable[server][table]; + if (numRegions >= numMaxRegions && numMaxServersRemaining > 0) { + numMoves += numRegions - numMaxRegions; + numMaxServersRemaining--; + } else if (numRegions > numMinRegions) { + numMoves += numRegions - numMinRegions; + } + } + return numMoves; + } + + /** + * Returns mapping of tableIndex -> tableSkewScore, where tableSkewScore is a double between 0 and 1, with + * 0 indicating no table skew (i.e. a perfect distribution of regions among servers), and 1 representing + * pathological table skew (i.e. all of a table's regions residing on a single server). 
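+ * + * For example, a table with 10 regions on a 5-server cluster has maxRegionsIfEvenlyDistributed = 2, so the pathological case requires 10 - 2 = 8 moves; a placement needing 4 moves therefore scores 4 / 8 = 0.5. 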
+ */ + private double[] computeSkewPerTable() { + if (numMovesPerTable == null) { + numMovesPerTable = computeNumMovesPerTable(); + } + double[] scaledSkewPerTable = new double[numMovesPerTable.length]; + for (int table = 0; table < numMovesPerTable.length; table++) { + int numTotalRegions = cluster.numRegionsPerTable[table]; + int maxRegions = cluster.maxRegionsIfEvenlyDistributed(table); + int pathologicalNumMoves = numTotalRegions - maxRegions; + scaledSkewPerTable[table] = pathologicalNumMoves == 0 ? 0 : (double) numMovesPerTable[table] / pathologicalNumMoves; + } + return scaledSkewPerTable; + } + + /** + * Returns the max of the values in the passed array + */ + private double max(double[] arr) { + double max = arr[0]; + for (double d : arr) { + if (d > max) { + max = d; + } + } + return max; + } + + /** + * Returns the average of the values in the passed array + */ + private double average(double[] arr) { + double sum = 0; + for (double d : arr) { + sum += d; + } + return sum / arr.length; + } + } + + /** + * Compute the cost of a potential cluster configuration based upon how evenly + * distributed tables are. + * + * @deprecated replaced by TableSkewCostFunction + * This function only considers the maximum # of regions of each table stored + * on any one server. This, however, neglects a number of cases. Consider the case + * where N servers each store one more region of a table than they would if the regions + * had been round-robin-ed across the cluster, while K servers store no regions of that + * table at all. The maximum # of regions stored would not properly reflect the + * table skew of the cluster. + * + * Furthermore, this relies upon the cluster.numMaxRegionsPerTable field, which is not + * properly updated. The values per table only increase as the cluster shifts (i.e. + * as new maxima are found), but they do not go down when the maximum skew decreases + * for a particular table. 
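+ * + * For example (hypothetical numbers): with 10 servers and a 20-region table, a placement where 4 servers hold 3 regions, 4 hold 2, and 2 hold none has a per-server maximum of 3, only 1 above the even load of 2, even though two servers store none of the table's regions. 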
+ */ + @Deprecated + static class OldTableSkewCostFunction extends CostFunction { + + private static final String TABLE_SKEW_COST_KEY = + "hbase.master.balancer.stochastic.tableSkewCost"; + private static final float DEFAULT_TABLE_SKEW_COST = 35; + + OldTableSkewCostFunction(Configuration conf) { + super(conf); + this.setMultiplier(conf.getFloat(TABLE_SKEW_COST_KEY, DEFAULT_TABLE_SKEW_COST)); } @Override @@ -1450,7 +1860,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { for (int i = 0 ; i < costsPerGroup.length; i++) { totalCost += costsPerGroup[i]; } - return scale(0, maxCost, totalCost); + // Still return a high cost when single region replicas are cohosted, even as the cluster scales + return Math.sqrt(scale(0, maxCost, totalCost)); } /** @@ -1588,10 +1999,32 @@ } } + /** + * Data structure that holds table and server indexes + */ + static class TableAndServer { + private final int table; + private final int server; + + public TableAndServer(int table, int server) { + this.table = table; + this.server = server; + } + + public int getTable() { + return table; + } + + public int getServer() { + return server; + } + } + /** * A helper function to compose the attribute name from tablename and costfunction name */ public static String composeAttributeName(String tableName, String costFunctionName) { return tableName + TABLE_FUNCTION_SEP + costFunctionName; } + } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java index 9d193d2f3ec..37ff35fdbeb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java @@ -48,6 +48,8 @@ import org.apache.hadoop.hbase.master.MockNoopMasterServices; import org.apache.hadoop.hbase.master.RackManager; import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster; +import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer.CandidateGenerator; +import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer.TableSkewCandidateGenerator; import org.apache.hadoop.hbase.testclassification.FlakeyTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; @@ -118,7 +120,9 @@ public class TestStochasticLoadBalancer extends BalancerTestBase { */ @Test public void testBalanceCluster() throws Exception { - + float oldMinCostNeedBalance = conf.getFloat(StochasticLoadBalancer.MIN_COST_NEED_BALANCE_KEY, 0.05f); + conf.setFloat(StochasticLoadBalancer.MIN_COST_NEED_BALANCE_KEY, 0.02f); + loadBalancer.setConf(conf); for (int[] mockCluster : clusterStateMocks) { Map<ServerName, List<HRegionInfo>> servers = mockClusterServers(mockCluster); List<ServerAndLoad> list = convertToList(servers); @@ -134,6 +138,9 @@ public class TestStochasticLoadBalancer extends BalancerTestBase { returnServer(entry.getKey()); } } + // reset config + conf.setFloat(StochasticLoadBalancer.MIN_COST_NEED_BALANCE_KEY, oldMinCostNeedBalance); + loadBalancer.setConf(conf); } @Test @@ -252,6 +259,32 @@ public class TestStochasticLoadBalancer extends BalancerTestBase { double result = storeFileCostFunction.getRegionLoadCost(regionLoads); // storefile size cost is simply an average of it's value over time assertEquals(2.5, result, 0.01); + } + + 
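// Exercises TableSkewCandidateGenerator end to end: repeatedly apply the actions it generates + // and assert that the table skew cost converges to zero. + 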
@Test (timeout=60000) + public void testTableSkewCandidateGeneratorConvergesToZero() { + int replication = 1; + StochasticLoadBalancer.CostFunction + costFunction = new StochasticLoadBalancer.TableSkewCostFunction(conf); + CandidateGenerator generator = new TableSkewCandidateGenerator(); + for (int i = 0; i < 100; i++) { + int numNodes = 1 + rand.nextInt(5 * i + 1); + int numTables = 1 + rand.nextInt(5 * i + 1); + int numRegions = rand.nextInt(numTables * 99) + Math.max(numTables, numNodes); // num regions between max(numTables, numNodes) and numTables * 100 + int numRegionsPerServer = rand.nextInt(numRegions / numNodes) + 1; // num regions per server (except one) between 1 and numRegions / numNodes + + Map<ServerName, List<HRegionInfo>> serverMap = createServerMap(numNodes, numRegions, numRegionsPerServer, replication, numTables); + BaseLoadBalancer.Cluster cluster = new Cluster(serverMap, null, null, null); + costFunction.init(cluster); + double cost = costFunction.cost(); + while (cost > 0) { + Cluster.Action action = generator.generate(cluster); + cluster.doAction(action); + costFunction.postAction(action); + cost = costFunction.cost(); + } + assertEquals(0, cost, .000000000001); + } } @Test diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer2.java index 2f315de2c0d..03d2ef2d345 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer2.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer2.java @@ -35,6 +35,7 @@ public class TestStochasticLoadBalancer2 extends BalancerTestBase { conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f); conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 2000000L); conf.setFloat("hbase.master.balancer.stochastic.localityCost", 0); + conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 90 * 1000); // 90 sec conf.setFloat("hbase.master.balancer.stochastic.minCostNeedBalance", 0.05f); loadBalancer.setConf(conf); @@ -70,6 +71,7 @@ public class TestStochasticLoadBalancer2 extends BalancerTestBase { public void testRegionReplicasOnMidClusterHighReplication() { conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 4000000L); conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 120 * 1000); // 120 sec + conf.setFloat("hbase.master.balancer.stochastic.tableSkewCost", 4); loadBalancer.setConf(conf); int numNodes = 80; int numRegions = 6 * numNodes; @@ -77,6 +79,8 @@ public class TestStochasticLoadBalancer2 extends BalancerTestBase { int numRegionsPerServer = 5; int numTables = 10; testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, false, true); + // reset config + conf.setFloat("hbase.master.balancer.stochastic.tableSkewCost", 35); } @Test (timeout = 800000)