From b2086873a95b6916d66c1c6734fa0e130c5aff74 Mon Sep 17 00:00:00 2001
From: Charlie Qiangeng Xu
Date: Thu, 1 Dec 2016 11:56:49 +0800
Subject: [PATCH] HBASE-17110 Improve SimpleLoadBalancer to always take server-level balance into account

Signed-off-by: Yu Li
---
 .../rsgroup/RSGroupBasedLoadBalancer.java          |  10 +
 .../apache/hadoop/hbase/master/HMaster.java        |   4 +
 .../hadoop/hbase/master/LoadBalancer.java          |   5 +
 .../hadoop/hbase/master/RegionStates.java          |  68 ++---
 .../master/balancer/BaseLoadBalancer.java          |  11 +
 .../master/balancer/SimpleLoadBalancer.java        | 241 +++++++++++++++---
 .../master/balancer/BalancerTestBase.java          |  89 +++++++
 .../balancer/TestDefaultLoadBalancer.java          |  78 +++++-
 8 files changed, 424 insertions(+), 82 deletions(-)

diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java
index c42c46d108e..b83a3081fb9 100644
--- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java
+++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java
@@ -45,6 +45,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
@@ -114,6 +115,11 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalancer {
     this.masterServices = masterServices;
   }
 
+  @Override
+  public void setClusterLoad(Map<TableName, Map<ServerName, List<HRegionInfo>>> clusterLoad) {
+
+  }
+
   @Override
   public List<RegionPlan> balanceCluster(TableName tableName,
       Map<ServerName, List<HRegionInfo>> clusterState) throws HBaseIOException {
@@ -139,6 +145,8 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalancer {
     for (RSGroupInfo info : RSGroupInfoManager.listRSGroups()) {
       Map<ServerName, List<HRegionInfo>> groupClusterState =
           new HashMap<ServerName, List<HRegionInfo>>();
+      Map<TableName, Map<ServerName, List<HRegionInfo>>> groupClusterLoad =
+          new HashMap<TableName, Map<ServerName, List<HRegionInfo>>>();
       for (HostAndPort sName : info.getServers()) {
         for (ServerName curr : clusterState.keySet()) {
           if (curr.getHostPort().equals(sName)) {
             groupClusterState.put(curr, correctedState.get(curr));
           }
         }
       }
+      groupClusterLoad.put(TableName.valueOf(HConstants.ENSEMBLE_TABLE_NAME), groupClusterState);
+      this.internalBalancer.setClusterLoad(groupClusterLoad);
       List<RegionPlan> groupPlans = this.internalBalancer
           .balanceCluster(groupClusterState);
       if (groupPlans != null) {
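
For illustration, the delegation pattern in the hunk above reduces to the following standalone sketch. It is not part of the patch; balanceOneGroup and groupState are invented names standing in for the per-group loop body, while the types and the setClusterLoad/balanceCluster calls are the real ones:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.hbase.HBaseIOException;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.HRegionInfo;
    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.master.LoadBalancer;
    import org.apache.hadoop.hbase.master.RegionPlan;

    class GroupDelegationSketch {
      // Wrap one group's per-server assignments under the "ensemble" pseudo-table
      // and hand them to the internal balancer before asking it for plans.
      static List<RegionPlan> balanceOneGroup(LoadBalancer internalBalancer,
          Map<ServerName, List<HRegionInfo>> groupState) throws HBaseIOException {
        Map<TableName, Map<ServerName, List<HRegionInfo>>> groupLoad =
            new HashMap<TableName, Map<ServerName, List<HRegionInfo>>>();
        groupLoad.put(TableName.valueOf(HConstants.ENSEMBLE_TABLE_NAME), groupState);
        internalBalancer.setClusterLoad(groupLoad); // the hook added by this patch
        return internalBalancer.balanceCluster(groupState);
      }
    }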
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 7be128245b6..5f2e2a60827 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -91,6 +91,7 @@ import org.apache.hadoop.hbase.master.balancer.BalancerChore;
 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
 import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
 import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
+import org.apache.hadoop.hbase.master.balancer.SimpleLoadBalancer;
 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
 import org.apache.hadoop.hbase.master.cleaner.ReplicationMetaCleaner;
@@ -1237,6 +1238,9 @@ public class HMaster extends HRegionServer implements MasterServices {
       //Give the balancer the current cluster state.
       this.balancer.setClusterStatus(getClusterStatus());
+      this.balancer.setClusterLoad(
+          this.assignmentManager.getRegionStates().getAssignmentsByTable(true));
+
       for (Entry<TableName, Map<ServerName, List<HRegionInfo>>> e : assignmentsByTable.entrySet()) {
         List<RegionPlan> partialPlans = this.balancer.balanceCluster(e.getKey(), e.getValue());
         if (partialPlans != null) plans.addAll(partialPlans);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
index d7111c3a8bb..1472a919dac 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
@@ -57,6 +57,11 @@ public interface LoadBalancer extends Configurable, Stoppable, ConfigurationObserver {
    */
  void setClusterStatus(ClusterStatus st);
 
+  /**
+   * Pass RegionStates and allow the balancer to set the current cluster load.
+   * @param clusterLoad a map of table name to the regions of that table on each server
+   */
+  void setClusterLoad(Map<TableName, Map<ServerName, List<HRegionInfo>>> clusterLoad);
 
   /**
    * Set the master service.
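
The point of the new hook is to hand the balancer a view that a single balanceCluster(table, state) call cannot see: every table's regions on every server. A typical consumer collapses this into per-server totals, which is what SimpleLoadBalancer does later in this patch (keeping the numbers as ServerAndLoad entries plus their average). A hedged sketch of that aggregation, with an invented class and method name:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.hbase.HRegionInfo;
    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.TableName;

    class ClusterLoadSketch {
      // Collapse the per-table view into per-server totals across all tables.
      static Map<ServerName, Integer> perServerTotals(
          Map<TableName, Map<ServerName, List<HRegionInfo>>> clusterLoad) {
        Map<ServerName, Integer> totals = new HashMap<ServerName, Integer>();
        for (Map<ServerName, List<HRegionInfo>> perServer : clusterLoad.values()) {
          for (Map.Entry<ServerName, List<HRegionInfo>> e : perServer.entrySet()) {
            Integer prev = totals.get(e.getKey());
            totals.put(e.getKey(), (prev == null ? 0 : prev) + e.getValue().size());
          }
        }
        return totals;
      }
    }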
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
index fbc5c680de8..39932858b6c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
@@ -990,50 +990,27 @@ public class RegionStates {
         (double)totalLoad / (double)numServers;
   }
 
+  protected Map<TableName, Map<ServerName, List<HRegionInfo>>> getAssignmentsByTable() {
+    return getAssignmentsByTable(false);
+  }
+
   /**
    * This is an EXPENSIVE clone.  Cloning though is the safest thing to do.
    * Can't let out original since it can change and at least the load balancer
    * wants to iterate this exported list.  We need to synchronize on regions
    * since all access to this.servers is under a lock on this.regions.
-   *
+   * @param forceByCluster a flag to force aggregating the server load at the cluster level
    * @return A clone of current assignments by table.
    */
-  protected Map<TableName, Map<ServerName, List<HRegionInfo>>>
-      getAssignmentsByTable() {
-    Map<TableName, Map<ServerName, List<HRegionInfo>>> result =
-        new HashMap<TableName, Map<ServerName, List<HRegionInfo>>>();
+  protected Map<TableName, Map<ServerName, List<HRegionInfo>>> getAssignmentsByTable(
+      boolean forceByCluster) {
+    Map<TableName, Map<ServerName, List<HRegionInfo>>> result;
     synchronized (this) {
-      if (!server.getConfiguration().getBoolean(
-            HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, false)) {
-        Map<ServerName, List<HRegionInfo>> svrToRegions =
-            new HashMap<ServerName, List<HRegionInfo>>(serverHoldings.size());
-        for (Map.Entry<ServerName, Set<HRegionInfo>> e: serverHoldings.entrySet()) {
-          svrToRegions.put(e.getKey(), new ArrayList<HRegionInfo>(e.getValue()));
-        }
-        result.put(TableName.valueOf(HConstants.ENSEMBLE_TABLE_NAME), svrToRegions);
-      } else {
-        for (Map.Entry<ServerName, Set<HRegionInfo>> e: serverHoldings.entrySet()) {
-          for (HRegionInfo hri: e.getValue()) {
-            if (hri.isMetaRegion()) continue;
-            TableName tablename = hri.getTable();
-            Map<ServerName, List<HRegionInfo>> svrToRegions = result.get(tablename);
-            if (svrToRegions == null) {
-              svrToRegions = new HashMap<ServerName, List<HRegionInfo>>(serverHoldings.size());
-              result.put(tablename, svrToRegions);
-            }
-            List<HRegionInfo> regions = svrToRegions.get(e.getKey());
-            if (regions == null) {
-              regions = new ArrayList<HRegionInfo>();
-              svrToRegions.put(e.getKey(), regions);
-            }
-            regions.add(hri);
-          }
-        }
-      }
+      result = getTableRSRegionMap(server.getConfiguration().getBoolean(
+          HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, false) && !forceByCluster);
     }
-    Map<ServerName, ServerLoad>
-      onlineSvrs = serverManager.getOnlineServers();
+    Map<ServerName, ServerLoad> onlineSvrs = serverManager.getOnlineServers();
     // Take care of servers w/o assignments, and remove servers in draining mode
     List<ServerName> drainingServers = this.serverManager.getDrainingServersList();
     for (Map<ServerName, List<HRegionInfo>> map: result.values()) {
@@ -1047,6 +1024,29 @@ public class RegionStates {
     return result;
   }
 
+  private Map<TableName, Map<ServerName, List<HRegionInfo>>> getTableRSRegionMap(boolean bytable) {
+    Map<TableName, Map<ServerName, List<HRegionInfo>>> result =
+        new HashMap<TableName, Map<ServerName, List<HRegionInfo>>>();
+    for (Map.Entry<ServerName, Set<HRegionInfo>> e: serverHoldings.entrySet()) {
+      for (HRegionInfo hri: e.getValue()) {
+        if (hri.isMetaRegion()) continue;
+        TableName tablename = bytable ? hri.getTable()
+            : TableName.valueOf(HConstants.ENSEMBLE_TABLE_NAME);
+        Map<ServerName, List<HRegionInfo>> svrToRegions = result.get(tablename);
+        if (svrToRegions == null) {
+          svrToRegions = new HashMap<ServerName, List<HRegionInfo>>(serverHoldings.size());
+          result.put(tablename, svrToRegions);
+        }
+        List<HRegionInfo> regions = svrToRegions.get(e.getKey());
+        if (regions == null) {
+          regions = new ArrayList<HRegionInfo>();
+          svrToRegions.put(e.getKey(), regions);
+        }
+        regions.add(hri);
+      }
+    }
+    return result;
+  }
+
   public RegionState getRegionState(final HRegionInfo hri) {
     return getRegionState(hri.getEncodedName());
   }
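
The rule introduced above is that the new forceByCluster argument overrides the hbase.master.loadbalance.bytable flag, which is how HMaster can obtain one cluster-wide map for setClusterLoad() even when per-table balancing is configured. The same rule restated as a self-contained sketch (class and method names invented):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HConstants;

    class GroupingRuleSketch {
      // true  -> key assignments by each region's real table
      // false -> key everything under the single "ensemble" pseudo-table
      static boolean groupByTable(Configuration conf, boolean forceByCluster) {
        return conf.getBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, false)
            && !forceByCluster;
      }
    }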
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
index f71f8f7d8e2..807632c2c0b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
@@ -970,6 +970,8 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
   // slop for regions
   protected float slop;
+  // overallSlop to control SimpleLoadBalancer's cluster-level threshold
+  protected float overallSlop;
   protected Configuration config;
   protected RackManager rackManager;
   private static final Random RANDOM = new Random(System.currentTimeMillis());
@@ -1035,6 +1037,9 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
     if (slop < 0) slop = 0;
     else if (slop > 1) slop = 1;
 
+    if (overallSlop < 0) overallSlop = 0;
+    else if (overallSlop > 1) overallSlop = 1;
+
     this.config = conf;
     String[] tables = getTablesOnMaster(conf);
     if (tables != null && tables.length > 0) {
@@ -1046,6 +1051,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
 
   protected void setSlop(Configuration conf) {
     this.slop = conf.getFloat("hbase.regions.slop", (float) 0.2);
+    this.overallSlop = conf.getFloat("hbase.regions.overallSlop", slop);
   }
 
   /**
@@ -1139,6 +1145,11 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
     regionFinder.setClusterStatus(st);
   }
 
+  @Override
+  public void setClusterLoad(Map<TableName, Map<ServerName, List<HRegionInfo>>> clusterLoad) {
+
+  }
+
   @Override
   public void setMasterServices(MasterServices masterServices) {
     masterServerName = masterServices.getServerName();
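
In effect, a server's overall load is acceptable when it lies within avgLoadOverall * (1 ± overallSlop), as checked by overallNeedsBalance() in the next file. The new knob defaults to the table-level slop and, like it, is clamped to [0, 1] by setConf(). A sketch of the effective value (the 0.2 default is the real one from setSlop(); the class is invented):

    import org.apache.hadoop.conf.Configuration;

    class SlopConfigSketch {
      // hbase.regions.overallSlop falls back to the table-level slop when unset;
      // setConf() then clamps both values into [0, 1].
      static float effectiveOverallSlop(Configuration conf) {
        float slop = conf.getFloat("hbase.regions.slop", (float) 0.2);
        float overallSlop = conf.getFloat("hbase.regions.overallSlop", slop);
        return Math.min(1f, Math.max(0f, overallSlop));
      }
    }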
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java
index 548a9a1644e..673db9547de 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java
@@ -26,9 +26,12 @@ import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Random;
 import java.util.TreeMap;
+import java.util.Comparator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -38,6 +41,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.master.RegionPlan;
 
 import com.google.common.collect.MinMaxPriorityQueue;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * Makes decisions about the placement and movement of Regions across
@@ -59,7 +63,8 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
 
   private RegionInfoComparator riComparator = new RegionInfoComparator();
   private RegionPlan.RegionPlanComparator rpComparator = new RegionPlan.RegionPlanComparator();
-
+  private float avgLoadOverall;
+  private List<ServerAndLoad> serverLoadList;
 
   /**
    * Stores additional per-server information about the regions added/removed
@@ -71,12 +76,14 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
    */
   static class BalanceInfo {
 
-    private final int nextRegionForUnload;
+    private int nextRegionForUnload;
     private int numRegionsAdded;
+    private List<HRegionInfo> hriList;
 
-    public BalanceInfo(int nextRegionForUnload, int numRegionsAdded) {
+    public BalanceInfo(int nextRegionForUnload, int numRegionsAdded, List<HRegionInfo> hriList) {
       this.nextRegionForUnload = nextRegionForUnload;
       this.numRegionsAdded = numRegionsAdded;
+      this.hriList = hriList;
     }
 
     int getNextRegionForUnload() {
@@ -90,6 +97,66 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
     void setNumRegionsAdded(int numAdded) {
       this.numRegionsAdded = numAdded;
     }
+
+    List<HRegionInfo> getHriList() {
+      return hriList;
+    }
+
+    void setNextRegionForUnload(int nextRegionForUnload) {
+      this.nextRegionForUnload = nextRegionForUnload;
+    }
+
+  }
+
+  public void setClusterLoad(Map<TableName, Map<ServerName, List<HRegionInfo>>> clusterLoad) {
+    serverLoadList = new ArrayList<>();
+    float sum = 0;
+    for (Map.Entry<TableName, Map<ServerName, List<HRegionInfo>>> clusterEntry : clusterLoad.entrySet()) {
+      for (Map.Entry<ServerName, List<HRegionInfo>> entry : clusterEntry.getValue().entrySet()) {
+        if (entry.getKey().equals(masterServerName)) continue; // we shouldn't include master as potential assignee
+        serverLoadList.add(new ServerAndLoad(entry.getKey(), entry.getValue().size()));
+        sum += entry.getValue().size();
+      }
+    }
+    avgLoadOverall = sum / serverLoadList.size();
+  }
+
+  @Override
+  public void onConfigurationChange(Configuration conf) {
+    float originSlop = slop;
+    float originOverallSlop = overallSlop;
+    super.setConf(conf);
+    LOG.info("Update configuration of SimpleLoadBalancer, previous slop is "
+        + originSlop + ", current slop is " + slop + ", previous overallSlop is "
+        + originOverallSlop + ", current overallSlop is " + overallSlop);
+  }
+
+  private void setLoad(List<ServerAndLoad> slList, int i, int loadChange) {
+    ServerAndLoad newsl =
+        new ServerAndLoad(slList.get(i).getServerName(), slList.get(i).getLoad() + loadChange);
+    slList.set(i, newsl);
+  }
+
+  /**
+   * A checker function to decide whether, when the overall strategy is wanted and a certain
+   * table is already balanced, we still need to re-distribute regions of this table to reach
+   * the overall-balanced state.
+   * @return true if this table should be balanced.
+   */
+  private boolean overallNeedsBalance() {
+    int floor = (int) Math.floor(avgLoadOverall * (1 - overallSlop));
+    int ceiling = (int) Math.ceil(avgLoadOverall * (1 + overallSlop));
+    int max = 0, min = Integer.MAX_VALUE;
+    for (ServerAndLoad server : serverLoadList) {
+      max = Math.max(server.getLoad(), max);
+      min = Math.min(server.getLoad(), min);
+    }
+    if (max <= ceiling && min >= floor) {
+      if (LOG.isTraceEnabled()) {
+        // If nothing to balance, then don't say anything unless trace-level logging.
+        LOG.trace("Skipping load balancing because cluster is balanced at overall level");
+      }
+      return false;
+    }
+    return true;
   }
 
   /**
+ LOG.trace("Skipping load balancing because cluster is balanced at overall level"); + } + return false; + } + return true; } /** @@ -197,7 +264,7 @@ public class SimpleLoadBalancer extends BaseLoadBalancer { // construct a Cluster object with clusterMap and rest of the // argument as defaults Cluster c = new Cluster(clusterMap, null, this.regionFinder, this.rackManager); - if (!this.needsBalance(c)) return null; + if (!this.needsBalance(c) && !this.overallNeedsBalance()) return null; ClusterLoadState cs = new ClusterLoadState(clusterMap); int numServers = cs.getNumServers(); @@ -231,8 +298,8 @@ public class SimpleLoadBalancer extends BaseLoadBalancer { ServerAndLoad sal = server.getKey(); int load = sal.getLoad(); if (load <= max) { - serverBalanceInfo.put(sal.getServerName(), new BalanceInfo(0, 0)); - break; + serverBalanceInfo.put(sal.getServerName(), new BalanceInfo(0, 0, server.getValue())); + continue; } serversOverloaded++; List regions = server.getValue(); @@ -255,7 +322,7 @@ public class SimpleLoadBalancer extends BaseLoadBalancer { if (numTaken >= numToOffload) break; } serverBalanceInfo.put(sal.getServerName(), - new BalanceInfo(numToOffload, (-1)*numTaken)); + new BalanceInfo(numToOffload, (-1)*numTaken, server.getValue())); } int totalNumMoved = regionsToMove.size(); @@ -296,10 +363,6 @@ public class SimpleLoadBalancer extends BaseLoadBalancer { underloadedServers.put(si, numToTake-1); cnt++; BalanceInfo bi = serverBalanceInfo.get(si); - if (bi == null) { - bi = new BalanceInfo(0, 0); - serverBalanceInfo.put(si, bi); - } bi.setNumRegionsAdded(bi.getNumRegionsAdded()+1); } if (cnt == 0) break; @@ -311,17 +374,6 @@ public class SimpleLoadBalancer extends BaseLoadBalancer { neededRegions += i; } - // If none needed to fill all to min and none left to drain all to max, - // we are done - if (neededRegions == 0 && regionsToMove.isEmpty()) { - long endTime = System.currentTimeMillis(); - LOG.info("Calculated a load balance in " + (endTime-startTime) + "ms. " + - "Moving " + totalNumMoved + " regions off of " + - serversOverloaded + " overloaded servers onto " + - serversUnderloaded + " less loaded servers"); - return regionsToReturn; - } - // Need to do a second pass. // Either more regions to assign out or servers that are still underloaded @@ -338,6 +390,8 @@ public class SimpleLoadBalancer extends BaseLoadBalancer { HRegionInfo region = server.getValue().get(idx); if (region.isMetaRegion()) continue; // Don't move meta regions. 
@@ -338,6 +390,8 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
         HRegionInfo region = server.getValue().get(idx);
         if (region.isMetaRegion()) continue; // Don't move meta regions.
         regionsToMove.add(new RegionPlan(region, server.getKey().getServerName(), null));
+        balanceInfo.setNumRegionsAdded(balanceInfo.getNumRegionsAdded() - 1);
+        balanceInfo.setNextRegionForUnload(balanceInfo.getNextRegionForUnload() + 1);
         totalNumMoved++;
         if (--neededRegions == 0) {
           // No more regions needed, done shedding
@@ -370,24 +424,8 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
       }
     }
 
-    // If we still have regions to dish out, assign underloaded to max
-    if (0 < regionsToMove.size()) {
-      for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server :
-        serversByLoad.entrySet()) {
-        int regionCount = server.getKey().getLoad();
-        BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName());
-        if (balanceInfo != null) {
-          regionCount += balanceInfo.getNumRegionsAdded();
-        }
-        if (regionCount >= max) {
-          break;
-        }
-        addRegionPlan(regionsToMove, fetchFromTail,
-          server.getKey().getServerName(), regionsToReturn);
-        if (regionsToMove.isEmpty()) {
-          break;
-        }
-      }
+    if (min != max) {
+      balanceOverall(regionsToReturn, serverBalanceInfo, fetchFromTail, regionsToMove, max, min);
     }
 
     long endTime = System.currentTimeMillis();
@@ -416,6 +454,128 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
     return regionsToReturn;
   }
 
+  /**
+   * If we need to balance overall, we do one more round to peel off one region from each
+   * server at the max count. Together with the other regions left to be assigned, we
+   * distribute all of regionsToMove to the RS that hold fewer regions in the whole
+   * cluster scope.
+   */
+  public void balanceOverall(List<RegionPlan> regionsToReturn,
+      Map<ServerName, BalanceInfo> serverBalanceInfo, boolean fetchFromTail,
+      MinMaxPriorityQueue<RegionPlan> regionsToMove, int max, int min) {
+    // Step 1.
+    // A map to record the plans we already have, in order to resolve a cyclic assignment pair,
+    // e.g. plan 1: A -> B, plan 2: B -> C => resolve plan1 to A -> C, remove plan2
+    Map<ServerName, List<Integer>> returnMap = new HashMap<>();
+    for (int i = 0; i < regionsToReturn.size(); i++) {
+      List<Integer> pos = returnMap.get(regionsToReturn.get(i).getDestination());
+      if (pos == null) {
+        pos = new ArrayList<>();
+        returnMap.put(regionsToReturn.get(i).getDestination(), pos);
+      }
+      pos.add(i);
+    }
+
+    // Step 2.
+    // Peel off one region from each RS that currently has the max number of regions.
+    // Each RS should have either max or min numbers of regions for this table.
+    for (int i = 0; i < serverLoadList.size(); i++) {
+      ServerAndLoad serverload = serverLoadList.get(i);
+      BalanceInfo balanceInfo = serverBalanceInfo.get(serverload.getServerName());
+      setLoad(serverLoadList, i, balanceInfo.getNumRegionsAdded());
+      if (balanceInfo.getHriList().size() + balanceInfo.getNumRegionsAdded() == max) {
+        HRegionInfo hriToPlan;
+        if (balanceInfo.getHriList().size() == 0) {
+          LOG.debug("During balanceOverall, we found " + serverload.getServerName()
+              + " has no HRegionInfo, no operation needed");
+          continue;
+        } else if (balanceInfo.getNextRegionForUnload() >= balanceInfo.getHriList().size()) {
+          continue;
+        } else {
+          hriToPlan = balanceInfo.getHriList().get(balanceInfo.getNextRegionForUnload());
+        }
+        RegionPlan maxPlan = new RegionPlan(hriToPlan, serverload.getServerName(), null);
+        regionsToMove.add(maxPlan);
+        setLoad(serverLoadList, i, -1);
+      } else if (balanceInfo.getHriList().size() + balanceInfo.getNumRegionsAdded() > max
+          || balanceInfo.getHriList().size() + balanceInfo.getNumRegionsAdded() < min) {
+        LOG.warn("Encountered incorrect region numbers after calculating move plan during"
+            + " balanceOverall, for this table, " + serverload.getServerName() + " originally has "
+            + balanceInfo.getHriList().size() + " regions and " + balanceInfo.getNumRegionsAdded()
+            + " regions have been added. Yet, max = " + max + ", min = " + min
+            + ". Thus stop balance for this table"); // should not happen
+        return;
+      }
+    }
+
+    // Step 3.
+    // Sort serverLoadList, the ArrayList that holds the overall load of each server.
+    // We only need to assign the regionsToMove to
+    // the first n = regionsToMove.size() RS that have the least load.
+    Collections.sort(serverLoadList, new Comparator<ServerAndLoad>() {
+      @Override
+      public int compare(ServerAndLoad s1, ServerAndLoad s2) {
+        if (s1.getLoad() == s2.getLoad()) return 0;
+        else return (s1.getLoad() > s2.getLoad()) ? 1 : -1;
+      }
+    });
+
+    // Step 4.
+    // Preparation before assigning out all regionsToMove.
+    // We need to remove the plans whose source RS equals the destination RS,
+    // since such a source RS belongs to the n least loaded RS.
+    int assignLength = regionsToMove.size();
+    // A structure to help map a ServerName to its load and index in serverLoadList
+    Map<ServerName, Pair<ServerAndLoad, Integer>> snLoadMap = new HashMap<>();
+    for (int i = 0; i < serverLoadList.size(); i++) {
+      snLoadMap.put(serverLoadList.get(i).getServerName(),
+          new Pair<ServerAndLoad, Integer>(serverLoadList.get(i), i));
+    }
+    Pair<ServerAndLoad, Integer> shredLoad;
+    // A list to help mark the plans in regionsToMove that should be removed
+    List<RegionPlan> planToRemoveList = new ArrayList<>();
+    // A structure to record how many times a server becomes the source of a plan in regionsToMove.
+    Map<ServerName, Integer> sourceMap = new HashMap<>();
+    // We remove one of the plans which would cause source RS to equal destination RS.
+    // But we should keep in mind that any second plan from such an RS should be kept.
+    for (RegionPlan plan : regionsToMove) {
+      // the source RS's load and index in serverLoadList
+      shredLoad = snLoadMap.get(plan.getSource());
+      if (!sourceMap.containsKey(plan.getSource())) sourceMap.put(plan.getSource(), 0);
+      sourceMap.put(plan.getSource(), sourceMap.get(plan.getSource()) + 1);
+      if (shredLoad.getSecond() < assignLength && sourceMap.get(plan.getSource()) == 1) {
+        planToRemoveList.add(plan);
+        // While marked as to be removed, the count should be added back to the source RS
+        setLoad(serverLoadList, shredLoad.getSecond(), 1);
+      }
+    }
+    // Remove those marked plans from regionsToMove; we cannot remove them directly while
+    // iterating, because regionsToMove is a MinMaxPriorityQueue.
+    for (RegionPlan planToRemove : planToRemoveList) {
+      regionsToMove.remove(planToRemove);
+    }
+
+    // Step 5.
+    // We only need to assign the regionsToMove to
+    // the first n = regionsToMove.size() of them, with the least load.
+    // With this strategy adopted, we can gradually achieve the overall balance,
+    // while keeping table-level balance.
+    for (int i = 0; i < assignLength; i++) {
+      // skip an RS that is also a source; we removed its plan from regionsToMove in the previous step
+      if (sourceMap.containsKey(serverLoadList.get(i).getServerName())) continue;
+      addRegionPlan(regionsToMove, fetchFromTail,
+          serverLoadList.get(i).getServerName(), regionsToReturn);
+      setLoad(serverLoadList, i, 1);
+      // resolve a possible cyclic assignment pair if we just produced one:
+      // e.g. plan1: A -> B, plan2: B -> C => resolve plan1 to A -> C and remove plan2
+      List<Integer> pos = returnMap.get(regionsToReturn.get(regionsToReturn.size() - 1).getSource());
+      if (pos != null && pos.size() != 0) {
+        regionsToReturn.get(pos.get(pos.size() - 1)).setDestination(
+            regionsToReturn.get(regionsToReturn.size() - 1).getDestination());
+        pos.remove(pos.size() - 1);
+        regionsToReturn.remove(regionsToReturn.size() - 1);
+      }
+    }
+    // Done balancing overall
+  }
+
   /**
    * Add a region from the head or tail to the List of regions to return.
    */
@@ -431,6 +591,7 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
   @Override
   public List<RegionPlan> balanceCluster(TableName tableName,
       Map<ServerName, List<HRegionInfo>> clusterState) throws HBaseIOException {
+    LOG.debug("Start generating balance plan for table: " + tableName);
     return balanceCluster(clusterState);
   }
 }
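
Step 1 and Step 5 of balanceOverall() cooperate to cancel cyclic pairs: if an earlier plan moves a region onto B and a later plan moves a region off B, the earlier plan is redirected to the later plan's destination and the later plan is dropped, so B's region count is untouched. A self-contained sketch of that resolution using hypothetical servers and tables (the two resolution lines mirror Step 5; all data is invented):

    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.hbase.HRegionInfo;
    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.master.RegionPlan;

    class CyclicPairSketch {
      public static void main(String[] args) {
        ServerName a = ServerName.valueOf("a.example.org", 16020, 1L);
        ServerName b = ServerName.valueOf("b.example.org", 16020, 1L);
        ServerName c = ServerName.valueOf("c.example.org", 16020, 1L);
        RegionPlan p1 = new RegionPlan(new HRegionInfo(TableName.valueOf("t1")), a, b); // A -> B
        RegionPlan p2 = new RegionPlan(new HRegionInfo(TableName.valueOf("t2")), b, c); // B -> C
        // resolve: redirect plan1 to C and drop plan2; B neither gains nor sheds a region
        p1.setDestination(p2.getDestination());
        List<RegionPlan> resolved = Arrays.asList(p1); // net effect: A -> C
        System.out.println(resolved);
      }
    }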
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java
index 047cf0f59f7..622dc4b6f7c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java
@@ -306,6 +306,39 @@ public class BalancerTestBase {
     }
   }
 
+  /**
+   * Invariant is that all servers have a region count within the acceptable range.
+   */
+  public boolean assertClusterOverallAsBalanced(List<ServerAndLoad> servers, int tablenum) {
+    int numServers = servers.size();
+    int numRegions = 0;
+    int maxRegions = 0;
+    int minRegions = Integer.MAX_VALUE;
+    for (ServerAndLoad server : servers) {
+      int nr = server.getLoad();
+      if (nr > maxRegions) {
+        maxRegions = nr;
+      }
+      if (nr < minRegions) {
+        minRegions = nr;
+      }
+      numRegions += nr;
+    }
+    if (maxRegions - minRegions < 2) {
+      // less than 2 between max and min, can't balance
+      return true;
+    }
+    int min = numRegions / numServers;
+    int max = numRegions % numServers == 0 ? min : min + 1;
+
+    for (ServerAndLoad server : servers) {
+      if (server.getLoad() < 0 || server.getLoad() > max + tablenum / 2 + 1
+          || server.getLoad() < min - tablenum / 2 - 1)
+        return false;
+    }
+    return true;
+  }
+
   /**
    * Checks whether region replicas are not hosted on the same host.
    */
@@ -452,6 +485,45 @@ public class BalancerTestBase {
     return servers;
   }
 
+  protected TreeMap<ServerName, List<HRegionInfo>> mockUniformClusterServers(int[] mockCluster) {
+    int numServers = mockCluster.length;
+    TreeMap<ServerName, List<HRegionInfo>> servers = new TreeMap<ServerName, List<HRegionInfo>>();
+    for (int i = 0; i < numServers; i++) {
+      int numRegions = mockCluster[i];
+      ServerAndLoad sal = randomServer(0);
+      List<HRegionInfo> regions = uniformRegions(numRegions);
+      servers.put(sal.getServerName(), regions);
+    }
+    return servers;
+  }
+
+  protected HashMap<TableName, TreeMap<ServerName, List<HRegionInfo>>> mockClusterServersWithTables(
+      Map<ServerName, List<HRegionInfo>> clusterServers) {
+    HashMap<TableName, TreeMap<ServerName, List<HRegionInfo>>> result = new HashMap<>();
+    for (Map.Entry<ServerName, List<HRegionInfo>> entry : clusterServers.entrySet()) {
+      ServerName sal = entry.getKey();
+      List<HRegionInfo> regions = entry.getValue();
+      for (HRegionInfo hri : regions) {
+        TreeMap<ServerName, List<HRegionInfo>> servers = result.get(hri.getTable());
+        if (servers == null) {
+          servers = new TreeMap<ServerName, List<HRegionInfo>>();
+          result.put(hri.getTable(), servers);
+        }
+        List<HRegionInfo> hrilist = servers.get(sal);
+        if (hrilist == null) {
+          hrilist = new ArrayList<HRegionInfo>();
+          servers.put(sal, hrilist);
+        }
+        hrilist.add(hri);
+      }
+    }
+    for (Map.Entry<TableName, TreeMap<ServerName, List<HRegionInfo>>> entry : result.entrySet()) {
+      for (ServerName srn : clusterServers.keySet()) {
+        if (!entry.getValue().containsKey(srn)) entry.getValue().put(srn, new ArrayList<HRegionInfo>());
+      }
+    }
+    return result;
+  }
+
   private Queue<HRegionInfo> regionQueue = new LinkedList<HRegionInfo>();
 
   protected List<HRegionInfo> randomRegions(int numRegions) {
@@ -479,6 +551,23 @@ public class BalancerTestBase {
     return regions;
   }
 
+  protected List<HRegionInfo> uniformRegions(int numRegions) {
+    List<HRegionInfo> regions = new ArrayList<HRegionInfo>(numRegions);
+    byte[] start = new byte[16];
+    byte[] end = new byte[16];
+    rand.nextBytes(start);
+    rand.nextBytes(end);
+    for (int i = 0; i < numRegions; i++) {
+      Bytes.putInt(start, 0, numRegions << 1);
+      Bytes.putInt(end, 0, (numRegions << 1) + 1);
+      TableName tableName = TableName.valueOf("table" + i);
+      HRegionInfo hri = new HRegionInfo(tableName, start, end, false);
+      regions.add(hri);
+    }
+    return regions;
+  }
+
   protected void returnRegions(List<HRegionInfo> regions) {
     regionQueue.addAll(regions);
   }
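
A usage sketch for the new test helpers (hypothetical code that would live in a BalancerTestBase subclass): uniformRegions() gives each mock region its own table ("table0", "table1", ...), so mockClusterServersWithTables() fans the flat cluster map out into one per-server map per table, padding servers that hold nothing for a table with empty lists:

    import java.util.HashMap;
    import java.util.List;
    import java.util.TreeMap;

    import org.apache.hadoop.hbase.HRegionInfo;
    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.TableName;

    class HelperUsageSketch extends BalancerTestBase {
      void sketch() {
        TreeMap<ServerName, List<HRegionInfo>> cluster =
            mockUniformClusterServers(new int[] { 5, 5, 5, 5, 5, 0 });
        HashMap<TableName, TreeMap<ServerName, List<HRegionInfo>>> byTable =
            mockClusterServersWithTables(cluster);
        // every per-table map carries an entry for every server (possibly empty),
        // so per-table balancing also sees the idle sixth server
        assert byTable.get(TableName.valueOf("table0")).size() == cluster.size();
      }
    }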
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java
index c1e86927a8f..dcf78ff01ae 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java
@@ -17,17 +17,23 @@
  */
 package org.apache.hadoop.hbase.master.balancer;
 
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.master.LoadBalancer;
 import org.apache.hadoop.hbase.master.RegionPlan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.net.DNSToSwitchMapping;
@@ -35,6 +41,9 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 /**
  * Test the load balancer that is created by default.
  */
@@ -103,29 +112,82 @@ public class TestDefaultLoadBalancer extends BalancerTestBase {
       new int[] { 0, 0, 144, 1, 0, 4, 1, 1123, 133, 138, 12, 1444 },
       new int[] { 1538, 1392, 1561, 1557, 1535, 1553, 1385, 1542, 1619 } };
 
+  int[] mockUniformCluster = new int[] { 5, 5, 5, 5, 5, 0 };
+
+
   /**
    * Test the load balancing algorithm.
    *
    * Invariant is that all servers should be hosting either floor(average) or
-   * ceiling(average)
+   * ceiling(average) at both table level and cluster level
    *
    * @throws Exception
    */
   @Test (timeout=60000)
-  public void testBalanceCluster() throws Exception {
-
+  public void testBalanceClusterOverall() throws Exception {
+    Map<TableName, Map<ServerName, List<HRegionInfo>>> clusterLoad
+        = new TreeMap<TableName, Map<ServerName, List<HRegionInfo>>>();
     for (int[] mockCluster : clusterStateMocks) {
-      Map<ServerName, List<HRegionInfo>> servers = mockClusterServers(mockCluster);
+      Map<ServerName, List<HRegionInfo>> clusterServers = mockClusterServers(mockCluster, 50);
+      List<ServerAndLoad> clusterList = convertToList(clusterServers);
+      clusterLoad.put(TableName.valueOf("ensemble"), clusterServers);
+      HashMap<TableName, TreeMap<ServerName, List<HRegionInfo>>> result =
+          mockClusterServersWithTables(clusterServers);
+      loadBalancer.setClusterLoad(clusterLoad);
+      List<RegionPlan> clusterplans = new ArrayList<RegionPlan>();
+      List<Pair<TableName, Integer>> regionAmountList = new ArrayList<Pair<TableName, Integer>>();
+      for (TreeMap<ServerName, List<HRegionInfo>> servers : result.values()) {
+        List<ServerAndLoad> list = convertToList(servers);
+        LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list));
+        List<RegionPlan> partialplans = loadBalancer.balanceCluster(servers);
+        if (partialplans != null) clusterplans.addAll(partialplans);
+        List<ServerAndLoad> balancedClusterPerTable = reconcile(list, partialplans, servers);
+        LOG.info("Mock Balance : " + printMock(balancedClusterPerTable));
+        assertClusterAsBalanced(balancedClusterPerTable);
+        for (Map.Entry<ServerName, List<HRegionInfo>> entry : servers.entrySet()) {
+          returnRegions(entry.getValue());
+          returnServer(entry.getKey());
+        }
+      }
+      List<ServerAndLoad> balancedCluster = reconcile(clusterList, clusterplans, clusterServers);
+      assertTrue(assertClusterOverallAsBalanced(balancedCluster, result.keySet().size()));
+    }
+  }
+
+  /**
+   * Test the load balancing algorithm.
+   *
+   * Invariant is that all servers should be hosting either floor(average) or
+   * ceiling(average) at both table level and cluster level.
+   * Deliberately generate a special case to show that the overall strategy can achieve
+   * cluster-level balance while the by-table strategy cannot.
+   * @throws Exception
+   */
+  @Test (timeout=60000)
+  public void testImpactOfBalanceClusterOverall() throws Exception {
+    Map<TableName, Map<ServerName, List<HRegionInfo>>> clusterLoad
+        = new TreeMap<TableName, Map<ServerName, List<HRegionInfo>>>();
+    Map<ServerName, List<HRegionInfo>> clusterServers = mockUniformClusterServers(mockUniformCluster);
+    List<ServerAndLoad> clusterList = convertToList(clusterServers);
+    clusterLoad.put(TableName.valueOf("ensemble"), clusterServers);
+    // using the overall strategy can achieve both table-level and cluster-level balance
+    HashMap<TableName, TreeMap<ServerName, List<HRegionInfo>>> result1 =
+        mockClusterServersWithTables(clusterServers);
+    loadBalancer.setClusterLoad(clusterLoad);
+    List<RegionPlan> clusterplans1 = new ArrayList<RegionPlan>();
+    List<Pair<TableName, Integer>> regionAmountList = new ArrayList<Pair<TableName, Integer>>();
+    for (TreeMap<ServerName, List<HRegionInfo>> servers : result1.values()) {
       List<ServerAndLoad> list = convertToList(servers);
       LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list));
-      List<RegionPlan> plans = loadBalancer.balanceCluster(servers);
-      List<ServerAndLoad> balancedCluster = reconcile(list, plans, servers);
-      LOG.info("Mock Balance : " + printMock(balancedCluster));
-      assertClusterAsBalanced(balancedCluster);
+      List<RegionPlan> partialplans = loadBalancer.balanceCluster(servers);
+      if (partialplans != null) clusterplans1.addAll(partialplans);
+      List<ServerAndLoad> balancedClusterPerTable = reconcile(list, partialplans, servers);
+      LOG.info("Mock Balance : " + printMock(balancedClusterPerTable));
+      assertClusterAsBalanced(balancedClusterPerTable);
       for (Map.Entry<ServerName, List<HRegionInfo>> entry : servers.entrySet()) {
         returnRegions(entry.getValue());
         returnServer(entry.getKey());
       }
     }
+    List<ServerAndLoad> balancedCluster1 = reconcile(clusterList, clusterplans1, clusterServers);
+    assertTrue(assertClusterOverallAsBalanced(balancedCluster1, result1.keySet().size()));
   }
 }
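
Worked example behind mockUniformCluster = {5, 5, 5, 5, 5, 0}: each of the five tables is already balanced at the table level (one region per non-empty server), so the by-table strategy alone plans no moves. Yet 25 regions over 6 servers give avgLoadOverall of about 4.17, and with overallSlop = 0.2 the acceptable window is [floor(4.17 * 0.8), ceil(4.17 * 1.2)] = [3, 5], which the idle server's load of 0 violates. A standalone restatement of the window check from overallNeedsBalance() (class and method names invented):

    public class OverallWindowSketch {
      static boolean withinOverallWindow(int[] loads, float avg, float overallSlop) {
        int floor = (int) Math.floor(avg * (1 - overallSlop));
        int ceiling = (int) Math.ceil(avg * (1 + overallSlop));
        int max = 0, min = Integer.MAX_VALUE;
        for (int l : loads) {
          max = Math.max(max, l);
          min = Math.min(min, l);
        }
        return max <= ceiling && min >= floor;
      }

      public static void main(String[] args) {
        int[] loads = { 5, 5, 5, 5, 5, 0 };
        // prints false: the idle server drags min below the floor of 3
        System.out.println(withinOverallWindow(loads, 25f / 6, 0.2f));
      }
    }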