diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java index 6d8bd4884f6..0e4ae1d1e53 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hbase.master.balancer; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -58,7 +60,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { */ protected static class Cluster { ServerName[] servers; - ArrayList tables; + ArrayList tables; HRegionInfo[] regions; List[] regionLoads; int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality @@ -70,8 +72,10 @@ public abstract class BaseLoadBalancer implements LoadBalancer { int[][] numRegionsPerServerPerTable; //serverIndex -> tableIndex -> # regions int[] numMaxRegionsPerTable; //tableIndex -> max number of regions in a single RS - Map serversToIndex; - Map tablesToIndex; + Integer[] serverIndicesSortedByRegionCount; + + Map serversToIndex; + Map tablesToIndex; int numRegions; int numServers; @@ -82,21 +86,35 @@ public abstract class BaseLoadBalancer implements LoadBalancer { protected Cluster(Map> clusterState, Map> loads, RegionLocationFinder regionFinder) { - serversToIndex = new HashMap(clusterState.size()); - tablesToIndex = new HashMap(); + + serversToIndex = new HashMap(); + tablesToIndex = new HashMap(); //regionsToIndex = new HashMap(); //TODO: We should get the list of tables from master - tables = new ArrayList(); + tables = new ArrayList(); + - numServers = clusterState.size(); numRegions = 0; + int serverIndex = 0; + + // Use servername and port as there can be dead servers in this list. We want everything with + // a matching hostname and port to have the same index. + for (ServerName sn:clusterState.keySet()) { + if (serversToIndex.get(sn.getHostAndPort()) == null) { + serversToIndex.put(sn.getHostAndPort(), serverIndex++); + } + } + + // Count how many regions there are. for (Entry> entry : clusterState.entrySet()) { numRegions += entry.getValue().size(); } - regionsPerServer = new int[clusterState.size()][]; + numServers = serversToIndex.size(); + regionsPerServer = new int[serversToIndex.size()][]; + servers = new ServerName[numServers]; regions = new HRegionInfo[numRegions]; regionIndexToServerIndex = new int[numRegions]; @@ -104,26 +122,35 @@ public abstract class BaseLoadBalancer implements LoadBalancer { regionIndexToTableIndex = new int[numRegions]; regionLoads = new List[numRegions]; regionLocations = new int[numRegions][]; + serverIndicesSortedByRegionCount = new Integer[numServers]; + + int tableIndex = 0, regionIndex = 0, regionPerServerIndex = 0; - int tableIndex = 0, serverIndex = 0, regionIndex = 0, regionPerServerIndex = 0; - // populate serversToIndex first for (Entry> entry : clusterState.entrySet()) { - servers[serverIndex] = entry.getKey(); + serverIndex = serversToIndex.get(entry.getKey().getHostAndPort()); + + // keep the servername if this is the first server name for this hostname + // or this servername has the newest startcode. + if (servers[serverIndex] == null || + servers[serverIndex].getStartcode() < entry.getKey().getStartcode()) { + servers[serverIndex] = entry.getKey(); + } + regionsPerServer[serverIndex] = new int[entry.getValue().size()]; - serversToIndex.put(servers[serverIndex], Integer.valueOf(serverIndex)); - serverIndex++; + serverIndicesSortedByRegionCount[serverIndex] = serverIndex; } - serverIndex = 0; + for (Entry> entry : clusterState.entrySet()) { + serverIndex = serversToIndex.get(entry.getKey().getHostAndPort()); regionPerServerIndex = 0; + for (HRegionInfo region : entry.getValue()) { - byte[] tableName = region.getTableName(); - int tableHash = Bytes.mapKey(tableName); - Integer idx = tablesToIndex.get(tableHash); + String tableName = region.getTableNameAsString(); + Integer idx = tablesToIndex.get(tableName); if (idx == null) { tables.add(tableName); idx = tableIndex; - tablesToIndex.put(tableHash, tableIndex++); + tablesToIndex.put(tableName, tableIndex++); } regions[regionIndex] = region; @@ -132,7 +159,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { regionIndexToTableIndex[regionIndex] = idx; regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex; - //region load + // region load if (loads != null) { List rl = loads.get(region.getRegionNameAsString()); // That could have failed if the RegionLoad is using the other regionName @@ -156,7 +183,6 @@ public abstract class BaseLoadBalancer implements LoadBalancer { regionIndex++; } - serverIndex++; } numTables = tables.size(); @@ -263,6 +289,53 @@ public abstract class BaseLoadBalancer implements LoadBalancer { } return regions; } + + void sortServersByRegionCount() { + Arrays.sort(serverIndicesSortedByRegionCount, numRegionsComparator); + } + + int getNumRegions(int server) { + return regionsPerServer[server].length; + } + + private Comparator numRegionsComparator = new Comparator() { + @Override + public int compare(Integer integer, Integer integer2) { + return Integer.valueOf(getNumRegions(integer)).compareTo(getNumRegions(integer2)); + } + }; + + @Override + public String toString() { + String desc = "Cluster{" + + "servers=["; + for(ServerName sn:servers) { + desc += sn.getHostAndPort() + ", "; + } + desc += + ", serverIndicesSortedByRegionCount="+ + Arrays.toString(serverIndicesSortedByRegionCount) + + ", regionsPerServer=["; + + for (int[]r:regionsPerServer) { + desc += Arrays.toString(r); + } + desc += "]" + + ", numMaxRegionsPerTable=" + + Arrays.toString(numMaxRegionsPerTable) + + ", numRegions=" + + numRegions + + ", numServers=" + + numServers + + ", numTables=" + + numTables + + ", numMovedRegions=" + + numMovedRegions + + ", numMovedMetaRegions=" + + numMovedMetaRegions + + '}'; + return desc; + } } // slop for regions @@ -270,7 +343,6 @@ public abstract class BaseLoadBalancer implements LoadBalancer { private Configuration config; private static final Random RANDOM = new Random(System.currentTimeMillis()); private static final Log LOG = LogFactory.getLog(BaseLoadBalancer.class); - protected MasterServices services; @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java index 30f36ca2a34..7609e5a6034 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Pair; /** *

This is a best effort load balancer. Given a Cost function F(C) => x It will @@ -86,48 +87,35 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @InterfaceAudience.Private public class StochasticLoadBalancer extends BaseLoadBalancer { - private static final String STOREFILE_SIZE_COST_KEY = - "hbase.master.balancer.stochastic.storefileSizeCost"; - private static final String MEMSTORE_SIZE_COST_KEY = - "hbase.master.balancer.stochastic.memstoreSizeCost"; - private static final String WRITE_REQUEST_COST_KEY = - "hbase.master.balancer.stochastic.writeRequestCost"; - private static final String READ_REQUEST_COST_KEY = - "hbase.master.balancer.stochastic.readRequestCost"; - private static final String LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.localityCost"; - private static final String TABLE_LOAD_COST_KEY = - "hbase.master.balancer.stochastic.tableLoadCost"; - private static final String MOVE_COST_KEY = "hbase.master.balancer.stochastic.moveCost"; - private static final String REGION_LOAD_COST_KEY = - "hbase.master.balancer.stochastic.regionLoadCost"; private static final String STEPS_PER_REGION_KEY = "hbase.master.balancer.stochastic.stepsPerRegion"; - private static final String MAX_STEPS_KEY = "hbase.master.balancer.stochastic.maxSteps"; - private static final String MAX_MOVES_KEY = "hbase.master.balancer.stochastic.maxMoveRegions"; - private static final String MAX_RUNNING_TIME_KEY = "hbase.master.balancer.stochastic.maxRunningTime"; - private static final String KEEP_REGION_LOADS = "hbase.master.balancer.stochastic.numRegionLoadsToRemember"; + private static final String MAX_STEPS_KEY = + "hbase.master.balancer.stochastic.maxSteps"; + private static final String MAX_RUNNING_TIME_KEY = + "hbase.master.balancer.stochastic.maxRunningTime"; + private static final String KEEP_REGION_LOADS = + "hbase.master.balancer.stochastic.numRegionLoadsToRemember"; private static final Random RANDOM = new Random(System.currentTimeMillis()); private static final Log LOG = LogFactory.getLog(StochasticLoadBalancer.class); + private final RegionLocationFinder regionFinder = new RegionLocationFinder(); private ClusterStatus clusterStatus = null; private Map> loads = new HashMap>(); // values are defaults - private int maxSteps = 15000; - private int stepsPerRegion = 110; - private long maxRunningTime = 60 * 1000; //1 min - private int maxMoves = 600; + private int maxSteps = 1000000; + private int stepsPerRegion = 800; + private long maxRunningTime = 60 * 1000 * 1; // 1 min private int numRegionLoadsToRemember = 15; - private float loadMultiplier = 100; - private float moveCostMultiplier = 1; - private float tableMultiplier = 5; - private float localityMultiplier = 5; - private float readRequestMultiplier = 0; - private float writeRequestMultiplier = 0; - private float memStoreSizeMultiplier = 5; - private float storeFileSizeMultiplier = 5; + private RegionPicker[] pickers; + private CostFromRegionLoadFunction[] regionLoadFunctions; + private CostFunction[] costFunctions; + // Keep locality based picker and cost function to alert them + // when new services are offered + private LocalityBasedPicker localityPicker; + private LocalityCostFunction localityCost; @Override public void setConf(Configuration conf) { @@ -135,27 +123,38 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { regionFinder.setConf(conf); maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps); - maxMoves = conf.getInt(MAX_MOVES_KEY, maxMoves); + stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion); maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, maxRunningTime); numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember); - // Load multiplier should be the greatest as it is the most general way to balance data. - loadMultiplier = conf.getFloat(REGION_LOAD_COST_KEY, loadMultiplier); + localityPicker = new LocalityBasedPicker(services); + localityCost = new LocalityCostFunction(conf, services); - // Move cost multiplier should be the same cost or higer than the rest of the costs to ensure - // that two costs must get better to justify a move cost. - moveCostMultiplier = conf.getFloat(MOVE_COST_KEY, moveCostMultiplier); + pickers = new RegionPicker[] { + new RandomRegionPicker(), + new LoadPicker(), + //localityPicker + }; - // These are the added costs so that the stochastic load balancer can get a little bit smarter - // about where to move regions. - tableMultiplier = conf.getFloat(TABLE_LOAD_COST_KEY, tableMultiplier); - localityMultiplier = conf.getFloat(LOCALITY_COST_KEY, localityMultiplier); - memStoreSizeMultiplier = conf.getFloat(MEMSTORE_SIZE_COST_KEY, memStoreSizeMultiplier); - storeFileSizeMultiplier = conf.getFloat(STOREFILE_SIZE_COST_KEY, storeFileSizeMultiplier); - readRequestMultiplier = conf.getFloat(READ_REQUEST_COST_KEY, readRequestMultiplier); - writeRequestMultiplier = conf.getFloat(WRITE_REQUEST_COST_KEY, writeRequestMultiplier); + regionLoadFunctions = new CostFromRegionLoadFunction[] { + new ReadRequestCostFunction(conf), + new WriteRequestCostFunction(conf), + new MemstoreSizeCostFunction(conf), + new StoreFileCostFunction(conf) + }; + + costFunctions = new CostFunction[]{ + new RegionCountSkewCostFunction(conf), + new MoveCostFunction(conf), + localityCost, + new TableSkewCostFunction(conf), + regionLoadFunctions[0], + regionLoadFunctions[1], + regionLoadFunctions[2], + regionLoadFunctions[3], + }; } @Override @@ -164,13 +163,18 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { regionFinder.setClusterStatus(st); this.clusterStatus = st; updateRegionLoad(); + for(CostFromRegionLoadFunction cost : regionLoadFunctions) { + cost.setClusterStatus(st); + } } @Override public void setMasterServices(MasterServices masterServices) { super.setMasterServices(masterServices); - this.services = masterServices; this.regionFinder.setServices(masterServices); + this.localityCost.setServices(masterServices); + this.localityPicker.setServices(masterServices); + } /** @@ -179,78 +183,84 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { */ @Override public List balanceCluster(Map> clusterState) { - - if (!needsBalance(new ClusterLoadState(clusterState))) { - return null; - } + //if (!needsBalance(new ClusterLoadState(clusterState))) { + // return null; + //} long startTime = EnvironmentEdgeManager.currentTimeMillis(); // Keep track of servers to iterate through them. - double currentCost, newCost, initCost; - Cluster cluster = new Cluster(clusterState, loads, regionFinder); - currentCost = newCost = initCost = computeCost(cluster); + double currentCost = computeCost(cluster, Double.MAX_VALUE); - int computedMaxSteps = - Math.min(this.maxSteps, (cluster.numRegions * this.stepsPerRegion)); + double initCost = currentCost; + double newCost = currentCost; + + long computedMaxSteps = Math.min(this.maxSteps, + ((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers)); // Perform a stochastic walk to see if we can get a good fit. - int step; + long step; for (step = 0; step < computedMaxSteps; step++) { + int pickerIdx = RANDOM.nextInt(pickers.length); + RegionPicker p = pickers[pickerIdx]; + Pair, Pair> picks = p.pick(cluster); - // try and perform a mutation - for (int leftServer = 0; leftServer < cluster.numServers; leftServer++) { + int leftServer = picks.getFirst().getFirst(); + int leftRegion = picks.getFirst().getSecond(); + int rightServer = picks.getSecond().getFirst(); + int rightRegion = picks.getSecond().getSecond(); - // What server are we going to be swapping regions with ? - int rightServer = pickOtherServer(leftServer, cluster); - if (rightServer < 0) { - continue; - } - - // Pick what regions to swap around. - // If we get a null for one then this isn't a swap just a move - int lRegion = pickRandomRegion(cluster, leftServer, 0); - int rRegion = pickRandomRegion(cluster, rightServer, 0.5); - - // We randomly picked to do nothing. - if (lRegion < 0 && rRegion < 0) { - continue; - } - - cluster.moveOrSwapRegion(leftServer, rightServer, lRegion, rRegion); - - newCost = computeCost(cluster); - // Should this be kept? - if (newCost < currentCost) { - currentCost = newCost; - } else { - // Put things back the way they were before. - //TODO: undo by remembering old values, using an UndoAction class - cluster.moveOrSwapRegion(leftServer, rightServer, rRegion, lRegion); - } + // We couldn't find a server + if (rightServer < 0 || leftServer < 0) { + continue; } - if (EnvironmentEdgeManager.currentTimeMillis() - startTime > maxRunningTime) { + + // We randomly picked to do nothing. + if (leftRegion < 0 && rightRegion < 0) { + continue; + } + + cluster.moveOrSwapRegion(leftServer, + rightServer, + leftRegion, + rightRegion); + + newCost = computeCost(cluster, currentCost); + // Should this be kept? + if (newCost < currentCost) { + currentCost = newCost; + } else { + // Put things back the way they were before. + // TODO: undo by remembering old values, using an UndoAction class + cluster.moveOrSwapRegion(leftServer, + rightServer, + rightRegion, + leftRegion); + } + + if (EnvironmentEdgeManager.currentTimeMillis() - startTime > + maxRunningTime) { break; } } long endTime = EnvironmentEdgeManager.currentTimeMillis(); + if (initCost > currentCost) { List plans = createRegionPlans(cluster); - if (LOG.isDebugEnabled()) { LOG.debug("Finished computing new load balance plan. Computation took " + (endTime - startTime) + "ms to try " + step - + " different iterations. Found a solution that moves " + plans.size() - + " regions; Going from a computed cost of " + initCost + " to a new cost of " - + currentCost); + + " different iterations. Found a solution that moves " + + plans.size() + " regions; Going from a computed cost of " + + initCost + " to a new cost of " + currentCost); } return plans; } if (LOG.isDebugEnabled()) { - LOG.debug("Could not find a better load balance plan. Tried " + step - + " different configurations in " + (endTime - startTime) + LOG.debug("Could not find a better load balance plan. Tried " + + step + " different configurations in " + (endTime - startTime) + "ms, and did not find anything with a computed cost less than " + initCost); } return null; @@ -265,14 +275,16 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { */ private List createRegionPlans(Cluster cluster) { List plans = new LinkedList(); - - for (int regionIndex = 0; regionIndex < cluster.regionIndexToServerIndex.length; regionIndex++) { + for (int regionIndex = 0; + regionIndex < cluster.regionIndexToServerIndex.length; regionIndex++) { int initialServerIndex = cluster.initialRegionIndexToServerIndex[regionIndex]; int newServerIndex = cluster.regionIndexToServerIndex[regionIndex]; + if (initialServerIndex != newServerIndex) { HRegionInfo region = cluster.regions[regionIndex]; ServerName initialServer = cluster.servers[initialServerIndex]; ServerName newServer = cluster.servers[newServerIndex]; + if (LOG.isTraceEnabled()) { LOG.trace("Moving Region " + region.getEncodedName() + " from server " + initialServer.getHostname() + " to " + newServer.getHostname()); @@ -284,30 +296,31 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { return plans; } - /** Store the current region loads. */ + /** + * Store the current region loads. + */ private synchronized void updateRegionLoad() { - - //We create a new hashmap so that regions that are no longer there are removed. - //However we temporarily need the old loads so we can use them to keep the rolling average. + // We create a new hashmap so that regions that are no longer there are removed. + // However we temporarily need the old loads so we can use them to keep the rolling average. Map> oldLoads = loads; loads = new HashMap>(); for (ServerName sn : clusterStatus.getServers()) { ServerLoad sl = clusterStatus.getLoad(sn); - if (sl == null) continue; + if (sl == null) { + continue; + } for (Entry entry : sl.getRegionsLoad().entrySet()) { List rLoads = oldLoads.get(Bytes.toString(entry.getKey())); if (rLoads != null) { - - //We're only going to keep 15. So if there are that many already take the last 14 + // We're only going to keep 15. So if there are that many already take the last 14 if (rLoads.size() >= numRegionLoadsToRemember) { - int numToRemove = 1 + (rLoads.size() - numRegionLoadsToRemember); - + int numToRemove = 1 + (rLoads.size() - numRegionLoadsToRemember); rLoads = rLoads.subList(numToRemove, rLoads.size()); } } else { - //There was nothing there + // There was nothing there rLoads = new ArrayList(); } rLoads.add(entry.getValue()); @@ -315,322 +328,628 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { } } - } - /** - * From a list of regions pick a random one. Null can be returned which - * {@link StochasticLoadBalancer#balanceCluster(Map)} recognize as signal to try a region move - * rather than swap. - * - * @param cluster The state of the cluster - * @param server index of the server - * @param chanceOfNoSwap Chance that this will decide to try a move rather - * than a swap. - * @return a random {@link HRegionInfo} or null if an asymmetrical move is - * suggested. - */ - private int pickRandomRegion(Cluster cluster, int server, double chanceOfNoSwap) { - //Check to see if this is just a move. - if (cluster.regionsPerServer[server].length == 0 || RANDOM.nextFloat() < chanceOfNoSwap) { - //signal a move only. - return -1; - } - int rand = RANDOM.nextInt(cluster.regionsPerServer[server].length); - return cluster.regionsPerServer[server][rand]; - - } - - /** - * Given a server we will want to switch regions with another server. This - * function picks a random server from the list. - * - * @param serverIndex Current Server. This server will never be the return value. - * @param cluster The state of the cluster - * @return random server. Null if no other servers were found. - */ - private int pickOtherServer(int serverIndex, Cluster cluster) { - if (cluster.numServers < 2) { - return -1; - } - while (true) { - int otherServerIndex = RANDOM.nextInt(cluster.numServers); - if (otherServerIndex != serverIndex) { - return otherServerIndex; - } + for(CostFromRegionLoadFunction cost : regionLoadFunctions) { + cost.setLoads(loads); } } + /** * This is the main cost function. It will compute a cost associated with a proposed cluster * state. All different costs will be combined with their multipliers to produce a double cost. * * @param cluster The state of the cluster - * @return a double of a cost associated with the proposed + * @param previousCost the previous cost. This is used as an early out. + * @return a double of a cost associated with the proposed cluster state. This cost is an + * aggregate of all individual cost functions. */ - protected double computeCost(Cluster cluster) { - double moveCost = (moveCostMultiplier > 0) ? - (moveCostMultiplier * computeMoveCost(cluster)) : - 0; + protected double computeCost(Cluster cluster, double previousCost) { + double total = 0; - double regionCountSkewCost = (loadMultiplier > 0) ? - (loadMultiplier * computeSkewLoadCost(cluster)) : - 0; + for (CostFunction c:costFunctions) { + if (c.getMultiplier() <= 0) { + continue; + } - double tableSkewCost = (tableMultiplier > 0) ? - (tableMultiplier * computeTableSkewLoadCost(cluster)) : - 0; + total += c.getMultiplier() * c.cost(cluster); - double localityCost = (localityMultiplier > 0) ? - (localityMultiplier * computeDataLocalityCost(cluster)) : - 0; - - double memstoreSizeCost = - (memStoreSizeMultiplier > 0) ? - (memStoreSizeMultiplier * computeRegionLoadCost(cluster, RegionLoadCostType.MEMSTORE_SIZE)) : - 0; - - double storefileSizeCost = - (storeFileSizeMultiplier > 0) ? - (storeFileSizeMultiplier * computeRegionLoadCost(cluster, RegionLoadCostType.STOREFILE_SIZE)): - 0; - - double readRequestCost = - (readRequestMultiplier > 0) ? - (readRequestMultiplier * computeRegionLoadCost(cluster, RegionLoadCostType.READ_REQUEST)) : - 0; - - double writeRequestCost = - (writeRequestMultiplier > 0) ? - (writeRequestMultiplier * computeRegionLoadCost(cluster, RegionLoadCostType.WRITE_REQUEST)) : - 0; - - double total = - moveCost + regionCountSkewCost + tableSkewCost + localityCost + memstoreSizeCost - + storefileSizeCost + readRequestCost + writeRequestCost; - if (LOG.isTraceEnabled()) { - LOG.trace("Computed weights for a potential balancing total = " + total + " moveCost = " - + moveCost + " regionCountSkewCost = " + regionCountSkewCost + " tableSkewCost = " - + tableSkewCost + " localityCost = " + localityCost + " memstoreSizeCost = " - + memstoreSizeCost + " storefileSizeCost = " + storefileSizeCost); + if (total > previousCost) { + return total; + } } return total; } + abstract static class RegionPicker { + abstract Pair, Pair> pick(Cluster cluster); + + /** + * From a list of regions pick a random one. Null can be returned which + * {@link StochasticLoadBalancer#balanceCluster(Map)} recognize as signal to try a region move + * rather than swap. + * + * @param cluster The state of the cluster + * @param server index of the server + * @param chanceOfNoSwap Chance that this will decide to try a move rather + * than a swap. + * @return a random {@link HRegionInfo} or null if an asymmetrical move is + * suggested. + */ + protected int pickRandomRegion(Cluster cluster, int server, double chanceOfNoSwap) { + // Check to see if this is just a move. + if (cluster.regionsPerServer[server].length == 0 || RANDOM.nextFloat() < chanceOfNoSwap) { + // signal a move only. + return -1; + } + int rand = RANDOM.nextInt(cluster.regionsPerServer[server].length); + return cluster.regionsPerServer[server][rand]; + + } + protected int pickRandomServer(Cluster cluster) { + if (cluster.numServers < 1) { + return -1; + } + + return RANDOM.nextInt(cluster.numServers); + } + protected int pickOtherRandomServer(Cluster cluster, int serverIndex) { + if (cluster.numServers < 2) { + return -1; + } + while (true) { + int otherServerIndex = pickRandomServer(cluster); + if (otherServerIndex != serverIndex) { + return otherServerIndex; + } + } + } + + protected Pair pickRandomRegions(Cluster cluster, + int thisServer, + int otherServer) { + if (thisServer < 0 || otherServer < 0) { + return new Pair(-1, -1); + } + + // Decide who is most likely to need another region + int thisRegionCount = cluster.getNumRegions(thisServer); + int otherRegionCount = cluster.getNumRegions(otherServer); + + // Assign the chance based upon the above + double thisChance = (thisRegionCount > otherRegionCount) ? 0 : 0.5; + double otherChance = (thisRegionCount <= otherRegionCount) ? 0 : 0.5; + + int thisRegion = pickRandomRegion(cluster, thisServer, thisChance); + int otherRegion = pickRandomRegion(cluster, otherServer, otherChance); + + return new Pair(thisRegion, otherRegion); + } + } + + static class RandomRegionPicker extends RegionPicker { + + @Override + Pair, Pair> pick(Cluster cluster) { + + int thisServer = pickRandomServer(cluster); + + // Pick the other server + int otherServer = pickOtherRandomServer(cluster, thisServer); + + Pair regions = pickRandomRegions(cluster, thisServer, otherServer); + + return new Pair, Pair>( + new Pair(thisServer, regions.getFirst()), + new Pair(otherServer, regions.getSecond()) + + ); + } + + } + + public static class LoadPicker extends RegionPicker { + + @Override + Pair, Pair> pick(Cluster cluster) { + cluster.sortServersByRegionCount(); + int thisServer = pickMostLoadedServer(cluster, -1); + int otherServer = pickLeastLoadedServer(cluster, thisServer); + + Pair regions = pickRandomRegions(cluster, thisServer, otherServer); + return new Pair, Pair>( + new Pair(thisServer, regions.getFirst()), + new Pair(otherServer, regions.getSecond()) + + ); + } + + private int pickLeastLoadedServer(final Cluster cluster, int thisServer) { + Integer[] servers = cluster.serverIndicesSortedByRegionCount; + + int index = 0; + while (servers[index] == null || servers[index] == thisServer) { + index++; + if (index == servers.length) { + return -1; + } + } + return servers[index]; + } + + private int pickMostLoadedServer(final Cluster cluster, int thisServer) { + Integer[] servers = cluster.serverIndicesSortedByRegionCount; + + int index = servers.length - 1; + while (servers[index] == null || servers[index] == thisServer) { + index--; + if (index < 0) { + return -1; + } + } + return servers[index]; + } + } + + static class LocalityBasedPicker extends RegionPicker { + + private MasterServices masterServices; + + LocalityBasedPicker(MasterServices masterServices) { + this.masterServices = masterServices; + } + + @Override + Pair, Pair> pick(Cluster cluster) { + if (this.masterServices == null) { + return new Pair, Pair>( + new Pair(-1,-1), + new Pair(-1,-1) + ); + } + // Pick a random region server + int thisServer = pickRandomServer(cluster); + + // Pick a random region on this server + int thisRegion = pickRandomRegion(cluster, thisServer, 0.0f); + + if (thisRegion == -1) { + return new Pair, Pair>( + new Pair(-1,-1), + new Pair(-1,-1) + ); + } + + // Pick the server with the highest locality + int otherServer = pickHighestLocalityServer(cluster, thisServer, thisRegion); + + // pick an region on the other server to potentially swap + int otherRegion = this.pickRandomRegion(cluster, otherServer, 0.5f); + + return new Pair, Pair>( + new Pair(thisServer,thisRegion), + new Pair(otherServer,otherRegion) + ); + } + + private int pickHighestLocalityServer(Cluster cluster, int thisServer, int thisRegion) { + int[] regionLocations = cluster.regionLocations[thisRegion]; + + if (regionLocations == null || regionLocations.length <= 1) { + return pickOtherRandomServer(cluster, thisServer); + } + + int idx = 0; + + while (idx < regionLocations.length && regionLocations[idx] == thisServer) { + idx++; + } + + return idx; + } + + void setServices(MasterServices services) { + this.masterServices = services; + } + } + + /** + * Base class of StochasticLoadBalancer's Cost Functions. + */ + public abstract static class CostFunction { + + private float multiplier = 0; + private Configuration conf; + + CostFunction(Configuration c) { + this.conf = c; + } + + float getMultiplier() { + return multiplier; + } + + void setMultiplier(float m) { + this.multiplier = m; + } + + abstract double cost(Cluster cluster); + + /** + * Function to compute a scaled cost using {@link DescriptiveStatistics}. It + * assumes that this is a zero sum set of costs. It assumes that the worst case + * possible is all of the elements in one region server and the rest having 0. + * + * @param stats the costs + * @return a scaled set of costs. + */ + protected double costFromArray(double[] stats) { + double totalCost = 0; + double total = getSum(stats); + double mean = total/((double)stats.length); + double count = stats.length; + + // Compute max as if all region servers had 0 and one had the sum of all costs. This must be + // a zero sum cost for this to make sense. + // TODO: Should we make this sum of square errors? + double max = ((count - 1) * mean) + (total - mean); + for (double n : stats) { + double diff = Math.abs(mean - n); + totalCost += diff; + } + + double scaled = scale(0, max, totalCost); + return scaled; + } + + + + private double getSum(double[] stats) { + double total = 0; + for(double s:stats) { + total += s; + } + return total; + } + + /** + * Scale the value between 0 and 1. + * + * @param min Min value + * @param max The Max value + * @param value The value to be scaled. + * @return The scaled value. + */ + protected double scale(double min, double max, double value) { + if (max == 0 || value == 0) { + return 0; + } + + return Math.max(0d, Math.min(1d, (value - min) / max)); + } + } + /** * Given the starting state of the regions and a potential ending state * compute cost based upon the number of regions that have moved. - * - * @param cluster The state of the cluster - * @return The cost. Between 0 and 1. */ - double computeMoveCost(Cluster cluster) { - double moveCost = cluster.numMovedRegions; + public static class MoveCostFunction extends CostFunction { + private static final String MOVE_COST_KEY = "hbase.master.balancer.stochastic.moveCost"; + private static final String MAX_MOVES_PERCENT_KEY = + "hbase.master.balancer.stochastic.maxMovePercent"; + private static final float DEFAULT_MOVE_COST = 100; + private static final int DEFAULT_MAX_MOVES = 600; + private static final float DEFAULT_MAX_MOVE_PERCENT = 0.25f; + private static final int META_MOVE_COST_MULT = 10; - //Don't let this single balance move more than the max moves. - //This allows better scaling to accurately represent the actual cost of a move. - if (moveCost > maxMoves) { - return Double.MAX_VALUE; //return a number much greater than any of the other cost functions + private final float maxMovesPercent; + + MoveCostFunction(Configuration conf) { + super(conf); + + // Move cost multiplier should be the same cost or higher than the rest of the costs to ensure + // that large benefits are need to overcome the cost of a move. + this.setMultiplier(conf.getFloat(MOVE_COST_KEY, DEFAULT_MOVE_COST)); + // What percent of the number of regions a single run of the balancer can move. + maxMovesPercent = conf.getFloat(MAX_MOVES_PERCENT_KEY, DEFAULT_MAX_MOVE_PERCENT); } - //META region is special - if (cluster.numMovedMetaRegions > 0) { - maxMoves += 9 * cluster.numMovedMetaRegions; //assume each META region move costs 10 times - } + @Override + double cost(Cluster cluster) { + // Try and size the max number of Moves, but always be prepared to move some. + int maxMoves = Math.max((int) (cluster.numRegions * maxMovesPercent), + DEFAULT_MAX_MOVES); - return scale(0, cluster.numRegions, moveCost); + double moveCost = cluster.numMovedRegions; + + // Don't let this single balance move more than the max moves. + // This allows better scaling to accurately represent the actual cost of a move. + if (moveCost > maxMoves) { + return 1000000; // return a number much greater than any of the other cost + } + + // META region is special + if (cluster.numMovedMetaRegions > 0) { + // assume each META region move costs 10 times + moveCost += META_MOVE_COST_MULT * cluster.numMovedMetaRegions; + } + + return scale(0, cluster.numRegions + META_MOVE_COST_MULT, moveCost); + } } /** * Compute the cost of a potential cluster state from skew in number of - * regions on a cluster - * - * @param cluster The state of the cluster - * @return The cost of region load imbalance. + * regions on a cluster. */ - double computeSkewLoadCost(Cluster cluster) { - DescriptiveStatistics stats = new DescriptiveStatistics(); - for (int[] regions : cluster.regionsPerServer) { - stats.addValue(regions.length); + public static class RegionCountSkewCostFunction extends CostFunction { + private static final String REGION_COUNT_SKEW_COST_KEY = + "hbase.master.balancer.stochastic.regionCountCost"; + private static final float DEFAULT_REGION_COUNT_SKEW_COST = 500; + + private double[] stats = null; + + RegionCountSkewCostFunction(Configuration conf) { + super(conf); + // Load multiplier should be the greatest as it is the most general way to balance data. + this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST)); + } + + @Override + double cost(Cluster cluster) { + if (stats == null || stats.length != cluster.numServers) { + stats = new double[cluster.numServers]; + } + + for (int i =0; i < cluster.numServers; i++) { + stats[i] = cluster.regionsPerServer[i].length; + } + return costFromArray(stats); } - return costFromStats(stats); } /** * Compute the cost of a potential cluster configuration based upon how evenly * distributed tables are. - * - * @param cluster The state of the cluster - * @return Cost of imbalance in table. */ - double computeTableSkewLoadCost(Cluster cluster) { - double max = cluster.numRegions; - double min = cluster.numRegions / cluster.numServers; - double value = 0; + public static class TableSkewCostFunction extends CostFunction { - for (int i = 0 ; i < cluster.numMaxRegionsPerTable.length; i++) { - value += cluster.numMaxRegionsPerTable[i]; + private static final String TABLE_SKEW_COST_KEY = + "hbase.master.balancer.stochastic.tableSkewCost"; + private static final float DEFAULT_TABLE_SKEW_COST = 35; + + TableSkewCostFunction(Configuration conf) { + super(conf); + this.setMultiplier(conf.getFloat(TABLE_SKEW_COST_KEY, DEFAULT_TABLE_SKEW_COST)); } - return scale(min, max, value); + @Override + double cost(Cluster cluster) { + double max = cluster.numRegions; + double min = cluster.numRegions / cluster.numServers; + double value = 0; + + for (int i = 0; i < cluster.numMaxRegionsPerTable.length; i++) { + value += cluster.numMaxRegionsPerTable[i]; + } + + return scale(min, max, value); + } } + /** * Compute a cost of a potential cluster configuration based upon where * {@link org.apache.hadoop.hbase.regionserver.StoreFile}s are located. - * - * @param cluster The state of the cluster - * @return A cost between 0 and 1. 0 Means all regions are on the sever with - * the most local store files. */ - double computeDataLocalityCost(Cluster cluster) { + public static class LocalityCostFunction extends CostFunction { - double max = 0; - double cost = 0; + private static final String LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.localityCost"; + private static final float DEFAULT_LOCALITY_COST = 25; - // If there's no master so there's no way anything else works. - if (this.services == null) return cost; + private MasterServices services; - for (int i = 0; i < cluster.regionLocations.length; i++) { - max += 1; - int serverIndex = cluster.regionIndexToServerIndex[i]; - int[] regionLocations = cluster.regionLocations[i]; + LocalityCostFunction(Configuration conf, MasterServices srv) { + super(conf); + this.setMultiplier(conf.getFloat(LOCALITY_COST_KEY, DEFAULT_LOCALITY_COST)); + this.services = srv; + } - // If we can't find where the data is getTopBlock returns null. - // so count that as being the best possible. - if (regionLocations == null) { - continue; + void setServices(MasterServices srvc) { + this.services = srvc; + } + + @Override + double cost(Cluster cluster) { + double max = 0; + double cost = 0; + + // If there's no master so there's no way anything else works. + if (this.services == null) { + return cost; } - int index = -1; - for (int j = 0; j < regionLocations.length; j++) { - if (regionLocations[j] >= 0 && regionLocations[j] == serverIndex) { - index = j; - break; + for (int i = 0; i < cluster.regionLocations.length; i++) { + max += 1; + int serverIndex = cluster.regionIndexToServerIndex[i]; + int[] regionLocations = cluster.regionLocations[i]; + + // If we can't find where the data is getTopBlock returns null. + // so count that as being the best possible. + if (regionLocations == null) { + continue; + } + + int index = -1; + for (int j = 0; j < regionLocations.length; j++) { + if (regionLocations[j] >= 0 && regionLocations[j] == serverIndex) { + index = j; + break; + } + } + + if (index < 0) { + cost += 1; + } else { + cost += (double) index / (double) regionLocations.length; + } + } + return scale(0, max, cost); + } + } + + /** + * Base class the allows writing costs functions from rolling average of some + * number from RegionLoad. + */ + public abstract static class CostFromRegionLoadFunction extends CostFunction { + + private ClusterStatus clusterStatus = null; + private Map> loads = null; + private double[] stats = null; + CostFromRegionLoadFunction(Configuration conf) { + super(conf); + } + + void setClusterStatus(ClusterStatus status) { + this.clusterStatus = status; + } + + void setLoads(Map> l) { + this.loads = l; + } + + + double cost(Cluster cluster) { + if (clusterStatus == null || loads == null) { + return 0; + } + + if (stats == null || stats.length != cluster.numServers) { + stats = new double[cluster.numServers]; + } + + for (int i =0; i < stats.length; i++) { + //Cost this server has from RegionLoad + long cost = 0; + + // for every region on this server get the rl + for(int regionIndex:cluster.regionsPerServer[i]) { + List regionLoadList = cluster.regionLoads[regionIndex]; + + // Now if we found a region load get the type of cost that was requested. + if (regionLoadList != null) { + cost += getRegionLoadCost(regionLoadList); + } + } + + // Add the total cost to the stats. + stats[i] = cost; + } + + // Now return the scaled cost from data held in the stats object. + return costFromArray(stats); + } + + protected double getRegionLoadCost(List regionLoadList) { + double cost = 0; + + for (RegionLoad rl : regionLoadList) { + double toAdd = getCostFromRl(rl); + + if (cost == 0) { + cost = toAdd; + } else { + cost = (.5 * cost) + (.5 * toAdd); } } - if (index < 0) { - cost += 1; - } else { - cost += (double) index / (double) regionLocations.length; - } + return cost; } - return scale(0, max, cost); - } - /** The cost's that can be derived from RegionLoad */ - private enum RegionLoadCostType { - READ_REQUEST, WRITE_REQUEST, MEMSTORE_SIZE, STOREFILE_SIZE + protected abstract double getCostFromRl(RegionLoad rl); } /** - * Compute the cost of the current cluster state due to some RegionLoadCost type - * - * @param cluster The state of the cluster - * @param costType what type of cost to consider - * @return the scaled cost. + * Compute the cost of total number of read requests The more unbalanced the higher the + * computed cost will be. This uses a rolling average of regionload. */ - private double computeRegionLoadCost(Cluster cluster, RegionLoadCostType costType) { - if (this.clusterStatus == null || this.loads == null || this.loads.size() == 0) return 0; + public static class ReadRequestCostFunction extends CostFromRegionLoadFunction { - DescriptiveStatistics stats = new DescriptiveStatistics(); + private static final String READ_REQUEST_COST_KEY = + "hbase.master.balancer.stochastic.readRequestCost"; + private static final float DEFAULT_READ_REQUEST_COST = 5; - for (List rl : cluster.regionLoads) { - long cost = 0; //Cost this server has from RegionLoad - // Now if we found a region load get the type of cost that was requested. - if (rl != null) { - cost += getRegionLoadCost(rl, costType); - } - - // Add the total cost to the stats. - stats.addValue(cost); + ReadRequestCostFunction(Configuration conf) { + super(conf); + this.setMultiplier(conf.getFloat(READ_REQUEST_COST_KEY, DEFAULT_READ_REQUEST_COST)); } - // No return the scaled cost from data held in the stats object. - return costFromStats(stats); + + protected double getCostFromRl(RegionLoad rl) { + return rl.getReadRequestsCount(); + } } /** - * Get the un-scaled cost from a RegionLoad - * - * @param regionLoadList the Region load List - * @param type The type of cost to extract - * @return the double representing the cost + * Compute the cost of total number of write requests. The more unbalanced the higher the + * computed cost will be. This uses a rolling average of regionload. */ - private double getRegionLoadCost(List regionLoadList, RegionLoadCostType type) { - double cost = 0; + public static class WriteRequestCostFunction extends CostFromRegionLoadFunction { - int size = regionLoadList.size(); - for(int i =0; i< size; i++) { - RegionLoad rl = regionLoadList.get(i); - double toAdd = 0; - switch (type) { - case READ_REQUEST: - toAdd = rl.getReadRequestsCount(); - break; - case WRITE_REQUEST: - toAdd = rl.getWriteRequestsCount(); - break; - case MEMSTORE_SIZE: - toAdd = rl.getMemStoreSizeMB(); - break; - case STOREFILE_SIZE: - toAdd = rl.getStorefileSizeMB(); - break; - default: - assert false : "RegionLoad cost type not supported."; - return 0; - } + private static final String WRITE_REQUEST_COST_KEY = + "hbase.master.balancer.stochastic.writeRequestCost"; + private static final float DEFAULT_WRITE_REQUEST_COST = 5; - if (cost == 0) { - cost = toAdd; - } else { - cost = (.5 * cost) + (.5 * toAdd); - } + WriteRequestCostFunction(Configuration conf) { + super(conf); + this.setMultiplier(conf.getFloat(WRITE_REQUEST_COST_KEY, DEFAULT_WRITE_REQUEST_COST)); } - return cost; - + protected double getCostFromRl(RegionLoad rl) { + return rl.getWriteRequestsCount(); + } } /** - * Function to compute a scaled cost using {@link DescriptiveStatistics}. It - * assumes that this is a zero sum set of costs. It assumes that the worst case - * possible is all of the elements in one region server and the rest having 0. - * - * @param stats the costs - * @return a scaled set of costs. + * Compute the cost of total memstore size. The more unbalanced the higher the + * computed cost will be. This uses a rolling average of regionload. */ - double costFromStats(DescriptiveStatistics stats) { - double totalCost = 0; - double mean = stats.getMean(); + public static class MemstoreSizeCostFunction extends CostFromRegionLoadFunction { - //Compute max as if all region servers had 0 and one had the sum of all costs. This must be - // a zero sum cost for this to make sense. - //TODO: Should we make this sum of square errors? - double max = ((stats.getN() - 1) * mean) + (stats.getSum() - mean); - for (double n : stats.getValues()) { - double diff = Math.abs(mean - n); - totalCost += diff; + private static final String MEMSTORE_SIZE_COST_KEY = + "hbase.master.balancer.stochastic.memstoreSizeCost"; + private static final float DEFAULT_MEMSTORE_SIZE_COST = 5; + + MemstoreSizeCostFunction(Configuration conf) { + super(conf); + this.setMultiplier(conf.getFloat(MEMSTORE_SIZE_COST_KEY, DEFAULT_MEMSTORE_SIZE_COST)); } - return scale(0, max, totalCost); + @Override + protected double getCostFromRl(RegionLoad rl) { + return rl.getMemStoreSizeMB(); + } } - /** - * Scale the value between 0 and 1. - * - * @param min Min value - * @param max The Max value - * @param value The value to be scaled. - * @return The scaled value. + * Compute the cost of total open storefiles size. The more unbalanced the higher the + * computed cost will be. This uses a rolling average of regionload. */ - private double scale(double min, double max, double value) { - if (max == 0 || value == 0) { - return 0; + public static class StoreFileCostFunction extends CostFromRegionLoadFunction { + + private static final String STOREFILE_SIZE_COST_KEY = + "hbase.master.balancer.stochastic.storefileSizeCost"; + private static final float DEFAULT_STOREFILE_SIZE_COST = 5; + + StoreFileCostFunction(Configuration conf) { + super(conf); + this.setMultiplier(conf.getFloat(STOREFILE_SIZE_COST_KEY, DEFAULT_STOREFILE_SIZE_COST)); } - return Math.max(0d, Math.min(1d, (value - min) / max)); + @Override + protected double getCostFromRl(RegionLoad rl) { + return rl.getStorefileSizeMB(); + } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java index 9b6559c070d..7a7b51283e2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java @@ -43,7 +43,7 @@ import org.apache.hadoop.hbase.util.Bytes; */ public class BalancerTestBase { - private static Random rand = new Random(); + protected static Random rand = new Random(); static int regionId = 0; /** @@ -125,7 +125,9 @@ public class BalancerTestBase { * @param plans * @return */ - protected List reconcile(List list, List plans) { + protected List reconcile(List list, + List plans, + Map> servers) { List result = new ArrayList(list.size()); if (plans == null) return result; Map map = new HashMap(list.size()); @@ -134,9 +136,13 @@ public class BalancerTestBase { } for (RegionPlan plan : plans) { ServerName source = plan.getSource(); + updateLoad(map, source, -1); ServerName destination = plan.getDestination(); updateLoad(map, destination, +1); + + servers.get(source).remove(plan.getRegionInfo()); + servers.get(destination).add(plan.getRegionInfo()); } result.clear(); result.addAll(map.values()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java index 46a390b5ea3..8c00abd6ba3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java @@ -116,7 +116,7 @@ public class TestDefaultLoadBalancer extends BalancerTestBase { List list = convertToList(servers); LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list)); List plans = loadBalancer.balanceCluster(servers); - List balancedCluster = reconcile(list, plans); + List balancedCluster = reconcile(list, plans, servers); LOG.info("Mock Balance : " + printMock(balancedCluster)); assertClusterAsBalanced(balancedCluster); for (Map.Entry> entry : servers.entrySet()) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java index 78e313712bc..74439b0ffd9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java @@ -19,10 +19,14 @@ package org.apache.hadoop.hbase.master.balancer; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -34,7 +38,6 @@ import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.master.RegionPlan; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -46,6 +49,7 @@ public class TestStochasticLoadBalancer extends BalancerTestBase { @BeforeClass public static void beforeAllTests() throws Exception { Configuration conf = HBaseConfiguration.create(); + conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 0.75f); loadBalancer = new StochasticLoadBalancer(); loadBalancer.setConf(conf); } @@ -101,6 +105,10 @@ public class TestStochasticLoadBalancer extends BalancerTestBase { new int[]{1, 1, 1, 1, 1, 1, 1, 1, 1, 10}, new int[]{1, 1, 1, 1, 1, 1, 1, 1, 1, 123}, new int[]{1, 1, 1, 1, 1, 1, 1, 1, 1, 155}, + new int[]{10, 7, 12, 8, 11, 10, 9, 14}, + new int[]{13, 14, 6, 10, 10, 10, 8, 10}, + new int[]{130, 14, 60, 10, 100, 10, 80, 10}, + new int[]{130, 140, 60, 100, 100, 100, 80, 100} }; /** @@ -119,9 +127,11 @@ public class TestStochasticLoadBalancer extends BalancerTestBase { List list = convertToList(servers); LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list)); List plans = loadBalancer.balanceCluster(servers); - List balancedCluster = reconcile(list, plans); + List balancedCluster = reconcile(list, plans, servers); LOG.info("Mock Balance : " + printMock(balancedCluster)); assertClusterAsBalanced(balancedCluster); + List secondPlans = loadBalancer.balanceCluster(servers); + assertNull(secondPlans); for (Map.Entry> entry : servers.entrySet()) { returnRegions(entry.getValue()); returnServer(entry.getKey()); @@ -132,56 +142,96 @@ public class TestStochasticLoadBalancer extends BalancerTestBase { @Test public void testSkewCost() { + Configuration conf = HBaseConfiguration.create(); + StochasticLoadBalancer.CostFunction + costFunction = new StochasticLoadBalancer.RegionCountSkewCostFunction(conf); for (int[] mockCluster : clusterStateMocks) { - double cost = loadBalancer.computeSkewLoadCost(mockCluster(mockCluster)); + double cost = costFunction.cost(mockCluster(mockCluster)); assertTrue(cost >= 0); assertTrue(cost <= 1.01); } assertEquals(1, - loadBalancer.computeSkewLoadCost(mockCluster(new int[] { 0, 0, 0, 0, 1 })), 0.01); + costFunction.cost(mockCluster(new int[]{0, 0, 0, 0, 1})), 0.01); assertEquals(.75, - loadBalancer.computeSkewLoadCost(mockCluster(new int[] { 0, 0, 0, 1, 1 })), 0.01); + costFunction.cost(mockCluster(new int[]{0, 0, 0, 1, 1})), 0.01); assertEquals(.5, - loadBalancer.computeSkewLoadCost(mockCluster(new int[] { 0, 0, 1, 1, 1 })), 0.01); + costFunction.cost(mockCluster(new int[]{0, 0, 1, 1, 1})), 0.01); assertEquals(.25, - loadBalancer.computeSkewLoadCost(mockCluster(new int[] { 0, 1, 1, 1, 1 })), 0.01); + costFunction.cost(mockCluster(new int[]{0, 1, 1, 1, 1})), 0.01); assertEquals(0, - loadBalancer.computeSkewLoadCost(mockCluster(new int[] { 1, 1, 1, 1, 1 })), 0.01); + costFunction.cost(mockCluster(new int[]{1, 1, 1, 1, 1})), 0.01); assertEquals(0, - loadBalancer.computeSkewLoadCost(mockCluster(new int[] { 10, 10, 10, 10, 10 })), 0.01); + costFunction.cost(mockCluster(new int[]{10, 10, 10, 10, 10})), 0.01); } @Test public void testTableSkewCost() { + Configuration conf = HBaseConfiguration.create(); + StochasticLoadBalancer.CostFunction + costFunction = new StochasticLoadBalancer.TableSkewCostFunction(conf); for (int[] mockCluster : clusterStateMocks) { BaseLoadBalancer.Cluster cluster = mockCluster(mockCluster); - double cost = loadBalancer.computeTableSkewLoadCost(cluster); + double cost = costFunction.cost(cluster); assertTrue(cost >= 0); assertTrue(cost <= 1.01); } } @Test - public void testCostFromStats() { - DescriptiveStatistics statOne = new DescriptiveStatistics(); - for (int i =0; i < 100; i++) { - statOne.addValue(10); - } - assertEquals(0, loadBalancer.costFromStats(statOne), 0.01); + public void testCostFromArray() { + Configuration conf = HBaseConfiguration.create(); + StochasticLoadBalancer.CostFromRegionLoadFunction + costFunction = new StochasticLoadBalancer.MemstoreSizeCostFunction(conf); - DescriptiveStatistics statTwo = new DescriptiveStatistics(); + double[] statOne = new double[100]; for (int i =0; i < 100; i++) { - statTwo.addValue(0); + statOne[i] = 10; } - statTwo.addValue(100); - assertEquals(1, loadBalancer.costFromStats(statTwo), 0.01); + assertEquals(0, costFunction.costFromArray(statOne), 0.01); - DescriptiveStatistics statThree = new DescriptiveStatistics(); + double[] statTwo= new double[101]; for (int i =0; i < 100; i++) { - statThree.addValue(0); - statThree.addValue(100); + statTwo[i] = 0; } - assertEquals(0.5, loadBalancer.costFromStats(statThree), 0.01); + statTwo[100] = 100; + assertEquals(1, costFunction.costFromArray(statTwo), 0.01); + + double[] statThree = new double[200]; + for (int i =0; i < 100; i++) { + statThree[i] = (0); + statThree[i+100] = 100; + } + assertEquals(0.5, costFunction.costFromArray(statThree), 0.01); + } + + @Test(timeout = 30000) + public void testLosingRs() throws Exception { + int numNodes = 3; + int numRegions = 20; + int numRegionsPerServer = 3; //all servers except one + int numTables = 2; + + Map> serverMap = + createServerMap(numNodes, numRegions, numRegionsPerServer, numTables); + List list = convertToList(serverMap); + + + List plans = loadBalancer.balanceCluster(serverMap); + assertNotNull(plans); + + // Apply the plan to the mock cluster. + List balancedCluster = reconcile(list, plans, serverMap); + + assertClusterAsBalanced(balancedCluster); + + ServerName sn = serverMap.keySet().toArray(new ServerName[serverMap.size()])[0]; + + ServerName deadSn = new ServerName(sn.getHostname(), sn.getPort(), sn.getStartcode() -100); + + serverMap.put(deadSn, new ArrayList(0)); + + plans = loadBalancer.balanceCluster(serverMap); + assertNull(plans); } @Test (timeout = 20000) @@ -190,7 +240,7 @@ public class TestStochasticLoadBalancer extends BalancerTestBase { int numRegions = 1000; int numRegionsPerServer = 40; //all servers except one int numTables = 10; - testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables); + testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables, true); } @Test (timeout = 20000) @@ -199,45 +249,92 @@ public class TestStochasticLoadBalancer extends BalancerTestBase { int numRegions = 2000; int numRegionsPerServer = 40; //all servers except one int numTables = 10; - testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables); + testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables, true); } - @Test (timeout = 40000) + @Test (timeout = 20000) + public void testSmallCluster3() { + int numNodes = 20; + int numRegions = 2000; + int numRegionsPerServer = 1; // all servers except one + int numTables = 10; + testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables, false /* max moves */); + } + + @Test (timeout = 800000) public void testMidCluster() { int numNodes = 100; int numRegions = 10000; - int numRegionsPerServer = 60; //all servers except one + int numRegionsPerServer = 60; // all servers except one int numTables = 40; - testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables); + testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables, true); } - @Test (timeout = 1200000) + @Test (timeout = 800000) public void testMidCluster2() { int numNodes = 200; int numRegions = 100000; - int numRegionsPerServer = 40; //all servers except one + int numRegionsPerServer = 40; // all servers except one int numTables = 400; - testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables); + testWithCluster(numNodes, + numRegions, + numRegionsPerServer, + numTables, + false /* num large num regions means may not always get to best balance with one run */); + } + + + @Test (timeout = 800000) + public void testMidCluster3() { + int numNodes = 100; + int numRegions = 2000; + int numRegionsPerServer = 9; // all servers except one + int numTables = 110; + testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables, true); + // TODO(eclark): Make sure that the tables are well distributed. } @Test - @Ignore - //TODO: This still does not finish, making the LoadBalancer unusable at this scale. We should solve this. - //There are two reasons so far; - // - It takes too long for iterating for all servers - // - Moving one region out of the loaded server only costs a slight decrease in the cost of regionCountSkewCost - // but also a slight increase on the moveCost. loadMultiplier / moveCostMultiplier is not high enough to bring down - // the total cost, so that the eager selection cannot continue. This can be solved by smt like - // http://en.wikipedia.org/wiki/Simulated_annealing instead of random walk with eager selection public void testLargeCluster() { int numNodes = 1000; int numRegions = 100000; //100 regions per RS int numRegionsPerServer = 80; //all servers except one int numTables = 100; - testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables); + testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables, true); } - protected void testWithCluster(int numNodes, int numRegions, int numRegionsPerServer, int numTables) { + protected void testWithCluster(int numNodes, + int numRegions, + int numRegionsPerServer, + int numTables, + boolean assertFullyBalanced) { + Map> serverMap = + createServerMap(numNodes, numRegions, numRegionsPerServer, numTables); + + List list = convertToList(serverMap); + LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list)); + + // Run the balancer. + List plans = loadBalancer.balanceCluster(serverMap); + assertNotNull(plans); + + // Check to see that this actually got to a stable place. + if (assertFullyBalanced) { + // Apply the plan to the mock cluster. + List balancedCluster = reconcile(list, plans, serverMap); + + // Print out the cluster loads to make debugging easier. + LOG.info("Mock Balance : " + printMock(balancedCluster)); + assertClusterAsBalanced(balancedCluster); + List secondPlans = loadBalancer.balanceCluster(serverMap); + assertNull(secondPlans); + } + } + + private Map> createServerMap(int numNodes, + int numRegions, + int numRegionsPerServer, + int numTables) { //construct a cluster of numNodes, having a total of numRegions. Each RS will hold //numRegionsPerServer many regions except for the last one, which will host all the //remaining regions @@ -246,8 +343,6 @@ public class TestStochasticLoadBalancer extends BalancerTestBase { cluster[i] = numRegionsPerServer; } cluster[cluster.length - 1] = numRegions - ((cluster.length - 1) * numRegionsPerServer); - - assertNotNull(loadBalancer.balanceCluster(mockClusterServers(cluster, numTables))); + return mockClusterServers(cluster, numTables); } - }