From e21a1ee4ae26f273e6730d009280b2e285b0bab3 Mon Sep 17 00:00:00 2001 From: Enis Soztutar Date: Thu, 11 Apr 2013 21:59:31 +0000 Subject: [PATCH] HBASE-8119 Optimize StochasticLoadBalancer git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1467109 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/hadoop/hbase/HRegionInfo.java | 33 +- .../apache/hadoop/hbase/master/HMaster.java | 8 +- .../master/balancer/BaseLoadBalancer.java | 211 ++++++++++ .../balancer/StochasticLoadBalancer.java | 394 +++++++----------- .../master/balancer/BalancerTestBase.java | 18 +- .../balancer/TestStochasticLoadBalancer.java | 86 +++- 6 files changed, 469 insertions(+), 281 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java index 84d4be25af8..71b3448240d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java @@ -18,8 +18,17 @@ */ package org.apache.hadoop.hbase; -import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; +import java.io.ByteArrayInputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.EOFException; +import java.io.IOException; +import java.io.SequenceInputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -37,16 +46,8 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.PairOfSameType; import org.apache.hadoop.io.DataInputBuffer; -import java.io.ByteArrayInputStream; -import java.io.DataInput; -import java.io.DataInputStream; -import java.io.DataOutput; -import java.io.EOFException; -import java.io.IOException; -import java.io.SequenceInputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; /** * HRegion information. @@ -182,6 +183,7 @@ public class HRegionInfo implements Comparable { // Current TableName private byte[] tableName = null; + private String tableNameAsString = null; /** HRegionInfo for root region */ public static final HRegionInfo ROOT_REGIONINFO = @@ -532,7 +534,10 @@ public class HRegionInfo implements Comparable { * @return string representation of current table */ public String getTableNameAsString() { - return Bytes.toString(tableName); + if (tableNameAsString == null) { + tableNameAsString = Bytes.toString(tableName); + } + return tableNameAsString; } /** @@ -684,7 +689,7 @@ public class HRegionInfo implements Comparable { } /** - * @deprecated Use protobuf deserialization instead. + * @deprecated Use protobuf deserialization instead. * @see #parseFrom(byte[]) */ @Deprecated diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 865d6b0fde5..ddc2a7b3383 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -1241,10 +1241,10 @@ Server { int balancerCutoffTime = getConfiguration().getInt("hbase.balancer.max.balancing", -1); if (balancerCutoffTime == -1) { - // No time period set so create one -- do half of balancer period. + // No time period set so create one int balancerPeriod = getConfiguration().getInt("hbase.balancer.period", 300000); - balancerCutoffTime = balancerPeriod / 2; + balancerCutoffTime = balancerPeriod; // If nonsense period, set it to balancerPeriod if (balancerCutoffTime <= 0) balancerCutoffTime = balancerPeriod; } @@ -1261,7 +1261,6 @@ Server { if (!this.loadBalancerTracker.isBalancerOn()) return false; // Do this call outside of synchronized block. int maximumBalanceTime = getBalancerCutoffTime(); - long cutoffTime = System.currentTimeMillis() + maximumBalanceTime; boolean balancerRan; synchronized (this.balancer) { // Only allow one balance run at at time. @@ -1296,6 +1295,7 @@ Server { List partialPlans = this.balancer.balanceCluster(assignments); if (partialPlans != null) plans.addAll(partialPlans); } + long cutoffTime = System.currentTimeMillis() + maximumBalanceTime; int rpCount = 0; // number of RegionPlans balanced so far long totalRegPlanExecTime = 0; balancerRan = plans != null; @@ -1303,12 +1303,14 @@ Server { for (RegionPlan plan: plans) { LOG.info("balance " + plan); long balStartTime = System.currentTimeMillis(); + //TODO: bulk assign this.assignmentManager.balance(plan); totalRegPlanExecTime += System.currentTimeMillis()-balStartTime; rpCount++; if (rpCount < plans.size() && // if performing next balance exceeds cutoff time, exit the loop (System.currentTimeMillis() + (totalRegPlanExecTime / rpCount)) > cutoffTime) { + //TODO: After balance, there should not be a cutoff time (keeping it as a security net for now) LOG.debug("No more balancing till next balance run; maximumBalanceTime=" + maximumBalanceTime); break; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java index e40322fb90f..50c774f0cac 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java @@ -18,8 +18,10 @@ package org.apache.hadoop.hbase.master.balancer; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Random; import java.util.Set; import java.util.TreeMap; @@ -29,10 +31,12 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ClusterStatus; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.RegionLoad; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.master.LoadBalancer; import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.util.Bytes; import com.google.common.base.Joiner; import com.google.common.collect.ArrayListMultimap; @@ -46,6 +50,213 @@ import com.google.common.collect.Sets; */ public abstract class BaseLoadBalancer implements LoadBalancer { + /** + * An efficient array based implementation similar to ClusterState for keeping + * the status of the cluster in terms of region assignment and distribution. + * To be used by LoadBalancers. + */ + protected static class Cluster { + ServerName[] servers; + ArrayList tables; + HRegionInfo[] regions; + List[] regionLoads; + int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality + + int[][] regionsPerServer; //serverIndex -> region list + int[] regionIndexToServerIndex; //regionIndex -> serverIndex + int[] initialRegionIndexToServerIndex; //regionIndex -> serverIndex (initial cluster state) + int[] regionIndexToTableIndex; //regionIndex -> tableIndex + int[][] numRegionsPerServerPerTable; //serverIndex -> tableIndex -> # regions + int[] numMaxRegionsPerTable; //tableIndex -> max number of regions in a single RS + + Map serversToIndex; + Map tablesToIndex; + + int numRegions; + int numServers; + int numTables; + + int numMovedRegions = 0; //num moved regions from the initial configuration + int numMovedMetaRegions = 0; //num of moved regions that are META + + protected Cluster(Map> clusterState, Map> loads, + RegionLocationFinder regionFinder) { + serversToIndex = new HashMap(clusterState.size()); + tablesToIndex = new HashMap(); + //regionsToIndex = new HashMap(); + + //TODO: We should get the list of tables from master + tables = new ArrayList(); + + numServers = clusterState.size(); + numRegions = 0; + + for (Entry> entry : clusterState.entrySet()) { + numRegions += entry.getValue().size(); + } + + regionsPerServer = new int[clusterState.size()][]; + servers = new ServerName[numServers]; + regions = new HRegionInfo[numRegions]; + regionIndexToServerIndex = new int[numRegions]; + initialRegionIndexToServerIndex = new int[numRegions]; + regionIndexToTableIndex = new int[numRegions]; + regionLoads = new List[numRegions]; + regionLocations = new int[numRegions][]; + + int tableIndex = 0, serverIndex = 0, regionIndex = 0, regionPerServerIndex = 0; + for (Entry> entry : clusterState.entrySet()) { + servers[serverIndex] = entry.getKey(); + regionsPerServer[serverIndex] = new int[entry.getValue().size()]; + serversToIndex.put(servers[serverIndex], Integer.valueOf(serverIndex)); + regionPerServerIndex = 0; + for (HRegionInfo region : entry.getValue()) { + byte[] tableName = region.getTableName(); + int tableHash = Bytes.mapKey(tableName); + Integer idx = tablesToIndex.get(tableHash); + if (idx == null) { + tables.add(tableName); + idx = tableIndex; + tablesToIndex.put(tableHash, tableIndex++); + } + + regions[regionIndex] = region; + regionIndexToServerIndex[regionIndex] = serverIndex; + initialRegionIndexToServerIndex[regionIndex] = serverIndex; + regionIndexToTableIndex[regionIndex] = idx; + regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex; + + //region load + if (loads != null) { + List rl = loads.get(region.getRegionNameAsString()); + // That could have failed if the RegionLoad is using the other regionName + if (rl == null) { + // Try getting the region load using encoded name. + rl = loads.get(region.getEncodedName()); + } + regionLoads[regionIndex] = rl; + } + + if (regionFinder != null) { + //region location + List loc = regionFinder.getTopBlockLocations(region); + regionLocations[regionIndex] = new int[loc.size()]; + for (int i=0; i < loc.size(); i++) { + regionLocations[regionIndex][i] = serversToIndex.get(loc.get(i)); + } + } + + regionIndex++; + } + serverIndex++; + } + + numTables = tables.size(); + numRegionsPerServerPerTable = new int[numServers][numTables]; + + for (int i = 0; i < numServers; i++) { + for (int j = 0; j < numTables; j++) { + numRegionsPerServerPerTable[i][j] = 0; + } + } + + for (int i=0; i < regionIndexToServerIndex.length; i++) { + numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++; + } + + numMaxRegionsPerTable = new int[numTables]; + for (serverIndex = 0 ; serverIndex < numRegionsPerServerPerTable.length; serverIndex++) { + for (tableIndex = 0 ; tableIndex < numRegionsPerServerPerTable[serverIndex].length; tableIndex++) { + if (numRegionsPerServerPerTable[serverIndex][tableIndex] > numMaxRegionsPerTable[tableIndex]) { + numMaxRegionsPerTable[tableIndex] = numRegionsPerServerPerTable[serverIndex][tableIndex]; + } + } + } + } + + public void moveOrSwapRegion(int lServer, int rServer, int lRegion, int rRegion) { + //swap + if (rRegion >= 0 && lRegion >= 0) { + regionMoved(rRegion, rServer, lServer); + regionsPerServer[rServer] = replaceRegion(regionsPerServer[rServer], rRegion, lRegion); + regionMoved(lRegion, lServer, rServer); + regionsPerServer[lServer] = replaceRegion(regionsPerServer[lServer], lRegion, rRegion); + } else if (rRegion >= 0) { //move rRegion + regionMoved(rRegion, rServer, lServer); + regionsPerServer[rServer] = removeRegion(regionsPerServer[rServer], rRegion); + regionsPerServer[lServer] = addRegion(regionsPerServer[lServer], rRegion); + } else if (lRegion >= 0) { //move lRegion + regionMoved(lRegion, lServer, rServer); + regionsPerServer[lServer] = removeRegion(regionsPerServer[lServer], lRegion); + regionsPerServer[rServer] = addRegion(regionsPerServer[rServer], lRegion); + } + } + + /** Region moved out of the server */ + void regionMoved(int regionIndex, int oldServerIndex, int newServerIndex) { + regionIndexToServerIndex[regionIndex] = newServerIndex; + if (initialRegionIndexToServerIndex[regionIndex] == newServerIndex) { + numMovedRegions--; //region moved back to original location + if (regions[regionIndex].isMetaRegion()) { + numMovedMetaRegions--; + } + } else if (initialRegionIndexToServerIndex[regionIndex] == oldServerIndex) { + numMovedRegions++; //region moved from original location + if (regions[regionIndex].isMetaRegion()) { + numMovedMetaRegions++; + } + } + int tableIndex = regionIndexToTableIndex[regionIndex]; + numRegionsPerServerPerTable[oldServerIndex][tableIndex]--; + numRegionsPerServerPerTable[newServerIndex][tableIndex]++; + + //check whether this caused maxRegionsPerTable in the new Server to be updated + if (numRegionsPerServerPerTable[newServerIndex][tableIndex] > numMaxRegionsPerTable[tableIndex]) { + numRegionsPerServerPerTable[newServerIndex][tableIndex] = numMaxRegionsPerTable[tableIndex]; + } else if ((numRegionsPerServerPerTable[oldServerIndex][tableIndex] + 1) + == numMaxRegionsPerTable[tableIndex]) { + //recompute maxRegionsPerTable since the previous value was coming from the old server + for (int serverIndex = 0 ; serverIndex < numRegionsPerServerPerTable.length; serverIndex++) { + if (numRegionsPerServerPerTable[serverIndex][tableIndex] > numMaxRegionsPerTable[tableIndex]) { + numMaxRegionsPerTable[tableIndex] = numRegionsPerServerPerTable[serverIndex][tableIndex]; + } + } + } + } + + int[] removeRegion(int[] regions, int regionIndex) { + //TODO: this maybe costly. Consider using linked lists + int[] newRegions = new int[regions.length - 1]; + int i = 0; + for (i = 0; i < regions.length; i++) { + if (regions[i] == regionIndex) { + break; + } + newRegions[i] = regions[i]; + } + System.arraycopy(regions, i+1, newRegions, i, newRegions.length - i); + return newRegions; + } + + int[] addRegion(int[] regions, int regionIndex) { + int[] newRegions = new int[regions.length + 1]; + System.arraycopy(regions, 0, newRegions, 0, regions.length); + newRegions[newRegions.length - 1] = regionIndex; + return newRegions; + } + + int[] replaceRegion(int[] regions, int regionIndex, int newRegionIndex) { + int i = 0; + for (i = 0; i < regions.length; i++) { + if (regions[i] == regionIndex) { + regions[i] = newRegionIndex; + break; + } + } + return regions; + } + } + // slop for regions private float slop; private Configuration config; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java index 4393953abbe..d6e79cc7070 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java @@ -17,21 +17,6 @@ */ package org.apache.hadoop.hbase.master.balancer; -import org.apache.commons.lang.mutable.MutableInt; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.commons.math.stat.descriptive.DescriptiveStatistics; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.ClusterStatus; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.ServerLoad; -import org.apache.hadoop.hbase.RegionLoad; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.master.MasterServices; -import org.apache.hadoop.hbase.master.RegionPlan; -import org.apache.hadoop.hbase.util.Bytes; - import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedList; @@ -40,6 +25,21 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Random; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.math.stat.descriptive.DescriptiveStatistics; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ClusterStatus; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.RegionLoad; +import org.apache.hadoop.hbase.ServerLoad; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.RegionPlan; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + /** *

This is a best effort load balancer. Given a Cost function F(C) => x It will * randomly try and mutate the cluster to Cprime. If F(Cprime) < F(C) then the @@ -104,6 +104,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { "hbase.master.balancer.stochastic.stepsPerRegion"; private static final String MAX_STEPS_KEY = "hbase.master.balancer.stochastic.maxSteps"; private static final String MAX_MOVES_KEY = "hbase.master.balancer.stochastic.maxMoveRegions"; + private static final String MAX_RUNNING_TIME_KEY = "hbase.master.balancer.stochastic.maxRunningTime"; private static final String KEEP_REGION_LOADS = "hbase.master.balancer.stochastic.numRegionLoadsToRemember"; private static final Random RANDOM = new Random(System.currentTimeMillis()); @@ -115,10 +116,11 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { // values are defaults private int maxSteps = 15000; private int stepsPerRegion = 110; + private long maxRunningTime = 1 * 60 * 1000; //5 min private int maxMoves = 600; private int numRegionLoadsToRemember = 15; - private float loadMultiplier = 55; - private float moveCostMultiplier = 5; + private float loadMultiplier = 100; + private float moveCostMultiplier = 1; private float tableMultiplier = 5; private float localityMultiplier = 5; private float readRequestMultiplier = 0; @@ -135,6 +137,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps); maxMoves = conf.getInt(MAX_MOVES_KEY, maxMoves); stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion); + maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, maxRunningTime); numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember); @@ -183,86 +186,75 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { return null; } - long startTime = System.currentTimeMillis(); + long startTime = EnvironmentEdgeManager.currentTimeMillis(); // Keep track of servers to iterate through them. - List servers = new ArrayList(clusterState.keySet()); - Map initialRegionMapping = createRegionMapping(clusterState); double currentCost, newCost, initCost; - currentCost = newCost = initCost = computeCost(initialRegionMapping, clusterState); + + Cluster cluster = new Cluster(clusterState, loads, regionFinder); + currentCost = newCost = initCost = computeCost(cluster); int computedMaxSteps = - Math.min(this.maxSteps, (initialRegionMapping.size() * this.stepsPerRegion)); + Math.min(this.maxSteps, (cluster.numRegions * this.stepsPerRegion)); // Perform a stochastic walk to see if we can get a good fit. - for (int step = 0; step < computedMaxSteps; step++) { + int step; + for (step = 0; step < computedMaxSteps; step++) { // try and perform a mutation - for (ServerName leftServer : servers) { + for (int leftServer = 0; leftServer < cluster.numServers; leftServer++) { // What server are we going to be swapping regions with ? - ServerName rightServer = pickOtherServer(leftServer, servers); - if (rightServer == null) { + int rightServer = pickOtherServer(leftServer, cluster); + if (rightServer < 0) { continue; } - // Get the regions. - List leftRegionList = clusterState.get(leftServer); - List rightRegionList = clusterState.get(rightServer); - // Pick what regions to swap around. // If we get a null for one then this isn't a swap just a move - HRegionInfo lRegion = pickRandomRegion(leftRegionList, 0); - HRegionInfo rRegion = pickRandomRegion(rightRegionList, 0.5); + int lRegion = pickRandomRegion(cluster, leftServer, 0); + int rRegion = pickRandomRegion(cluster, rightServer, 0.5); // We randomly picked to do nothing. - if (lRegion == null && rRegion == null) { + if (lRegion < 0 && rRegion < 0) { continue; } - if (rRegion != null) { - leftRegionList.add(rRegion); - } - - if (lRegion != null) { - rightRegionList.add(lRegion); - } - - newCost = computeCost(initialRegionMapping, clusterState); + cluster.moveOrSwapRegion(leftServer, rightServer, lRegion, rRegion); + newCost = computeCost(cluster); // Should this be kept? if (newCost < currentCost) { currentCost = newCost; } else { // Put things back the way they were before. - if (rRegion != null) { - leftRegionList.remove(rRegion); - rightRegionList.add(rRegion); - } - - if (lRegion != null) { - rightRegionList.remove(lRegion); - leftRegionList.add(lRegion); - } + //TODO: undo by remembering old values, using an UndoAction class + cluster.moveOrSwapRegion(leftServer, rightServer, rRegion, lRegion); } } - + if (EnvironmentEdgeManager.currentTimeMillis() - startTime > maxRunningTime) { + break; + } } - long endTime = System.currentTimeMillis(); + long endTime = EnvironmentEdgeManager.currentTimeMillis(); if (initCost > currentCost) { - List plans = createRegionPlans(initialRegionMapping, clusterState); + List plans = createRegionPlans(cluster); - LOG.debug("Finished computing new laod balance plan. Computation took " - + (endTime - startTime) + "ms to try " + computedMaxSteps - + " different iterations. Found a solution that moves " + plans.size() - + " regions; Going from a computed cost of " + initCost + " to a new cost of " - + currentCost); + if (LOG.isDebugEnabled()) { + LOG.debug("Finished computing new laod balance plan. Computation took " + + (endTime - startTime) + "ms to try " + step + + " different iterations. Found a solution that moves " + plans.size() + + " regions; Going from a computed cost of " + initCost + " to a new cost of " + + currentCost); + } return plans; } - LOG.debug("Could not find a better load balance plan. Tried " + computedMaxSteps - + " different configurations in " + (endTime - startTime) - + "ms, and did not find anything with a computed cost less than " + initCost); + if (LOG.isDebugEnabled()) { + LOG.debug("Could not find a better load balance plan. Tried " + step + + " different configurations in " + (endTime - startTime) + + "ms, and did not find anything with a computed cost less than " + initCost); + } return null; } @@ -274,46 +266,27 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { * @param clusterState The desired mapping of ServerName to Regions * @return List of RegionPlan's that represent the moves needed to get to desired final state. */ - private List createRegionPlans(Map initialRegionMapping, - Map> clusterState) { + private List createRegionPlans(Cluster cluster) { List plans = new LinkedList(); - for (Entry> entry : clusterState.entrySet()) { - ServerName newServer = entry.getKey(); - - for (HRegionInfo region : entry.getValue()) { - ServerName initialServer = initialRegionMapping.get(region); - if (!newServer.equals(initialServer)) { + for (int regionIndex = 0; regionIndex < cluster.regionIndexToServerIndex.length; regionIndex++) { + int initialServerIndex = cluster.initialRegionIndexToServerIndex[regionIndex]; + int newServerIndex = cluster.regionIndexToServerIndex[regionIndex]; + if (initialServerIndex != newServerIndex) { + HRegionInfo region = cluster.regions[regionIndex]; + ServerName initialServer = cluster.servers[initialServerIndex]; + ServerName newServer = cluster.servers[newServerIndex]; + if (LOG.isTraceEnabled()) { LOG.trace("Moving Region " + region.getEncodedName() + " from server " + initialServer.getHostname() + " to " + newServer.getHostname()); - RegionPlan rp = new RegionPlan(region, initialServer, newServer); - plans.add(rp); } + RegionPlan rp = new RegionPlan(region, initialServer, newServer); + plans.add(rp); } } return plans; } - /** - * Create a map that will represent the initial location of regions on a - * {@link ServerName} - * - * @param clusterState starting state of the cluster and regions. - * @return A map of {@link HRegionInfo} to the {@link ServerName} that is - * currently hosting that region - */ - private Map createRegionMapping( - Map> clusterState) { - Map mapping = new HashMap(); - - for (Entry> entry : clusterState.entrySet()) { - for (HRegionInfo region : entry.getValue()) { - mapping.put(region, entry.getKey()); - } - } - return mapping; - } - /** Store the current region loads. */ private synchronized void updateRegionLoad() { @@ -358,32 +331,15 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { * @return a random {@link HRegionInfo} or null if an asymmetrical move is * suggested. */ - private HRegionInfo pickRandomRegion(List regions, double chanceOfNoSwap) { - + private int pickRandomRegion(Cluster cluster, int server, double chanceOfNoSwap) { //Check to see if this is just a move. - if (regions.isEmpty() || RANDOM.nextFloat() < chanceOfNoSwap) { + if (cluster.regionsPerServer[server].length == 0 || RANDOM.nextFloat() < chanceOfNoSwap) { //signal a move only. - return null; + return -1; } + int rand = RANDOM.nextInt(cluster.regionsPerServer[server].length); + return cluster.regionsPerServer[server][rand]; - int count = 0; - HRegionInfo r = null; - - //We will try and find a region up to 10 times. If we always - while (count < 10 && r == null ) { - count++; - r = regions.get(RANDOM.nextInt(regions.size())); - - // If this is a special region we always try not to move it. - // so clear out r. try again - if (r.isMetaRegion()) { - r = null; - } - } - if (r != null) { - regions.remove(r); - } - return r; } /** @@ -394,16 +350,16 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { * @param allServers list of all server from which to pick * @return random server. Null if no other servers were found. */ - private ServerName pickOtherServer(ServerName server, List allServers) { - ServerName s = null; - int count = 0; - while (count < 100 && (s == null || ServerName.isSameHostnameAndPort(s, server))) { - count++; - s = allServers.get(RANDOM.nextInt(allServers.size())); + private int pickOtherServer(int serverIndex, Cluster cluster) { + if (cluster.numServers < 2) { + return -1; + } + while (true) { + int otherServerIndex = RANDOM.nextInt(cluster.numServers); + if (otherServerIndex != serverIndex) { + return otherServerIndex; + } } - - // If nothing but the current server was found return null. - return (s == null || ServerName.isSameHostnameAndPort(s, server)) ? null : s; } /** @@ -414,38 +370,39 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { * @param clusterState Map of ServerName to list of regions. * @return a double of a cost associated with the proposed */ - protected double computeCost(Map initialRegionMapping, - Map> clusterState) { + protected double computeCost(Cluster cluster) { - double moveCost = moveCostMultiplier * computeMoveCost(initialRegionMapping, clusterState); + double moveCost = moveCostMultiplier * computeMoveCost(cluster); - double regionCountSkewCost = loadMultiplier * computeSkewLoadCost(clusterState); - double tableSkewCost = tableMultiplier * computeTableSkewLoadCost(clusterState); + double regionCountSkewCost = loadMultiplier * computeSkewLoadCost(cluster); + double tableSkewCost = tableMultiplier * computeTableSkewLoadCost(cluster); double localityCost = - localityMultiplier * computeDataLocalityCost(initialRegionMapping, clusterState); + localityMultiplier * computeDataLocalityCost(cluster); double memstoreSizeCost = memStoreSizeMultiplier - * computeRegionLoadCost(clusterState, RegionLoadCostType.MEMSTORE_SIZE); + * computeRegionLoadCost(cluster, RegionLoadCostType.MEMSTORE_SIZE); double storefileSizeCost = storeFileSizeMultiplier - * computeRegionLoadCost(clusterState, RegionLoadCostType.STOREFILE_SIZE); + * computeRegionLoadCost(cluster, RegionLoadCostType.STOREFILE_SIZE); double readRequestCost = readRequestMultiplier - * computeRegionLoadCost(clusterState, RegionLoadCostType.READ_REQUEST); + * computeRegionLoadCost(cluster, RegionLoadCostType.READ_REQUEST); double writeRequestCost = writeRequestMultiplier - * computeRegionLoadCost(clusterState, RegionLoadCostType.WRITE_REQUEST); + * computeRegionLoadCost(cluster, RegionLoadCostType.WRITE_REQUEST); - double total = + double total = moveCost + regionCountSkewCost + tableSkewCost + localityCost + memstoreSizeCost + storefileSizeCost + readRequestCost + writeRequestCost; - LOG.trace("Computed weights for a potential balancing total = " + total + " moveCost = " - + moveCost + " regionCountSkewCost = " + regionCountSkewCost + " tableSkewCost = " - + tableSkewCost + " localityCost = " + localityCost + " memstoreSizeCost = " - + memstoreSizeCost + " storefileSizeCost = " + storefileSizeCost); + if (LOG.isTraceEnabled()) { + LOG.trace("Computed weights for a potential balancing total = " + total + " moveCost = " + + moveCost + " regionCountSkewCost = " + regionCountSkewCost + " tableSkewCost = " + + tableSkewCost + " localityCost = " + localityCost + " memstoreSizeCost = " + + memstoreSizeCost + " storefileSizeCost = " + storefileSizeCost); + } return total; } @@ -457,24 +414,21 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { * @param clusterState The potential new cluster state. * @return The cost. Between 0 and 1. */ - double computeMoveCost(Map initialRegionMapping, - Map> clusterState) { - float moveCost = 0; - for (Entry> entry : clusterState.entrySet()) { - for (HRegionInfo region : entry.getValue()) { - if (initialRegionMapping.get(region) != entry.getKey()) { - moveCost += 1; - } - } - } + double computeMoveCost(Cluster cluster) { + double moveCost = cluster.numMovedRegions; //Don't let this single balance move more than the max moves. //This allows better scaling to accurately represent the actual cost of a move. if (moveCost > maxMoves) { - return 10000; //return a number much greater than any of the other cost functions + return Double.MAX_VALUE; //return a number much greater than any of the other cost functions } - return scale(0, Math.min(maxMoves, initialRegionMapping.size()), moveCost); + //META region is special + if (cluster.numMovedMetaRegions > 0) { + maxMoves += 9 * cluster.numMovedMetaRegions; //assume each META region move costs 10 times + } + + return scale(0, cluster.numRegions, moveCost); } /** @@ -484,11 +438,10 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { * @param clusterState The proposed cluster state * @return The cost of region load imbalance. */ - double computeSkewLoadCost(Map> clusterState) { + double computeSkewLoadCost(Cluster cluster) { DescriptiveStatistics stats = new DescriptiveStatistics(); - for (List regions : clusterState.values()) { - int size = regions.size(); - stats.addValue(size); + for (int[] regions : cluster.regionsPerServer) { + stats.addValue(regions.length); } return costFromStats(stats); } @@ -500,68 +453,15 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { * @param clusterState Proposed cluster state. * @return Cost of imbalance in table. */ - double computeTableSkewLoadCost(Map> clusterState) { - - Map tableRegionsTotal = new HashMap(); - Map tableRegionsOnCurrentServer = new HashMap(); - Map tableCostSeenSoFar = new HashMap(); - // Go through everything per server - for (Entry> entry : clusterState.entrySet()) { - tableRegionsOnCurrentServer.clear(); - - // For all of the regions count how many are from each table - for (HRegionInfo region : entry.getValue()) { - String tableName = region.getTableNameAsString(); - - // See if this table already has a count on this server - MutableInt regionsOnServerCount = tableRegionsOnCurrentServer.get(tableName); - - // If this is the first time we've seen this table on this server - // create a new mutable int. - if (regionsOnServerCount == null) { - regionsOnServerCount = new MutableInt(0); - tableRegionsOnCurrentServer.put(tableName, regionsOnServerCount); - } - - // Increment the count of how many regions from this table are host on - // this server - regionsOnServerCount.increment(); - - // Now count the number of regions in this table. - MutableInt totalCount = tableRegionsTotal.get(tableName); - - // If this is the first region from this table create a new counter for - // this table. - if (totalCount == null) { - totalCount = new MutableInt(0); - tableRegionsTotal.put(tableName, totalCount); - } - totalCount.increment(); - } - - // Now go through all of the tables we have seen and keep the max number - // of regions of this table a single region server is hosting. - for (Entry currentServerEntry: tableRegionsOnCurrentServer.entrySet()) { - String tableName = currentServerEntry.getKey(); - Integer thisCount = currentServerEntry.getValue().toInteger(); - Integer maxCountSoFar = tableCostSeenSoFar.get(tableName); - - if (maxCountSoFar == null || thisCount.compareTo(maxCountSoFar) > 0) { - tableCostSeenSoFar.put(tableName, thisCount); - } - } - } - - double max = 0; - double min = 0; + double computeTableSkewLoadCost(Cluster cluster) { + double max = cluster.numRegions; + double min = cluster.numRegions / cluster.numServers; double value = 0; - // Compute the min, value, and max. - for (Entry currentEntry : tableRegionsTotal.entrySet()) { - max += tableRegionsTotal.get(currentEntry.getKey()).doubleValue(); - min += tableRegionsTotal.get(currentEntry.getKey()).doubleValue() / clusterState.size(); - value += tableCostSeenSoFar.get(currentEntry.getKey()).doubleValue(); + for (int i = 0 ; i < cluster.numMaxRegionsPerTable.length; i++) { + value += cluster.numMaxRegionsPerTable[i]; } + return scale(min, max, value); } @@ -574,8 +474,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { * @return A cost between 0 and 1. 0 Means all regions are on the sever with * the most local store files. */ - double computeDataLocalityCost(Map initialRegionMapping, - Map> clusterState) { + double computeDataLocalityCost(Cluster cluster) { double max = 0; double cost = 0; @@ -583,27 +482,29 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { // If there's no master so there's no way anything else works. if (this.services == null) return cost; - for (Entry> entry : clusterState.entrySet()) { - ServerName sn = entry.getKey(); - for (HRegionInfo region : entry.getValue()) { + for (int i = 0; i < cluster.regionLocations.length; i++) { + max += 1; + int serverIndex = cluster.regionIndexToServerIndex[i]; + int[] regionLocations = cluster.regionLocations[i]; - max += 1; + // If we can't find where the data is getTopBlock returns null. + // so count that as being the best possible. + if (regionLocations == null) { + continue; + } - List dataOnServers = regionFinder.getTopBlockLocations(region); - - // If we can't find where the data is getTopBlock returns null. - // so count that as being the best possible. - if (dataOnServers == null) { - continue; - } - - int index = dataOnServers.indexOf(sn); - if (index < 0) { - cost += 1; - } else { - cost += (double) index / (double) dataOnServers.size(); + int index = -1; + for (int j = 0; j < regionLocations.length; j++) { + if (regionLocations[j] == serverIndex) { + index = j; + break; } + } + if (index < 0) { + cost += 1; + } else { + cost += (double) index / (double) regionLocations.length; } } return scale(0, max, cost); @@ -621,31 +522,17 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { * @param costType what type of cost to consider * @return the scaled cost. */ - private double computeRegionLoadCost(Map> clusterState, - RegionLoadCostType costType) { + private double computeRegionLoadCost(Cluster cluster, RegionLoadCostType costType) { if (this.clusterStatus == null || this.loads == null || this.loads.size() == 0) return 0; DescriptiveStatistics stats = new DescriptiveStatistics(); - // For every server look at the cost of each region - for (List regions : clusterState.values()) { + for (List rl : cluster.regionLoads) { long cost = 0; //Cost this server has from RegionLoad - - // For each region - for (HRegionInfo region : regions) { - // Try and get the region using the regionNameAsString - List rl = loads.get(region.getRegionNameAsString()); - - // That could have failed if the RegionLoad is using the other regionName - if (rl == null) { - // Try getting the region load using encoded name. - rl = loads.get(region.getEncodedName()); - } // Now if we found a region load get the type of cost that was requested. - if (rl != null) { - cost += getRegionLoadCost(rl, costType); - } + if (rl != null) { + cost += getRegionLoadCost(rl, costType); } // Add the total cost to the stats. @@ -713,10 +600,11 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { //Compute max as if all region servers had 0 and one had the sum of all costs. This must be // a zero sum cost for this to make sense. - double max = ((stats.getN() - 1) * stats.getMean()) + (stats.getSum() - stats.getMean()); + //TODO: Should we make this sum of square errors? + double max = ((stats.getN() - 1) * mean) + (stats.getSum() - mean); for (double n : stats.getValues()) { - totalCost += Math.abs(mean - n); - + double diff = Math.abs(mean - n); + totalCost += diff; } return scale(0, max, totalCost); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java index b726db37633..9b6559c070d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hbase.master.balancer; -import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import java.util.ArrayList; @@ -154,12 +153,20 @@ public class BalancerTestBase { } protected Map> mockClusterServers(int[] mockCluster) { + return mockClusterServers(mockCluster, -1); + } + + protected BaseLoadBalancer.Cluster mockCluster(int[] mockCluster) { + return new BaseLoadBalancer.Cluster(mockClusterServers(mockCluster, -1), null, null); + } + + protected Map> mockClusterServers(int[] mockCluster, int numTables) { int numServers = mockCluster.length; Map> servers = new TreeMap>(); for (int i = 0; i < numServers; i++) { int numRegions = mockCluster[i]; ServerAndLoad sal = randomServer(0); - List regions = randomRegions(numRegions); + List regions = randomRegions(numRegions, numTables); servers.put(sal.getServerName(), regions); } return servers; @@ -168,6 +175,10 @@ public class BalancerTestBase { private Queue regionQueue = new LinkedList(); protected List randomRegions(int numRegions) { + return randomRegions(numRegions, -1); + } + + protected List randomRegions(int numRegions, int numTables) { List regions = new ArrayList(numRegions); byte[] start = new byte[16]; byte[] end = new byte[16]; @@ -180,7 +191,8 @@ public class BalancerTestBase { } Bytes.putInt(start, 0, numRegions << 1); Bytes.putInt(end, 0, (numRegions << 1) + 1); - HRegionInfo hri = new HRegionInfo(Bytes.toBytes("table" + i), start, end, false, regionId++); + byte[] tableName = Bytes.toBytes("table" + (numTables > 0 ? rand.nextInt(numTables) : i)); + HRegionInfo hri = new HRegionInfo(tableName, start, end, false, regionId++); regions.add(hri); } return regions; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java index cfb434a31fc..78e313712bc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.master.balancer; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import java.util.List; @@ -33,6 +34,7 @@ import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.master.RegionPlan; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -131,28 +133,29 @@ public class TestStochasticLoadBalancer extends BalancerTestBase { @Test public void testSkewCost() { for (int[] mockCluster : clusterStateMocks) { - double cost = loadBalancer.computeSkewLoadCost(mockClusterServers(mockCluster)); + double cost = loadBalancer.computeSkewLoadCost(mockCluster(mockCluster)); assertTrue(cost >= 0); assertTrue(cost <= 1.01); } assertEquals(1, - loadBalancer.computeSkewLoadCost(mockClusterServers(new int[] { 0, 0, 0, 0, 1 })), 0.01); + loadBalancer.computeSkewLoadCost(mockCluster(new int[] { 0, 0, 0, 0, 1 })), 0.01); assertEquals(.75, - loadBalancer.computeSkewLoadCost(mockClusterServers(new int[] { 0, 0, 0, 1, 1 })), 0.01); + loadBalancer.computeSkewLoadCost(mockCluster(new int[] { 0, 0, 0, 1, 1 })), 0.01); assertEquals(.5, - loadBalancer.computeSkewLoadCost(mockClusterServers(new int[] { 0, 0, 1, 1, 1 })), 0.01); + loadBalancer.computeSkewLoadCost(mockCluster(new int[] { 0, 0, 1, 1, 1 })), 0.01); assertEquals(.25, - loadBalancer.computeSkewLoadCost(mockClusterServers(new int[] { 0, 1, 1, 1, 1 })), 0.01); + loadBalancer.computeSkewLoadCost(mockCluster(new int[] { 0, 1, 1, 1, 1 })), 0.01); assertEquals(0, - loadBalancer.computeSkewLoadCost(mockClusterServers(new int[] { 1, 1, 1, 1, 1 })), 0.01); + loadBalancer.computeSkewLoadCost(mockCluster(new int[] { 1, 1, 1, 1, 1 })), 0.01); assertEquals(0, - loadBalancer.computeSkewLoadCost(mockClusterServers(new int[] { 10, 10, 10, 10, 10 })), 0.01); + loadBalancer.computeSkewLoadCost(mockCluster(new int[] { 10, 10, 10, 10, 10 })), 0.01); } @Test public void testTableSkewCost() { for (int[] mockCluster : clusterStateMocks) { - double cost = loadBalancer.computeTableSkewLoadCost(mockClusterServers(mockCluster)); + BaseLoadBalancer.Cluster cluster = mockCluster(mockCluster); + double cost = loadBalancer.computeTableSkewLoadCost(cluster); assertTrue(cost >= 0); assertTrue(cost <= 1.01); } @@ -180,4 +183,71 @@ public class TestStochasticLoadBalancer extends BalancerTestBase { } assertEquals(0.5, loadBalancer.costFromStats(statThree), 0.01); } + + @Test (timeout = 20000) + public void testSmallCluster() { + int numNodes = 10; + int numRegions = 1000; + int numRegionsPerServer = 40; //all servers except one + int numTables = 10; + testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables); + } + + @Test (timeout = 20000) + public void testSmallCluster2() { + int numNodes = 20; + int numRegions = 2000; + int numRegionsPerServer = 40; //all servers except one + int numTables = 10; + testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables); + } + + @Test (timeout = 40000) + public void testMidCluster() { + int numNodes = 100; + int numRegions = 10000; + int numRegionsPerServer = 60; //all servers except one + int numTables = 40; + testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables); + } + + @Test (timeout = 1200000) + public void testMidCluster2() { + int numNodes = 200; + int numRegions = 100000; + int numRegionsPerServer = 40; //all servers except one + int numTables = 400; + testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables); + } + + @Test + @Ignore + //TODO: This still does not finish, making the LoadBalancer unusable at this scale. We should solve this. + //There are two reasons so far; + // - It takes too long for iterating for all servers + // - Moving one region out of the loaded server only costs a slight decrease in the cost of regionCountSkewCost + // but also a slight increase on the moveCost. loadMultiplier / moveCostMultiplier is not high enough to bring down + // the total cost, so that the eager selection cannot continue. This can be solved by smt like + // http://en.wikipedia.org/wiki/Simulated_annealing instead of random walk with eager selection + public void testLargeCluster() { + int numNodes = 1000; + int numRegions = 100000; //100 regions per RS + int numRegionsPerServer = 80; //all servers except one + int numTables = 100; + testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables); + } + + protected void testWithCluster(int numNodes, int numRegions, int numRegionsPerServer, int numTables) { + //construct a cluster of numNodes, having a total of numRegions. Each RS will hold + //numRegionsPerServer many regions except for the last one, which will host all the + //remaining regions + int[] cluster = new int[numNodes]; + for (int i =0; i < numNodes; i++) { + cluster[i] = numRegionsPerServer; + } + cluster[cluster.length - 1] = numRegions - ((cluster.length - 1) * numRegionsPerServer); + + assertNotNull(loadBalancer.balanceCluster(mockClusterServers(cluster, numTables))); + } + }