HBASE-17110 Improve SimpleLoadBalancer to always take server-level balance into account
Signed-off-by: Yu Li <liyu@apache.org>
parent b6f5d5b85f
commit b2086873a9
@@ -45,6 +45,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
@@ -114,6 +115,11 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalancer {
     this.masterServices = masterServices;
   }
 
+  @Override
+  public void setClusterLoad(Map<TableName, Map<ServerName, List<HRegionInfo>>> clusterLoad){
+
+  }
+
   @Override
   public List<RegionPlan> balanceCluster(TableName tableName, Map<ServerName, List<HRegionInfo>>
       clusterState) throws HBaseIOException {
@@ -139,6 +145,8 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalancer {
     for (RSGroupInfo info : RSGroupInfoManager.listRSGroups()) {
       Map<ServerName, List<HRegionInfo>> groupClusterState =
           new HashMap<ServerName, List<HRegionInfo>>();
+      Map<TableName, Map<ServerName, List<HRegionInfo>>> groupClusterLoad =
+          new HashMap<TableName, Map<ServerName, List<HRegionInfo>>>();
       for (HostAndPort sName : info.getServers()) {
         for(ServerName curr: clusterState.keySet()) {
           if(curr.getHostPort().equals(sName)) {
@@ -146,6 +154,8 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalancer {
         }
       }
     }
+    groupClusterLoad.put(TableName.valueOf(HConstants.ENSEMBLE_TABLE_NAME), groupClusterState);
+    this.internalBalancer.setClusterLoad(groupClusterLoad);
     List<RegionPlan> groupPlans = this.internalBalancer
         .balanceCluster(groupClusterState);
     if (groupPlans != null) {
@@ -91,6 +91,7 @@ import org.apache.hadoop.hbase.master.balancer.BalancerChore;
 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
 import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
 import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
+import org.apache.hadoop.hbase.master.balancer.SimpleLoadBalancer;
 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
 import org.apache.hadoop.hbase.master.cleaner.ReplicationMetaCleaner;
@@ -1237,6 +1238,9 @@ public class HMaster extends HRegionServer implements MasterServices {
 
     //Give the balancer the current cluster state.
     this.balancer.setClusterStatus(getClusterStatus());
+    this.balancer.setClusterLoad(
+        this.assignmentManager.getRegionStates().getAssignmentsByTable(true));
+
     for (Entry<TableName, Map<ServerName, List<HRegionInfo>>> e : assignmentsByTable.entrySet()) {
       List<RegionPlan> partialPlans = this.balancer.balanceCluster(e.getKey(), e.getValue());
       if (partialPlans != null) plans.addAll(partialPlans);
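The HMaster hunks above establish a simple contract: push the cluster-wide assignment snapshot into the balancer once, before the per-table balancing loop runs. Below is a minimal sketch of that call order; the driver class and its parameters are hypothetical, only the LoadBalancer calls mirror the patch.

import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.RegionPlan;

// Hypothetical driver; the balancer and the assignment maps would come from the master.
final class BalanceDriverSketch {
  static List<RegionPlan> plan(LoadBalancer balancer,
      Map<TableName, Map<ServerName, List<HRegionInfo>>> clusterWideLoad,
      Map<TableName, Map<ServerName, List<HRegionInfo>>> assignmentsByTable,
      List<RegionPlan> plans) throws HBaseIOException {
    // 1. Push the cluster-wide view first, so each table-level run can honor overall balance.
    balancer.setClusterLoad(clusterWideLoad);
    // 2. Then balance table by table, exactly as the loop above does.
    for (Entry<TableName, Map<ServerName, List<HRegionInfo>>> e : assignmentsByTable.entrySet()) {
      List<RegionPlan> partial = balancer.balanceCluster(e.getKey(), e.getValue());
      if (partial != null) plans.addAll(partial);
    }
    return plans;
  }
}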
@@ -57,6 +57,11 @@ public interface LoadBalancer extends Configurable, Stoppable, ConfigurationObserver {
    */
   void setClusterStatus(ClusterStatus st);
 
+  /**
+   * Pass RegionStates and allow balancer to set the current cluster load.
+   * @param clusterLoad a map of table name to the distribution of its regions across servers
+   */
+  void setClusterLoad(Map<TableName, Map<ServerName, List<HRegionInfo>>> clusterLoad);
 
   /**
    * Set the master service.
@@ -990,50 +990,27 @@ public class RegionStates {
       (double)totalLoad / (double)numServers;
   }
 
+  protected Map<TableName, Map<ServerName, List<HRegionInfo>>> getAssignmentsByTable() {
+    return getAssignmentsByTable(false);
+  }
+
   /**
    * This is an EXPENSIVE clone. Cloning though is the safest thing to do.
    * Can't let out original since it can change and at least the load balancer
    * wants to iterate this exported list. We need to synchronize on regions
    * since all access to this.servers is under a lock on this.regions.
    *
+   * @param forceByCluster a flag to force to aggregate the server-load to the cluster level
    * @return A clone of current assignments by table.
    */
-  protected Map<TableName, Map<ServerName, List<HRegionInfo>>>
-      getAssignmentsByTable() {
-    Map<TableName, Map<ServerName, List<HRegionInfo>>> result =
-        new HashMap<TableName, Map<ServerName,List<HRegionInfo>>>();
+  protected Map<TableName, Map<ServerName, List<HRegionInfo>>> getAssignmentsByTable(
+      boolean forceByCluster) {
+    Map<TableName, Map<ServerName, List<HRegionInfo>>> result;
     synchronized (this) {
-      if (!server.getConfiguration().getBoolean(
-          HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, false)) {
-        Map<ServerName, List<HRegionInfo>> svrToRegions =
-            new HashMap<ServerName, List<HRegionInfo>>(serverHoldings.size());
-        for (Map.Entry<ServerName, Set<HRegionInfo>> e: serverHoldings.entrySet()) {
-          svrToRegions.put(e.getKey(), new ArrayList<HRegionInfo>(e.getValue()));
-        }
-        result.put(TableName.valueOf(HConstants.ENSEMBLE_TABLE_NAME), svrToRegions);
-      } else {
-        for (Map.Entry<ServerName, Set<HRegionInfo>> e: serverHoldings.entrySet()) {
-          for (HRegionInfo hri: e.getValue()) {
-            if (hri.isMetaRegion()) continue;
-            TableName tablename = hri.getTable();
-            Map<ServerName, List<HRegionInfo>> svrToRegions = result.get(tablename);
-            if (svrToRegions == null) {
-              svrToRegions = new HashMap<ServerName, List<HRegionInfo>>(serverHoldings.size());
-              result.put(tablename, svrToRegions);
-            }
-            List<HRegionInfo> regions = svrToRegions.get(e.getKey());
-            if (regions == null) {
-              regions = new ArrayList<HRegionInfo>();
-              svrToRegions.put(e.getKey(), regions);
-            }
-            regions.add(hri);
-          }
-        }
-      }
+      result = getTableRSRegionMap(server.getConfiguration().getBoolean(
+          HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE,false) && !forceByCluster);
     }
     Map<ServerName, ServerLoad>
-        onlineSvrs = serverManager.getOnlineServers();
+      onlineSvrs = serverManager.getOnlineServers();
     // Take care of servers w/o assignments, and remove servers in draining mode
     List<ServerName> drainingServers = this.serverManager.getDrainingServersList();
     for (Map<ServerName, List<HRegionInfo>> map: result.values()) {
@@ -1047,6 +1024,29 @@ public class RegionStates {
     return result;
   }
 
+  private Map<TableName, Map<ServerName, List<HRegionInfo>>> getTableRSRegionMap(Boolean bytable){
+    Map<TableName, Map<ServerName, List<HRegionInfo>>> result =
+        new HashMap<TableName, Map<ServerName,List<HRegionInfo>>>();
+    for (Map.Entry<ServerName, Set<HRegionInfo>> e: serverHoldings.entrySet()) {
+      for (HRegionInfo hri: e.getValue()) {
+        if (hri.isMetaRegion()) continue;
+        TableName tablename = bytable ? hri.getTable() : TableName.valueOf(HConstants.ENSEMBLE_TABLE_NAME);
+        Map<ServerName, List<HRegionInfo>> svrToRegions = result.get(tablename);
+        if (svrToRegions == null) {
+          svrToRegions = new HashMap<ServerName, List<HRegionInfo>>(serverHoldings.size());
+          result.put(tablename, svrToRegions);
+        }
+        List<HRegionInfo> regions = svrToRegions.get(e.getKey());
+        if (regions == null) {
+          regions = new ArrayList<HRegionInfo>();
+          svrToRegions.put(e.getKey(), regions);
+        }
+        regions.add(hri);
+      }
+    }
+    return result;
+  }
+
   public RegionState getRegionState(final HRegionInfo hri) {
     return getRegionState(hri.getEncodedName());
   }
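The bytable flag above is the crux: when it is false, every region is filed under the single pseudo-table HConstants.ENSEMBLE_TABLE_NAME, collapsing the per-table view into one cluster-wide map. A standalone sketch of that aggregation, with plain strings standing in for the HBase types:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of getTableRSRegionMap(); strings stand in for ServerName/TableName/HRegionInfo.
public class AggregationSketch {
  public static void main(String[] args) {
    Map<String, List<String>> serverHoldings = new HashMap<>();
    serverHoldings.put("rs1", Arrays.asList("t1.r1", "t2.r1"));
    serverHoldings.put("rs2", Arrays.asList("t1.r2"));
    boolean bytable = false; // false == aggregate server load to the cluster level
    Map<String, Map<String, List<String>>> result = new HashMap<>();
    for (Map.Entry<String, List<String>> e : serverHoldings.entrySet()) {
      for (String region : e.getValue()) {
        // With bytable=false everything lands under one "ensemble" bucket.
        String table = bytable ? region.split("\\.")[0] : "ensemble";
        result.computeIfAbsent(table, k -> new HashMap<>())
            .computeIfAbsent(e.getKey(), k -> new ArrayList<>())
            .add(region);
      }
    }
    // Prints: {ensemble={rs1=[t1.r1, t2.r1], rs2=[t1.r2]}}
    System.out.println(result);
  }
}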
@@ -970,6 +970,8 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
 
   // slop for regions
   protected float slop;
+  // overallSlop to control simpleLoadBalancer's cluster-level threshold
+  protected float overallSlop;
   protected Configuration config;
   protected RackManager rackManager;
   private static final Random RANDOM = new Random(System.currentTimeMillis());
@@ -1035,6 +1037,9 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
     if (slop < 0) slop = 0;
     else if (slop > 1) slop = 1;
 
+    if (overallSlop < 0) overallSlop = 0;
+    else if (overallSlop > 1) overallSlop = 1;
+
     this.config = conf;
     String[] tables = getTablesOnMaster(conf);
     if (tables != null && tables.length > 0) {
@@ -1046,6 +1051,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
 
   protected void setSlop(Configuration conf) {
     this.slop = conf.getFloat("hbase.regions.slop", (float) 0.2);
+    this.overallSlop = conf.getFloat("hbase.regions.overallSlop", slop);
   }
 
   /**
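For reference, a small sketch of the two thresholds setSlop now reads; "hbase.regions.overallSlop" falls back to the table-level slop when unset. The values below are illustrative only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

// Sketch of the two knobs read by setSlop().
public class SlopConfigSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    conf.setFloat("hbase.regions.slop", 0.2f);        // table-level imbalance tolerance
    conf.setFloat("hbase.regions.overallSlop", 0.1f); // cluster-level tolerance (new in this patch)
    // When "hbase.regions.overallSlop" is absent, setSlop() uses the slop value as the default.
    System.out.println(conf.getFloat("hbase.regions.overallSlop",
        conf.getFloat("hbase.regions.slop", 0.2f)));
  }
}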
@@ -1139,6 +1145,11 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
     regionFinder.setClusterStatus(st);
   }
 
+  @Override
+  public void setClusterLoad(Map<TableName, Map<ServerName, List<HRegionInfo>>> clusterLoad){
+
+  }
+
   @Override
   public void setMasterServices(MasterServices masterServices) {
     masterServerName = masterServices.getServerName();
@@ -26,9 +26,12 @@ import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Random;
 import java.util.TreeMap;
+import java.util.Comparator;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -38,6 +41,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.master.RegionPlan;
 
 import com.google.common.collect.MinMaxPriorityQueue;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * Makes decisions about the placement and movement of Regions across
@@ -59,7 +63,8 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
 
   private RegionInfoComparator riComparator = new RegionInfoComparator();
   private RegionPlan.RegionPlanComparator rpComparator = new RegionPlan.RegionPlanComparator();
+  private float avgLoadOverall;
+  private List<ServerAndLoad> serverLoadList;
 
   /**
    * Stores additional per-server information about the regions added/removed
@@ -71,12 +76,14 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
    */
   static class BalanceInfo {
 
-    private final int nextRegionForUnload;
+    private int nextRegionForUnload;
     private int numRegionsAdded;
+    private List<HRegionInfo> hriList;
 
-    public BalanceInfo(int nextRegionForUnload, int numRegionsAdded) {
+    public BalanceInfo(int nextRegionForUnload, int numRegionsAdded, List<HRegionInfo> hriList) {
       this.nextRegionForUnload = nextRegionForUnload;
       this.numRegionsAdded = numRegionsAdded;
+      this.hriList = hriList;
     }
 
     int getNextRegionForUnload() {
@@ -90,6 +97,66 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
     void setNumRegionsAdded(int numAdded) {
       this.numRegionsAdded = numAdded;
     }
+
+    List<HRegionInfo> getHriList() {
+      return hriList;
+    }
+
+    void setNextRegionForUnload(int nextRegionForUnload) {
+      this.nextRegionForUnload = nextRegionForUnload;
+    }
+
+  }
+
+  public void setClusterLoad(Map<TableName, Map<ServerName, List<HRegionInfo>>> clusterLoad){
+    serverLoadList = new ArrayList<>();
+    float sum = 0;
+    for(Map.Entry<TableName, Map<ServerName, List<HRegionInfo>>> clusterEntry : clusterLoad.entrySet()){
+      for(Map.Entry<ServerName, List<HRegionInfo>> entry : clusterEntry.getValue().entrySet()){
+        if(entry.getKey().equals(masterServerName)) continue; // we shouldn't include master as potential assignee
+        serverLoadList.add(new ServerAndLoad(entry.getKey(), entry.getValue().size()));
+        sum += entry.getValue().size();
+      }
+    }
+    avgLoadOverall = sum / serverLoadList.size();
+  }
+
+  @Override
+  public void onConfigurationChange(Configuration conf) {
+    float originSlop = slop;
+    float originOverallSlop = overallSlop;
+    super.setConf(conf);
+    LOG.info("Update configuration of SimpleLoadBalancer, previous slop is "
+        + originSlop + ", current slop is " + slop + ", previous overallSlop is "
+        + originOverallSlop + ", current overallSlop is " + overallSlop);
+  }
+
+  private void setLoad(List<ServerAndLoad> slList, int i, int loadChange){
+    ServerAndLoad newsl = new ServerAndLoad(slList.get(i).getServerName(), slList.get(i).getLoad() + loadChange);
+    slList.set(i, newsl);
+  }
+
+  /**
+   * A checker function to decide whether, when we want overall balance and a certain table
+   * has already been balanced, we still need to re-distribute regions of this table to reach
+   * the state of overall balance.
+   * @return true if this table should be balanced.
+   */
+  private boolean overallNeedsBalance() {
+    int floor = (int) Math.floor(avgLoadOverall * (1 - overallSlop));
+    int ceiling = (int) Math.ceil(avgLoadOverall * (1 + overallSlop));
+    int max = 0, min = Integer.MAX_VALUE;
+    for(ServerAndLoad server : serverLoadList){
+      max = Math.max(server.getLoad(), max);
+      min = Math.min(server.getLoad(), min);
+    }
+    if (max <= ceiling && min >= floor) {
+      if (LOG.isTraceEnabled()) {
+        // If nothing to balance, then don't say anything unless trace-level logging.
+        LOG.trace("Skipping load balancing because cluster is balanced at overall level");
+      }
+      return false;
+    }
+    return true;
+  }
 
   /**
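To make the band concrete: with an average overall load of 10 regions per server and an overallSlop of 0.2, any per-server load in [8, 12] is acceptable and overallNeedsBalance() returns false. A worked sketch with made-up numbers:

// Worked example of the overall acceptance band computed above; numbers are illustrative.
public class OverallBandSketch {
  public static void main(String[] args) {
    float avgLoadOverall = 10f;   // e.g. 100 regions spread over 10 servers
    float overallSlop = 0.2f;     // hbase.regions.overallSlop
    int floor = (int) Math.floor(avgLoadOverall * (1 - overallSlop));   // 8
    int ceiling = (int) Math.ceil(avgLoadOverall * (1 + overallSlop));  // 12
    // Servers whose load falls within [floor, ceiling] need no overall re-balancing.
    System.out.println("acceptable per-server load: [" + floor + ", " + ceiling + "]");
  }
}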
@@ -197,7 +264,7 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
     // construct a Cluster object with clusterMap and rest of the
     // argument as defaults
     Cluster c = new Cluster(clusterMap, null, this.regionFinder, this.rackManager);
-    if (!this.needsBalance(c)) return null;
+    if (!this.needsBalance(c) && !this.overallNeedsBalance()) return null;
 
     ClusterLoadState cs = new ClusterLoadState(clusterMap);
     int numServers = cs.getNumServers();
@@ -231,8 +298,8 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
       ServerAndLoad sal = server.getKey();
       int load = sal.getLoad();
       if (load <= max) {
-        serverBalanceInfo.put(sal.getServerName(), new BalanceInfo(0, 0));
-        break;
+        serverBalanceInfo.put(sal.getServerName(), new BalanceInfo(0, 0, server.getValue()));
+        continue;
       }
       serversOverloaded++;
       List<HRegionInfo> regions = server.getValue();
@@ -255,7 +322,7 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
         if (numTaken >= numToOffload) break;
       }
       serverBalanceInfo.put(sal.getServerName(),
-          new BalanceInfo(numToOffload, (-1)*numTaken));
+          new BalanceInfo(numToOffload, (-1)*numTaken, server.getValue()));
     }
     int totalNumMoved = regionsToMove.size();
 
@@ -296,10 +363,6 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
         underloadedServers.put(si, numToTake-1);
         cnt++;
         BalanceInfo bi = serverBalanceInfo.get(si);
-        if (bi == null) {
-          bi = new BalanceInfo(0, 0);
-          serverBalanceInfo.put(si, bi);
-        }
         bi.setNumRegionsAdded(bi.getNumRegionsAdded()+1);
       }
       if (cnt == 0) break;
@@ -311,17 +374,6 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
       neededRegions += i;
     }
 
-    // If none needed to fill all to min and none left to drain all to max,
-    // we are done
-    if (neededRegions == 0 && regionsToMove.isEmpty()) {
-      long endTime = System.currentTimeMillis();
-      LOG.info("Calculated a load balance in " + (endTime-startTime) + "ms. " +
-          "Moving " + totalNumMoved + " regions off of " +
-          serversOverloaded + " overloaded servers onto " +
-          serversUnderloaded + " less loaded servers");
-      return regionsToReturn;
-    }
-
     // Need to do a second pass.
     // Either more regions to assign out or servers that are still underloaded
@@ -338,6 +390,8 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
         HRegionInfo region = server.getValue().get(idx);
         if (region.isMetaRegion()) continue; // Don't move meta regions.
         regionsToMove.add(new RegionPlan(region, server.getKey().getServerName(), null));
+        balanceInfo.setNumRegionsAdded(balanceInfo.getNumRegionsAdded() - 1);
+        balanceInfo.setNextRegionForUnload(balanceInfo.getNextRegionForUnload() + 1);
         totalNumMoved++;
         if (--neededRegions == 0) {
           // No more regions needed, done shedding
@@ -370,24 +424,8 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
         }
       }
 
-    // If we still have regions to dish out, assign underloaded to max
-    if (0 < regionsToMove.size()) {
-      for (Map.Entry<ServerAndLoad, List<HRegionInfo>> server :
-          serversByLoad.entrySet()) {
-        int regionCount = server.getKey().getLoad();
-        BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName());
-        if(balanceInfo != null) {
-          regionCount += balanceInfo.getNumRegionsAdded();
-        }
-        if(regionCount >= max) {
-          break;
-        }
-        addRegionPlan(regionsToMove, fetchFromTail,
-            server.getKey().getServerName(), regionsToReturn);
-        if (regionsToMove.isEmpty()) {
-          break;
-        }
-      }
-    }
+    if (min != max) {
+      balanceOverall(regionsToReturn, serverBalanceInfo, fetchFromTail, regionsToMove, max, min);
     }
 
     long endTime = System.currentTimeMillis();
@@ -416,6 +454,128 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
     return regionsToReturn;
   }
 
+  /**
+   * If we need to balance overall, we do one more round to peel off one region from each server
+   * at the max load. Together with the other regions left to be assigned, we distribute all of
+   * regionsToMove to the servers that carry fewer regions in the whole cluster scope.
+   */
+  public void balanceOverall(List<RegionPlan> regionsToReturn,
+      Map<ServerName, BalanceInfo> serverBalanceInfo, boolean fetchFromTail,
+      MinMaxPriorityQueue<RegionPlan> regionsToMove, int max, int min ){
+    // Step 1.
+    // A map to record the plans we already have, so that we can resolve a cyclic assignment pair,
+    // e.g. plan 1: A -> B, plan 2: B -> C => resolve plan1 to A -> C, remove plan2
+    Map<ServerName, List<Integer>> returnMap = new HashMap<>();
+    for (int i = 0; i < regionsToReturn.size(); i++) {
+      List<Integer> pos = returnMap.get(regionsToReturn.get(i).getDestination());
+      if (pos == null) {
+        pos = new ArrayList<>();
+        returnMap.put(regionsToReturn.get(i).getDestination(), pos);
+      }
+      pos.add(i);
+    }
+
+    // Step 2.
+    // Peel off one region from each RS which has max number of regions now.
+    // Each RS should have either max or min numbers of regions for this table.
+    for (int i = 0; i < serverLoadList.size(); i++) {
+      ServerAndLoad serverload = serverLoadList.get(i);
+      BalanceInfo balanceInfo = serverBalanceInfo.get(serverload.getServerName());
+      setLoad(serverLoadList, i, balanceInfo.getNumRegionsAdded());
+      if (balanceInfo.getHriList().size() + balanceInfo.getNumRegionsAdded() == max) {
+        HRegionInfo hriToPlan;
+        if (balanceInfo.getHriList().size() == 0) {
+          LOG.debug("During balanceOverall, we found " + serverload.getServerName()
+              + " has no HRegionInfo, no operation needed");
+          continue;
+        } else if (balanceInfo.getNextRegionForUnload() >= balanceInfo.getHriList().size()) {
+          continue;
+        } else {
+          hriToPlan = balanceInfo.getHriList().get(balanceInfo.getNextRegionForUnload());
+        }
+        RegionPlan maxPlan = new RegionPlan(hriToPlan, serverload.getServerName(), null);
+        regionsToMove.add(maxPlan);
+        setLoad(serverLoadList, i, -1);
+      } else if (balanceInfo.getHriList().size() + balanceInfo.getNumRegionsAdded() > max
+          || balanceInfo.getHriList().size() + balanceInfo.getNumRegionsAdded() < min) {
+        LOG.warn("Encountered incorrect region numbers after calculating move plan during balanceOverall, " +
+            "for this table, " + serverload.getServerName() + " originally has " + balanceInfo.getHriList().size() +
+            " regions and " + balanceInfo.getNumRegionsAdded() + " regions have been added. Yet, max =" +
+            max + ", min =" + min + ". Thus stop balance for this table"); // should not happen
+        return;
+      }
+    }
+
+    // Step 3. Sort serverLoadList, the ArrayList that holds the overall load of each server.
+    // We only need to assign the regionsToMove to
+    // the first n = regionsToMove.size() RS that have the least load.
+    Collections.sort(serverLoadList, new Comparator<ServerAndLoad>(){
+      @Override
+      public int compare(ServerAndLoad s1, ServerAndLoad s2) {
+        if(s1.getLoad() == s2.getLoad()) return 0;
+        else return (s1.getLoad() > s2.getLoad())? 1 : -1;
+      }});
+
+    // Step 4.
+    // Preparation before assigning out all regionsToMove.
+    // We need to remove the plans whose source RS equals the destination RS,
+    // since such a source RS belongs to the n least-loaded RS.
+    int assignLength = regionsToMove.size();
+    // A structure that helps to map a ServerName to its load and index in serverLoadList
+    Map<ServerName, Pair<ServerAndLoad,Integer>> SnLoadMap = new HashMap<>();
+    for (int i = 0; i < serverLoadList.size(); i++) {
+      SnLoadMap.put(serverLoadList.get(i).getServerName(),
+          new Pair<ServerAndLoad, Integer>(serverLoadList.get(i), i));
+    }
+    Pair<ServerAndLoad,Integer> shredLoad;
+    // A list to help mark the plans in regionsToMove that should be removed
+    List<RegionPlan> planToRemoveList = new ArrayList<>();
+    // A structure to record how many times a server becomes the source of a plan, from regionsToMove.
+    Map<ServerName, Integer> sourceMap = new HashMap<>();
+    // We remove one of the plans which would cause source RS to equal destination RS.
+    // But we should keep in mind that the second plan from such an RS should be kept.
+    for(RegionPlan plan: regionsToMove){
+      // the source RS's load and index in serverLoadList
+      shredLoad = SnLoadMap.get(plan.getSource());
+      if(!sourceMap.containsKey(plan.getSource())) sourceMap.put(plan.getSource(), 0);
+      sourceMap.put(plan.getSource(), sourceMap.get(plan.getSource()) + 1);
+      if(shredLoad.getSecond() < assignLength && sourceMap.get(plan.getSource()) == 1) {
+        planToRemoveList.add(plan);
+        // While marked as to be removed, the count should be added back to the source RS
+        setLoad(serverLoadList, shredLoad.getSecond(), 1);
+      }
+    }
+    // Remove those marked plans from regionsToMove;
+    // we cannot remove them directly while iterating through
+    // regionsToMove, due to the fact that regionsToMove is a MinMaxPriorityQueue.
+    for(RegionPlan planToRemove : planToRemoveList){
+      regionsToMove.remove(planToRemove);
+    }
+
+    // Step 5.
+    // We only need to assign the regionsToMove to
+    // the first n = regionsToMove.size() of them, with the least load.
+    // With this strategy adopted, we can gradually achieve the overall balance,
+    // while keeping the table level balanced.
+    for(int i = 0; i < assignLength; i++){
+      // skip the RS that is also a source; we have removed its plan from regionsToMove in the previous step
+      if(sourceMap.containsKey(serverLoadList.get(i).getServerName())) continue;
+      addRegionPlan(regionsToMove, fetchFromTail,
+          serverLoadList.get(i).getServerName(), regionsToReturn);
+      setLoad(serverLoadList, i, 1);
+      // resolve a possible cyclic assignment pair if we just produced one:
+      // e.g. plan1: A -> B, plan2: B -> C => resolve plan1 to A -> C and remove plan2
+      List<Integer> pos = returnMap.get(regionsToReturn.get(regionsToReturn.size() - 1).getSource());
+      if (pos != null && pos.size() != 0) {
+        regionsToReturn.get(pos.get(pos.size() - 1)).setDestination(
+            regionsToReturn.get(regionsToReturn.size() - 1).getDestination());
+        pos.remove(pos.size() - 1);
+        regionsToReturn.remove(regionsToReturn.size() - 1);
+      }
+    }
+    // Done balancing overall
+  }
+
   /**
    * Add a region from the head or tail to the List of regions to return.
    */
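Steps 1 and 5 above cooperate to collapse cyclic assignment pairs. Here is a self-contained sketch of that resolution using the real RegionPlan API; the server names and table are made up for illustration.

import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.RegionPlan;

// Sketch of resolving plan1: A -> B and plan2: B -> C into a single plan A -> C.
public class CyclicPlanSketch {
  public static void main(String[] args) {
    ServerName a = ServerName.valueOf("a.example.org,16020,1");
    ServerName b = ServerName.valueOf("b.example.org,16020,1");
    ServerName c = ServerName.valueOf("c.example.org,16020,1");
    HRegionInfo hri = new HRegionInfo(TableName.valueOf("t"), null, null);
    RegionPlan plan1 = new RegionPlan(hri, a, b);   // A -> B
    RegionPlan plan2 = new RegionPlan(hri, b, c);   // B -> C
    // plan1's destination is plan2's source, so the pair collapses to A -> C ...
    plan1.setDestination(plan2.getDestination());
    // ... and plan2 is dropped instead of being executed, saving one region move.
    System.out.println(plan1.getSource() + " -> " + plan1.getDestination());
  }
}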
@@ -431,6 +591,7 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
   @Override
   public List<RegionPlan> balanceCluster(TableName tableName,
       Map<ServerName, List<HRegionInfo>> clusterState) throws HBaseIOException {
+    LOG.debug("Start generating balance plan for table: " + tableName);
     return balanceCluster(clusterState);
   }
 }
@@ -306,6 +306,39 @@ public class BalancerTestBase {
     }
   }
 
+  /**
+   * Invariant is that all servers have a number of regions
+   * within the acceptable range.
+   */
+  public boolean assertClusterOverallAsBalanced(List<ServerAndLoad> servers, int tablenum) {
+    int numServers = servers.size();
+    int numRegions = 0;
+    int maxRegions = 0;
+    int minRegions = Integer.MAX_VALUE;
+    for (ServerAndLoad server : servers) {
+      int nr = server.getLoad();
+      if (nr > maxRegions) {
+        maxRegions = nr;
+      }
+      if (nr < minRegions) {
+        minRegions = nr;
+      }
+      numRegions += nr;
+    }
+    if (maxRegions - minRegions < 2) {
+      // less than 2 between max and min, can't balance
+      return true;
+    }
+    int min = numRegions / numServers;
+    int max = numRegions % numServers == 0 ? min : min + 1;
+
+    for (ServerAndLoad server : servers) {
+      if (server.getLoad() < 0 || server.getLoad() > max + tablenum/2 + 1
+          || server.getLoad() < min - tablenum/2 - 1)
+        return false;
+    }
+    return true;
+  }
+
   /**
    * Checks whether region replicas are not hosted on the same host.
    */
@@ -452,6 +485,45 @@ public class BalancerTestBase {
     return servers;
   }
 
+  protected TreeMap<ServerName, List<HRegionInfo>> mockUniformClusterServers(int[] mockCluster) {
+    int numServers = mockCluster.length;
+    TreeMap<ServerName, List<HRegionInfo>> servers = new TreeMap<ServerName, List<HRegionInfo>>();
+    for (int i = 0; i < numServers; i++) {
+      int numRegions = mockCluster[i];
+      ServerAndLoad sal = randomServer(0);
+      List<HRegionInfo> regions = uniformRegions(numRegions);
+      servers.put(sal.getServerName(), regions);
+    }
+    return servers;
+  }
+
+  protected HashMap<TableName, TreeMap<ServerName, List<HRegionInfo>>> mockClusterServersWithTables(Map<ServerName, List<HRegionInfo>> clusterServers) {
+    HashMap<TableName, TreeMap<ServerName, List<HRegionInfo>>> result = new HashMap<>();
+    for (Map.Entry<ServerName, List<HRegionInfo>> entry : clusterServers.entrySet()) {
+      ServerName sal = entry.getKey();
+      List<HRegionInfo> regions = entry.getValue();
+      for (HRegionInfo hri : regions){
+        TreeMap<ServerName, List<HRegionInfo>> servers = result.get(hri.getTable());
+        if (servers == null) {
+          servers = new TreeMap<ServerName, List<HRegionInfo>>();
+          result.put(hri.getTable(), servers);
+        }
+        List<HRegionInfo> hrilist = servers.get(sal);
+        if (hrilist == null) {
+          hrilist = new ArrayList<HRegionInfo>();
+          servers.put(sal, hrilist);
+        }
+        hrilist.add(hri);
+      }
+    }
+    for(Map.Entry<TableName, TreeMap<ServerName, List<HRegionInfo>>> entry : result.entrySet()){
+      for(ServerName srn : clusterServers.keySet()){
+        if (!entry.getValue().containsKey(srn)) entry.getValue().put(srn, new ArrayList<HRegionInfo>());
+      }
+    }
+    return result;
+  }
+
   private Queue<HRegionInfo> regionQueue = new LinkedList<HRegionInfo>();
 
   protected List<HRegionInfo> randomRegions(int numRegions) {
@@ -479,6 +551,23 @@ public class BalancerTestBase {
     return regions;
   }
 
+  protected List<HRegionInfo> uniformRegions(int numRegions) {
+    List<HRegionInfo> regions = new ArrayList<HRegionInfo>(numRegions);
+    byte[] start = new byte[16];
+    byte[] end = new byte[16];
+    rand.nextBytes(start);
+    rand.nextBytes(end);
+    for (int i = 0; i < numRegions; i++) {
+      Bytes.putInt(start, 0, numRegions << 1);
+      Bytes.putInt(end, 0, (numRegions << 1) + 1);
+      TableName tableName =
+          TableName.valueOf("table" + i);
+      HRegionInfo hri = new HRegionInfo(tableName, start, end, false);
+      regions.add(hri);
+    }
+    return regions;
+  }
+
   protected void returnRegions(List<HRegionInfo> regions) {
     regionQueue.addAll(regions);
   }
@@ -17,17 +17,23 @@
  */
 package org.apache.hadoop.hbase.master.balancer;
 
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.master.LoadBalancer;
 import org.apache.hadoop.hbase.master.RegionPlan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.net.DNSToSwitchMapping;
@@ -35,6 +41,9 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 /**
  * Test the load balancer that is created by default.
  */
@@ -103,29 +112,82 @@ public class TestDefaultLoadBalancer extends BalancerTestBase {
       new int[] { 0, 0, 144, 1, 0, 4, 1, 1123, 133, 138, 12, 1444 },
       new int[] { 1538, 1392, 1561, 1557, 1535, 1553, 1385, 1542, 1619 } };
 
+  int [] mockUniformCluster = new int[] { 5, 5, 5, 5, 5 ,0};
+
+
   /**
    * Test the load balancing algorithm.
    *
    * Invariant is that all servers should be hosting either floor(average) or
-   * ceiling(average)
+   * ceiling(average) at both table level and cluster level
    *
    * @throws Exception
    */
   @Test (timeout=60000)
-  public void testBalanceCluster() throws Exception {
-
+  public void testBalanceClusterOverall() throws Exception {
+    Map<TableName, Map<ServerName, List<HRegionInfo>>> clusterLoad
+        = new TreeMap<TableName, Map<ServerName, List<HRegionInfo>>>();
     for (int[] mockCluster : clusterStateMocks) {
-      Map<ServerName, List<HRegionInfo>> servers = mockClusterServers(mockCluster);
-      List<ServerAndLoad> list = convertToList(servers);
-      LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list));
-      List<RegionPlan> plans = loadBalancer.balanceCluster(servers);
-      List<ServerAndLoad> balancedCluster = reconcile(list, plans, servers);
-      LOG.info("Mock Balance : " + printMock(balancedCluster));
-      assertClusterAsBalanced(balancedCluster);
-      for (Map.Entry<ServerName, List<HRegionInfo>> entry : servers.entrySet()) {
-        returnRegions(entry.getValue());
-        returnServer(entry.getKey());
+      Map<ServerName, List<HRegionInfo>> clusterServers = mockClusterServers(mockCluster, 50);
+      List<ServerAndLoad> clusterList = convertToList(clusterServers);
+      clusterLoad.put(TableName.valueOf("ensemble"), clusterServers);
+      HashMap<TableName, TreeMap<ServerName, List<HRegionInfo>>> result = mockClusterServersWithTables(clusterServers);
+      loadBalancer.setClusterLoad(clusterLoad);
+      List<RegionPlan> clusterplans = new ArrayList<RegionPlan>();
+      List<Pair<TableName, Integer>> regionAmountList = new ArrayList<Pair<TableName, Integer>>();
+      for(TreeMap<ServerName, List<HRegionInfo>> servers : result.values()){
+        List<ServerAndLoad> list = convertToList(servers);
+        LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list));
+        List<RegionPlan> partialplans = loadBalancer.balanceCluster(servers);
+        if(partialplans != null) clusterplans.addAll(partialplans);
+        List<ServerAndLoad> balancedClusterPerTable = reconcile(list, partialplans, servers);
+        LOG.info("Mock Balance : " + printMock(balancedClusterPerTable));
+        assertClusterAsBalanced(balancedClusterPerTable);
+        for (Map.Entry<ServerName, List<HRegionInfo>> entry : servers.entrySet()) {
+          returnRegions(entry.getValue());
+          returnServer(entry.getKey());
+        }
       }
+      List<ServerAndLoad> balancedCluster = reconcile(clusterList, clusterplans, clusterServers);
+      assertTrue(assertClusterOverallAsBalanced(balancedCluster, result.keySet().size()));
     }
   }
+
+  /**
+   * Test the load balancing algorithm.
+   *
+   * Invariant is that all servers should be hosting either floor(average) or
+   * ceiling(average) at both table level and cluster level.
+   * Deliberately generate a special case to show the overall strategy can achieve cluster
+   * level balance while the bytable strategy cannot.
+   * @throws Exception
+   */
+  @Test (timeout=60000)
+  public void testImpactOfBalanceClusterOverall() throws Exception {
+    Map<TableName, Map<ServerName, List<HRegionInfo>>> clusterLoad
+        = new TreeMap<TableName, Map<ServerName, List<HRegionInfo>>>();
+    Map<ServerName, List<HRegionInfo>> clusterServers = mockUniformClusterServers(mockUniformCluster);
+    List<ServerAndLoad> clusterList = convertToList(clusterServers);
+    clusterLoad.put(TableName.valueOf("ensemble"), clusterServers);
+    // using the overall strategy can achieve both table and cluster level balance
+    HashMap<TableName, TreeMap<ServerName, List<HRegionInfo>>> result1 = mockClusterServersWithTables(clusterServers);
+    loadBalancer.setClusterLoad(clusterLoad);
+    List<RegionPlan> clusterplans1 = new ArrayList<RegionPlan>();
+    List<Pair<TableName, Integer>> regionAmountList = new ArrayList<Pair<TableName, Integer>>();
+    for(TreeMap<ServerName, List<HRegionInfo>> servers : result1.values()){
+      List<ServerAndLoad> list = convertToList(servers);
+      LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list));
+      List<RegionPlan> partialplans = loadBalancer.balanceCluster(servers);
+      if(partialplans != null) clusterplans1.addAll(partialplans);
+      List<ServerAndLoad> balancedClusterPerTable = reconcile(list, partialplans, servers);
+      LOG.info("Mock Balance : " + printMock(balancedClusterPerTable));
+      assertClusterAsBalanced(balancedClusterPerTable);
+      for (Map.Entry<ServerName, List<HRegionInfo>> entry : servers.entrySet()) {
+        returnRegions(entry.getValue());
+        returnServer(entry.getKey());
+      }
+    }
+    List<ServerAndLoad> balancedCluster1 = reconcile(clusterList, clusterplans1, clusterServers);
+    assertTrue(assertClusterOverallAsBalanced(balancedCluster1, result1.keySet().size()));
+  }
 }