HBASE-23949 refactor loadBalancer implements for rsgroup balance by table to achieve overallbalanced (#1279)

Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
niuyulin 2020-03-22 21:54:57 +08:00 committed by GitHub
parent ca533a77cb
commit d4ab645531
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 348 additions and 300 deletions

View File

@ -87,10 +87,11 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer implements Favored
}
@Override
public List<RegionPlan> balanceCluster(Map<ServerName, List<RegionInfo>> clusterState) {
//TODO. Look at is whether Stochastic loadbalancer can be integrated with this
public List<RegionPlan> balanceTable(TableName tableName,
Map<ServerName, List<RegionInfo>> loadOfOneTable) {
// TODO. Look at is whether Stochastic loadbalancer can be integrated with this
List<RegionPlan> plans = new ArrayList<>();
//perform a scan of the meta to get the latest updates (if any)
// perform a scan of the meta to get the latest updates (if any)
SnapshotOfRegionAssignmentFromMeta snaphotOfRegionAssignment =
new SnapshotOfRegionAssignmentFromMeta(super.services.getConnection());
try {
@ -99,43 +100,44 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer implements Favored
LOG.warn("Not running balancer since exception was thrown " + ie);
return plans;
}
// This is not used? Findbugs says so: Map<ServerName, ServerName> serverNameToServerNameWithoutCode = new HashMap<>();
// This is not used? Findbugs says so: Map<ServerName, ServerName>
// serverNameToServerNameWithoutCode = new HashMap<>();
Map<ServerName, ServerName> serverNameWithoutCodeToServerName = new HashMap<>();
ServerManager serverMgr = super.services.getServerManager();
for (ServerName sn: serverMgr.getOnlineServersList()) {
for (ServerName sn : serverMgr.getOnlineServersList()) {
ServerName s = ServerName.valueOf(sn.getHostname(), sn.getPort(), ServerName.NON_STARTCODE);
// FindBugs complains about useless store! serverNameToServerNameWithoutCode.put(sn, s);
serverNameWithoutCodeToServerName.put(s, sn);
}
for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
for (Map.Entry<ServerName, List<RegionInfo>> entry : loadOfOneTable.entrySet()) {
ServerName currentServer = entry.getKey();
//get a server without the startcode for the currentServer
// get a server without the startcode for the currentServer
ServerName currentServerWithoutStartCode = ServerName.valueOf(currentServer.getHostname(),
currentServer.getPort(), ServerName.NON_STARTCODE);
currentServer.getPort(), ServerName.NON_STARTCODE);
List<RegionInfo> list = entry.getValue();
for (RegionInfo region : list) {
if(!FavoredNodesManager.isFavoredNodeApplicable(region)) {
if (!FavoredNodesManager.isFavoredNodeApplicable(region)) {
continue;
}
List<ServerName> favoredNodes = fnm.getFavoredNodes(region);
if (favoredNodes == null || favoredNodes.get(0).equals(currentServerWithoutStartCode)) {
continue; //either favorednodes does not exist or we are already on the primary node
continue; // either favorednodes does not exist or we are already on the primary node
}
ServerName destination = null;
//check whether the primary is available
// check whether the primary is available
destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(0));
if (destination == null) {
//check whether the region is on secondary/tertiary
if (currentServerWithoutStartCode.equals(favoredNodes.get(1)) ||
currentServerWithoutStartCode.equals(favoredNodes.get(2))) {
// check whether the region is on secondary/tertiary
if (currentServerWithoutStartCode.equals(favoredNodes.get(1))
|| currentServerWithoutStartCode.equals(favoredNodes.get(2))) {
continue;
}
//the region is currently on none of the favored nodes
//get it on one of them if possible
ServerMetrics l1 = super.services.getServerManager().getLoad(
serverNameWithoutCodeToServerName.get(favoredNodes.get(1)));
ServerMetrics l2 = super.services.getServerManager().getLoad(
serverNameWithoutCodeToServerName.get(favoredNodes.get(2)));
// the region is currently on none of the favored nodes
// get it on one of them if possible
ServerMetrics l1 = super.services.getServerManager()
.getLoad(serverNameWithoutCodeToServerName.get(favoredNodes.get(1)));
ServerMetrics l2 = super.services.getServerManager()
.getLoad(serverNameWithoutCodeToServerName.get(favoredNodes.get(2)));
if (l1 != null && l2 != null) {
if (l1.getRegionMetrics().size() > l2.getRegionMetrics().size()) {
destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(2));
@ -436,9 +438,4 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer implements Favored
fnm.updateFavoredNodes(regionFNMap);
}
@Override
public List<RegionPlan> balanceCluster(TableName tableName,
Map<ServerName, List<RegionInfo>> clusterState) throws HBaseIOException {
return balanceCluster(clusterState);
}
}

View File

@ -40,7 +40,6 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
@ -1791,26 +1790,17 @@ public class HMaster extends HRegionServer implements MasterServices {
}
}
boolean isByTable = getConfiguration().getBoolean("hbase.master.loadbalance.bytable", false);
Map<TableName, Map<ServerName, List<RegionInfo>>> assignments =
this.assignmentManager.getRegionStates()
.getAssignmentsForBalancer(tableStateManager, this.serverManager.getOnlineServersList(),
isByTable);
.getAssignmentsForBalancer(tableStateManager, this.serverManager.getOnlineServersList());
for (Map<ServerName, List<RegionInfo>> serverMap : assignments.values()) {
serverMap.keySet().removeAll(this.serverManager.getDrainingServersList());
}
//Give the balancer the current cluster state.
this.balancer.setClusterMetrics(getClusterMetricsWithoutCoprocessor());
this.balancer.setClusterLoad(assignments);
List<RegionPlan> plans = new ArrayList<>();
for (Entry<TableName, Map<ServerName, List<RegionInfo>>> e : assignments.entrySet()) {
List<RegionPlan> partialPlans = this.balancer.balanceCluster(e.getKey(), e.getValue());
if (partialPlans != null) {
plans.addAll(partialPlans);
}
}
List<RegionPlan> plans = this.balancer.balanceCluster(assignments);
if (skipRegionManagementAction("balancer")) {
// make one last check that the cluster isn't shutting down before proceeding.

View File

@ -78,10 +78,6 @@ public interface LoadBalancer extends Configurable, Stoppable, ConfigurationObse
*/
void setClusterMetrics(ClusterMetrics st);
/**
* Pass RegionStates and allow balancer to set the current cluster load.
*/
void setClusterLoad(Map<TableName, Map<ServerName, List<RegionInfo>>> ClusterLoad);
/**
* Set the master service.
@ -89,18 +85,26 @@ public interface LoadBalancer extends Configurable, Stoppable, ConfigurationObse
void setMasterServices(MasterServices masterServices);
/**
* Perform the major balance operation
* @return List of plans
* Perform the major balance operation for cluster, will invoke {@link #balanceTable} to do actual
* balance. Normally not need override this method, except
* {@link org.apache.hadoop.hbase.master.balancer.SimpleLoadBalancer} and
* {@link org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer}
* @param loadOfAllTable region load of servers for all table
* @return a list of regions to be moved, including source and destination, or null if cluster is
* already balanced
*/
List<RegionPlan> balanceCluster(TableName tableName,
Map<ServerName, List<RegionInfo>> clusterState) throws IOException;
List<RegionPlan> balanceCluster(Map<TableName,
Map<ServerName, List<RegionInfo>>> loadOfAllTable) throws IOException;
/**
* Perform the major balance operation
* Perform the major balance operation for table, all class implement of {@link LoadBalancer}
* should override this method
* @param tableName the table to be balanced
* @param loadOfOneTable region load of servers for the specific one table
* @return List of plans
*/
List<RegionPlan> balanceCluster(Map<ServerName, List<RegionInfo>> clusterState)
throws IOException;
List<RegionPlan> balanceTable(TableName tableName,
Map<ServerName, List<RegionInfo>> loadOfOneTable);
/**
* Perform a Round Robin assignment of regions.

View File

@ -32,7 +32,6 @@ import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
@ -552,61 +551,42 @@ public class RegionStates {
* wants to iterate this exported list. We need to synchronize on regions
* since all access to this.servers is under a lock on this.regions.
*
* @param isByTable If <code>true</code>, return the assignments by table. If <code>false</code>,
* return the assignments which aggregate the server-load to the cluster level.
* @return A clone of current assignments.
*/
public Map<TableName, Map<ServerName, List<RegionInfo>>> getAssignmentsForBalancer(
TableStateManager tableStateManager, List<ServerName> onlineServers, boolean isByTable) {
TableStateManager tableStateManager, List<ServerName> onlineServers) {
final Map<TableName, Map<ServerName, List<RegionInfo>>> result = new HashMap<>();
if (isByTable) {
for (RegionStateNode node : regionsMap.values()) {
if (isTableDisabled(tableStateManager, node.getTable())) {
continue;
}
if (node.getRegionInfo().isSplitParent()) {
continue;
}
Map<ServerName, List<RegionInfo>> tableResult =
result.computeIfAbsent(node.getTable(), t -> new HashMap<>());
final ServerName serverName = node.getRegionLocation();
if (serverName == null) {
LOG.info("Skipping, no server for " + node);
continue;
}
List<RegionInfo> serverResult =
tableResult.computeIfAbsent(serverName, s -> new ArrayList<>());
serverResult.add(node.getRegionInfo());
for (RegionStateNode node : regionsMap.values()) {
if (isTableDisabled(tableStateManager, node.getTable())) {
continue;
}
// Add online servers with no assignment for the table.
for (Map<ServerName, List<RegionInfo>> table : result.values()) {
for (ServerName serverName : onlineServers) {
table.computeIfAbsent(serverName, key -> new ArrayList<>());
}
if (node.getRegionInfo().isSplitParent()) {
continue;
}
} else {
final HashMap<ServerName, List<RegionInfo>> ensemble = new HashMap<>(serverMap.size());
Map<ServerName, List<RegionInfo>> tableResult =
result.computeIfAbsent(node.getTable(), t -> new HashMap<>());
final ServerName serverName = node.getRegionLocation();
if (serverName == null) {
LOG.info("Skipping, no server for " + node);
continue;
}
List<RegionInfo> serverResult =
tableResult.computeIfAbsent(serverName, s -> new ArrayList<>());
serverResult.add(node.getRegionInfo());
}
// Add online servers with no assignment for the table.
for (Map<ServerName, List<RegionInfo>> table : result.values()) {
for (ServerName serverName : onlineServers) {
ServerStateNode serverNode = serverMap.get(serverName);
if (serverNode != null) {
ensemble.put(serverNode.getServerName(),
serverNode.getRegionInfoList().stream()
.filter(region -> !isTableDisabled(tableStateManager, region.getTable()))
.filter(region -> !region.isSplitParent()).collect(Collectors.toList()));
} else {
ensemble.put(serverName, new ArrayList<>());
}
table.computeIfAbsent(serverName, key -> new ArrayList<>());
}
// Use a fake table name to represent the whole cluster's assignments
result.put(HConstants.ENSEMBLE_TABLE_NAME, ensemble);
}
return result;
}
private boolean isTableDisabled(final TableStateManager tableStateManager,
final TableName tableName) {
return tableStateManager
.isTableState(tableName, TableState.State.DISABLED, TableState.State.DISABLING);
final TableName tableName) {
return tableStateManager.isTableState(tableName, TableState.State.DISABLED,
TableState.State.DISABLING);
}
// ==========================================================================

View File

@ -83,6 +83,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
protected RegionLocationFinder regionFinder;
protected boolean useRegionFinder;
protected boolean isByTable = false;
private static class DefaultRackManager extends RackManager {
@Override
@ -1047,6 +1048,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
if (useRegionFinder) {
regionFinder.setConf(conf);
}
this.isByTable = conf.getBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, isByTable);
// Print out base configs. Don't print overallSlop since it for simple balancer exclusively.
LOG.info("slop={}, systemTablesOnMaster={}",
this.slop, this.onlySystemTablesOnMaster);
@ -1150,10 +1152,6 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
}
}
@Override
public void setClusterLoad(Map<TableName, Map<ServerName, List<RegionInfo>>> clusterLoad){
}
@Override
public void setMasterServices(MasterServices masterServices) {
@ -1184,7 +1182,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
this.rackManager = rackManager;
}
protected boolean needsBalance(Cluster c) {
protected boolean needsBalance(TableName tableName, Cluster c) {
ClusterLoadState cs = new ClusterLoadState(c.clusterState);
if (cs.getNumServers() < MIN_SERVER_BALANCE) {
if (LOG.isDebugEnabled()) {
@ -1639,6 +1637,42 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
}
}
private Map<ServerName, List<RegionInfo>> toEnsumbleTableLoad(
Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable) {
Map<ServerName, List<RegionInfo>> returnMap = new TreeMap<>();
for (Map<ServerName, List<RegionInfo>> serverNameListMap : LoadOfAllTable.values()) {
serverNameListMap.forEach((serverName, regionInfoList) -> {
List<RegionInfo> regionInfos =
returnMap.computeIfAbsent(serverName, k -> new ArrayList<>());
regionInfos.addAll(regionInfoList);
});
}
return returnMap;
}
@Override
public abstract List<RegionPlan> balanceTable(TableName tableName,
Map<ServerName, List<RegionInfo>> loadOfOneTable);
@Override
public List<RegionPlan>
balanceCluster(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
if (isByTable) {
List<RegionPlan> result = new ArrayList<>();
loadOfAllTable.forEach((tableName, loadOfOneTable) -> {
LOG.info("Start Generate Balance plan for table: " + tableName);
List<RegionPlan> partialPlans = balanceTable(tableName, loadOfOneTable);
if (partialPlans != null) {
result.addAll(partialPlans);
}
});
return result;
} else {
LOG.info("Start Generate Balance plan for cluster.");
return balanceTable(HConstants.ENSEMBLE_TABLE_NAME, toEnsumbleTableLoad(loadOfAllTable));
}
}
@Override
public void onConfigurationChange(Configuration conf) {
}

View File

@ -34,6 +34,7 @@ import java.util.Set;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper;
import org.apache.hadoop.hbase.favored.FavoredNodesManager;
@ -695,8 +696,8 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
* implementation. For the misplaced regions, we assign a bogus server to it and AM takes care.
*/
@Override
public synchronized List<RegionPlan> balanceCluster(Map<ServerName,
List<RegionInfo>> clusterState) {
public synchronized List<RegionPlan> balanceTable(TableName tableName,
Map<ServerName, List<RegionInfo>> loadOfOneTable) {
if (this.services != null) {
@ -704,7 +705,7 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
Map<ServerName, List<RegionInfo>> correctAssignments = new HashMap<>();
int misplacedRegions = 0;
for (Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
for (Entry<ServerName, List<RegionInfo>> entry : loadOfOneTable.entrySet()) {
ServerName current = entry.getKey();
List<RegionInfo> regions = Lists.newArrayList();
correctAssignments.put(current, regions);
@ -732,13 +733,13 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
}
}
LOG.debug("Found misplaced regions: " + misplacedRegions + ", not on favored nodes.");
List<RegionPlan> regionPlansFromBalance = super.balanceCluster(correctAssignments);
List<RegionPlan> regionPlansFromBalance = super.balanceTable(tableName, correctAssignments);
if (regionPlansFromBalance != null) {
regionPlans.addAll(regionPlansFromBalance);
}
return regionPlans;
} else {
return super.balanceCluster(clusterState);
return super.balanceTable(tableName, loadOfOneTable);
}
}
}

View File

@ -29,7 +29,6 @@ import java.util.Random;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
@ -62,7 +61,7 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
private RegionInfoComparator riComparator = new RegionInfoComparator();
private RegionPlan.RegionPlanComparator rpComparator = new RegionPlan.RegionPlanComparator();
private float avgLoadOverall;
private List<ServerAndLoad> serverLoadList;
private List<ServerAndLoad> serverLoadList = new ArrayList<>();
/**
* Stores additional per-server information about the regions added/removed
@ -106,14 +105,19 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
}
@Override
public void setClusterLoad(Map<TableName, Map<ServerName, List<RegionInfo>>> clusterLoad) {
serverLoadList = new ArrayList<>();
/**
* Pass RegionStates and allow balancer to set the current cluster load.
*/
void setClusterLoad(Map<TableName, Map<ServerName, List<RegionInfo>>> clusterLoad) {
serverLoadList.clear();
Map<ServerName, Integer> server2LoadMap = new HashMap<>();
float sum = 0;
for (Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> clusterEntry : clusterLoad.entrySet()) {
for (Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> clusterEntry : clusterLoad
.entrySet()) {
for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterEntry.getValue().entrySet()) {
if (entry.getKey().equals(masterServerName)) continue; // we shouldn't include master as potential assignee
if (entry.getKey().equals(masterServerName)) {
continue; // we shouldn't include master as potential assignee
}
int regionNum = entry.getValue().size();
server2LoadMap.compute(entry.getKey(), (k, v) -> v == null ? regionNum : regionNum + v);
sum += regionNum;
@ -243,34 +247,35 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
* Does this mean we need HeapSize on HMaster? Or just careful monitor?
* (current thinking is we will hold all assignments in memory)
*
* @param clusterMap Map of regionservers and their load/region information to
* @param loadOfOneTable Map of regionservers and their load/region information to
* a list of their most loaded regions
* @return a list of regions to be moved, including source and destination,
* or null if cluster is already balanced
*/
@Override
public List<RegionPlan> balanceCluster(
Map<ServerName, List<RegionInfo>> clusterMap) {
List<RegionPlan> regionsToReturn = balanceMasterRegions(clusterMap);
if (regionsToReturn != null || clusterMap == null || clusterMap.size() <= 1) {
public List<RegionPlan> balanceTable(TableName tableName,
Map<ServerName, List<RegionInfo>> loadOfOneTable) {
List<RegionPlan> regionsToReturn = balanceMasterRegions(loadOfOneTable);
if (regionsToReturn != null || loadOfOneTable == null || loadOfOneTable.size() <= 1) {
return regionsToReturn;
}
if (masterServerName != null && clusterMap.containsKey(masterServerName)) {
if (clusterMap.size() <= 2) {
if (masterServerName != null && loadOfOneTable.containsKey(masterServerName)) {
if (loadOfOneTable.size() <= 2) {
return null;
}
clusterMap = new HashMap<>(clusterMap);
clusterMap.remove(masterServerName);
loadOfOneTable = new HashMap<>(loadOfOneTable);
loadOfOneTable.remove(masterServerName);
}
long startTime = System.currentTimeMillis();
// construct a Cluster object with clusterMap and rest of the
// argument as defaults
Cluster c = new Cluster(clusterMap, null, this.regionFinder, this.rackManager);
if (!this.needsBalance(c) && !this.overallNeedsBalance()) return null;
ClusterLoadState cs = new ClusterLoadState(clusterMap);
Cluster c = new Cluster(loadOfOneTable, null, this.regionFinder, this.rackManager);
if (!this.needsBalance(tableName, c) && !this.overallNeedsBalance()) {
return null;
}
ClusterLoadState cs = new ClusterLoadState(loadOfOneTable);
int numServers = cs.getNumServers();
NavigableMap<ServerAndLoad, List<RegionInfo>> serversByLoad = cs.getServersByLoad();
int numRegions = cs.getNumRegions();
@ -440,7 +445,7 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
", numServers=" + numServers + ", serversOverloaded=" + serversOverloaded +
", serversUnderloaded=" + serversUnderloaded);
StringBuilder sb = new StringBuilder();
for (Map.Entry<ServerName, List<RegionInfo>> e: clusterMap.entrySet()) {
for (Map.Entry<ServerName, List<RegionInfo>> e: loadOfOneTable.entrySet()) {
if (sb.length() > 0) sb.append(", ");
sb.append(e.getKey().toString());
sb.append(" ");
@ -594,10 +599,14 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
regionsToReturn.add(rp);
}
/**
* Override to invoke {@link #setClusterLoad} before balance, We need clusterLoad of all regions
* on every server to achieve overall balanced
*/
@Override
public List<RegionPlan> balanceCluster(TableName tableName,
Map<ServerName, List<RegionInfo>> clusterState) throws HBaseIOException {
LOG.debug("Start Generate Balance plan for table: " + tableName);
return balanceCluster(clusterState);
public synchronized List<RegionPlan>
balanceCluster(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
setClusterLoad(loadOfAllTable);
return super.balanceCluster(loadOfAllTable);
}
}

View File

@ -34,7 +34,6 @@ import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
@ -159,8 +158,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
private RackLocalityCostFunction rackLocalityCost;
private RegionReplicaHostCostFunction regionReplicaHostCostFunction;
private RegionReplicaRackCostFunction regionReplicaRackCostFunction;
private boolean isByTable = false;
private TableName tableName = null;
/**
* The constructor that pass a MetricsStochasticBalancer to BaseLoadBalancer to replace its
@ -184,7 +181,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
runMaxSteps = conf.getBoolean(RUN_MAX_STEPS_KEY, runMaxSteps);
numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember);
isByTable = conf.getBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, isByTable);
minCostNeedBalance = conf.getFloat(MIN_COST_NEED_BALANCE_KEY, minCostNeedBalance);
if (localityCandidateGenerator == null) {
localityCandidateGenerator = new LocalityBasedCandidateGenerator(services);
@ -317,7 +313,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
}
@Override
protected boolean needsBalance(Cluster cluster) {
protected boolean needsBalance(TableName tableName, Cluster cluster) {
ClusterLoadState cs = new ClusterLoadState(cluster.clusterState);
if (cs.getNumServers() < MIN_SERVER_BALANCE) {
if (LOG.isDebugEnabled()) {
@ -359,13 +355,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
return true;
}
@Override
public synchronized List<RegionPlan> balanceCluster(TableName tableName, Map<ServerName,
List<RegionInfo>> clusterState) {
this.tableName = tableName;
return balanceCluster(clusterState);
}
@VisibleForTesting
Cluster.Action nextAction(Cluster cluster) {
return candidateGenerators.get(RANDOM.nextInt(candidateGenerators.size()))
@ -377,19 +366,19 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
* should always approach the optimal state given enough steps.
*/
@Override
public synchronized List<RegionPlan> balanceCluster(Map<ServerName,
List<RegionInfo>> clusterState) {
List<RegionPlan> plans = balanceMasterRegions(clusterState);
if (plans != null || clusterState == null || clusterState.size() <= 1) {
public synchronized List<RegionPlan> balanceTable(TableName tableName, Map<ServerName,
List<RegionInfo>> loadOfOneTable) {
List<RegionPlan> plans = balanceMasterRegions(loadOfOneTable);
if (plans != null || loadOfOneTable == null || loadOfOneTable.size() <= 1) {
return plans;
}
if (masterServerName != null && clusterState.containsKey(masterServerName)) {
if (clusterState.size() <= 2) {
if (masterServerName != null && loadOfOneTable.containsKey(masterServerName)) {
if (loadOfOneTable.size() <= 2) {
return null;
}
clusterState = new HashMap<>(clusterState);
clusterState.remove(masterServerName);
loadOfOneTable = new HashMap<>(loadOfOneTable);
loadOfOneTable.remove(masterServerName);
}
// On clusters with lots of HFileLinks or lots of reference files,
@ -405,13 +394,13 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
//The clusterState that is given to this method contains the state
//of all the regions in the table(s) (that's true today)
// Keep track of servers to iterate through them.
Cluster cluster = new Cluster(clusterState, loads, finder, rackManager);
Cluster cluster = new Cluster(loadOfOneTable, loads, finder, rackManager);
long startTime = EnvironmentEdgeManager.currentTime();
initCosts(cluster);
if (!needsBalance(cluster)) {
if (!needsBalance(tableName, cluster)) {
return null;
}

View File

@ -21,8 +21,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -111,51 +109,46 @@ public class RSGroupBasedLoadBalancer implements LoadBalancer {
this.masterServices = masterServices;
}
/**
* Override to balance by RSGroup
* not invoke {@link #balanceTable(TableName, Map)}
*/
@Override
public List<RegionPlan> balanceCluster(TableName tableName, Map<ServerName, List<RegionInfo>>
clusterState) throws IOException {
return balanceCluster(clusterState);
}
@Override
public List<RegionPlan> balanceCluster(Map<ServerName, List<RegionInfo>> clusterState)
throws IOException {
public List<RegionPlan> balanceCluster(
Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) throws IOException {
if (!isOnline()) {
throw new ConstraintException(
RSGroupInfoManager.class.getSimpleName() + " is not online, unable to perform balance");
}
// Calculate correct assignments and a list of RegionPlan for mis-placed regions
Pair<Map<ServerName,List<RegionInfo>>, List<RegionPlan>> correctedStateAndRegionPlans =
correctAssignments(clusterState);
Map<ServerName,List<RegionInfo>> correctedState = correctedStateAndRegionPlans.getFirst();
Pair<Map<TableName, Map<ServerName, List<RegionInfo>>>, List<RegionPlan>>
correctedStateAndRegionPlans = correctAssignments(loadOfAllTable);
Map<TableName, Map<ServerName, List<RegionInfo>>> correctedLoadOfAllTable =
correctedStateAndRegionPlans.getFirst();
List<RegionPlan> regionPlans = correctedStateAndRegionPlans.getSecond();
RSGroupInfo defaultInfo = rsGroupInfoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP);
// Add RegionPlan
// for the regions which have been placed according to the region server group assignment
// into the movement list
try {
// Record which region servers have been processedso as to skip them after processed
HashSet<ServerName> processedServers = new HashSet<>();
// For each rsgroup
for (RSGroupInfo rsgroup : rsGroupInfoManager.listRSGroups()) {
Map<ServerName, List<RegionInfo>> groupClusterState = new HashMap<>();
Map<TableName, Map<ServerName, List<RegionInfo>>> groupClusterLoad = new HashMap<>();
for (ServerName server : clusterState.keySet()) { // for each region server
if (!processedServers.contains(server) // server is not processed yet
&& rsgroup.containsServer(server.getAddress())) { // server belongs to this rsgroup
List<RegionInfo> regionsOnServer = correctedState.get(server);
groupClusterState.put(server, regionsOnServer);
processedServers.add(server);
Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfTablesInGroup = new HashMap<>();
for (Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> entry : correctedLoadOfAllTable
.entrySet()) {
TableName tableName = entry.getKey();
RSGroupInfo targetRSGInfo = RSGroupUtil
.getRSGroupInfo(masterServices, rsGroupInfoManager, tableName).orElse(defaultInfo);
if (targetRSGInfo.getName().equals(rsgroup.getName())) {
loadOfTablesInGroup.put(tableName, entry.getValue());
}
}
groupClusterLoad.put(HConstants.ENSEMBLE_TABLE_NAME, groupClusterState);
this.internalBalancer.setClusterLoad(groupClusterLoad);
List<RegionPlan> groupPlans = this.internalBalancer
.balanceCluster(groupClusterState);
List<RegionPlan> groupPlans = null;
if (!loadOfTablesInGroup.isEmpty()) {
LOG.info("Start Generate Balance plan for group: " + rsgroup.getName());
groupPlans = this.internalBalancer.balanceCluster(loadOfTablesInGroup);
}
if (groupPlans != null) {
regionPlans.addAll(groupPlans);
}
@ -298,36 +291,42 @@ public class RSGroupBasedLoadBalancer implements LoadBalancer {
return finalList;
}
private Pair<Map<ServerName, List<RegionInfo>>, List<RegionPlan>> correctAssignments(
Map<ServerName, List<RegionInfo>> existingAssignments) throws IOException {
private Pair<Map<TableName, Map<ServerName, List<RegionInfo>>>, List<RegionPlan>>
correctAssignments(Map<TableName, Map<ServerName, List<RegionInfo>>> existingAssignments)
throws IOException {
// To return
Map<ServerName, List<RegionInfo>> correctAssignments = new TreeMap<>();
Map<TableName, Map<ServerName, List<RegionInfo>>> correctAssignments = new HashMap<>();
List<RegionPlan> regionPlansForMisplacedRegions = new ArrayList<>();
RSGroupInfo defaultInfo = rsGroupInfoManager.getRSGroup(RSGroupInfo.DEFAULT_GROUP);
for (Map.Entry<ServerName, List<RegionInfo>> assignments : existingAssignments.entrySet()){
ServerName currentHostServer = assignments.getKey();
correctAssignments.put(currentHostServer, new LinkedList<>());
List<RegionInfo> regions = assignments.getValue();
for (RegionInfo region : regions) {
RSGroupInfo targetRSGInfo = null;
try {
targetRSGInfo =
RSGroupUtil.getRSGroupInfo(masterServices, rsGroupInfoManager, region.getTable())
.orElse(defaultInfo);
} catch (IOException exp) {
LOG.debug("RSGroup information null for region of table " + region.getTable(), exp);
}
if (targetRSGInfo == null ||
!targetRSGInfo.containsServer(currentHostServer.getAddress())) { // region is mis-placed
regionPlansForMisplacedRegions.add(new RegionPlan(region, currentHostServer, null));
} else { // region is placed as expected
correctAssignments.get(currentHostServer).add(region);
for (Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> assignments : existingAssignments
.entrySet()) {
TableName tableName = assignments.getKey();
Map<ServerName, List<RegionInfo>> clusterLoad = assignments.getValue();
RSGroupInfo targetRSGInfo = null;
Map<ServerName, List<RegionInfo>> correctServerRegion = new TreeMap<>();
try {
targetRSGInfo = RSGroupUtil.getRSGroupInfo(masterServices, rsGroupInfoManager, tableName)
.orElse(defaultInfo);
} catch (IOException exp) {
LOG.debug("RSGroup information null for region of table " + tableName, exp);
}
for (Map.Entry<ServerName, List<RegionInfo>> serverRegionMap : clusterLoad.entrySet()) {
ServerName currentHostServer = serverRegionMap.getKey();
List<RegionInfo> regionInfoList = serverRegionMap.getValue();
if (targetRSGInfo == null
|| !targetRSGInfo.containsServer(currentHostServer.getAddress())) {
regionInfoList.forEach(regionInfo -> {
regionPlansForMisplacedRegions.add(new RegionPlan(regionInfo, currentHostServer, null));
});
} else {
correctServerRegion.put(currentHostServer, regionInfoList);
}
}
correctAssignments.put(tableName, correctServerRegion);
}
// Return correct assignments and region movement plan for mis-placed regions together
return new Pair<Map<ServerName, List<RegionInfo>>, List<RegionPlan>>(
return new Pair<Map<TableName, Map<ServerName, List<RegionInfo>>>, List<RegionPlan>>(
correctAssignments, regionPlansForMisplacedRegions);
}
@ -380,9 +379,6 @@ public class RSGroupBasedLoadBalancer implements LoadBalancer {
return this.rsGroupInfoManager.isOnline();
}
@Override
public void setClusterLoad(Map<TableName, Map<ServerName, List<RegionInfo>>> clusterLoad) {
}
@Override
public void regionOnline(RegionInfo regionInfo, ServerName sn) {
@ -427,4 +423,38 @@ public class RSGroupBasedLoadBalancer implements LoadBalancer {
public void updateBalancerStatus(boolean status) {
internalBalancer.updateBalancerStatus(status);
}
/**
* can achieve table balanced rather than overall balanced
*/
@Override
public List<RegionPlan> balanceTable(TableName tableName,
Map<ServerName, List<RegionInfo>> loadOfOneTable) {
if (!isOnline()) {
LOG.error(RSGroupInfoManager.class.getSimpleName()
+ " is not online, unable to perform balanceTable");
return null;
}
Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfThisTable = new HashMap<>();
loadOfThisTable.put(tableName, loadOfOneTable);
Pair<Map<TableName, Map<ServerName, List<RegionInfo>>>, List<RegionPlan>>
correctedStateAndRegionPlans;
// Calculate correct assignments and a list of RegionPlan for mis-placed regions
try {
correctedStateAndRegionPlans = correctAssignments(loadOfThisTable);
} catch (IOException e) {
LOG.error("get correct assignments and mis-placed regions error ", e);
return null;
}
Map<TableName, Map<ServerName, List<RegionInfo>>> correctedLoadOfThisTable =
correctedStateAndRegionPlans.getFirst();
List<RegionPlan> regionPlans = correctedStateAndRegionPlans.getSecond();
List<RegionPlan> tablePlans =
this.internalBalancer.balanceTable(tableName, correctedLoadOfThisTable.get(tableName));
if (tablePlans != null) {
regionPlans.addAll(tablePlans);
}
return regionPlans;
}
}

View File

@ -1069,18 +1069,9 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
}
// We balance per group instead of per table
List<RegionPlan> plans = new ArrayList<>();
Map<TableName, Map<ServerName, List<RegionInfo>>> assignmentsByTable =
getRSGroupAssignmentsByTable(groupName);
for (Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> tableMap : assignmentsByTable
.entrySet()) {
LOG.info("Creating partial plan for table {} : {}", tableMap.getKey(), tableMap.getValue());
List<RegionPlan> partialPlans = balancer.balanceCluster(tableMap.getValue());
LOG.info("Partial plan for table {} : {}", tableMap.getKey(), partialPlans);
if (partialPlans != null) {
plans.addAll(partialPlans);
}
}
List<RegionPlan> plans = balancer.balanceCluster(assignmentsByTable);
boolean balancerRan = !plans.isEmpty();
if (balancerRan) {
LOG.info("RSGroup balance {} starting with plan count: {}", groupName, plans.size());

View File

@ -133,7 +133,7 @@ public class TestStochasticBalancerJmxMetrics extends BalancerTestBase {
TableName tableName = HConstants.ENSEMBLE_TABLE_NAME;
Map<ServerName, List<RegionInfo>> clusterState = mockClusterServers(mockCluster_ensemble);
loadBalancer.balanceCluster(tableName, clusterState);
loadBalancer.balanceTable(tableName, clusterState);
String[] tableNames = new String[] { tableName.getNameAsString() };
String[] functionNames = loadBalancer.getCostFunctionNames();
@ -169,17 +169,17 @@ public class TestStochasticBalancerJmxMetrics extends BalancerTestBase {
// table 1
TableName tableName = TableName.valueOf(TABLE_NAME_1);
Map<ServerName, List<RegionInfo>> clusterState = mockClusterServers(mockCluster_pertable_1);
loadBalancer.balanceCluster(tableName, clusterState);
loadBalancer.balanceTable(tableName, clusterState);
// table 2
tableName = TableName.valueOf(TABLE_NAME_2);
clusterState = mockClusterServers(mockCluster_pertable_2);
loadBalancer.balanceCluster(tableName, clusterState);
loadBalancer.balanceTable(tableName, clusterState);
// table hbase:namespace
tableName = TableName.valueOf(TABLE_NAME_NAMESPACE);
clusterState = mockClusterServers(mockCluster_pertable_namespace);
loadBalancer.balanceCluster(tableName, clusterState);
loadBalancer.balanceTable(tableName, clusterState);
String[] tableNames = new String[] { TABLE_NAME_1, TABLE_NAME_2, TABLE_NAME_NAMESPACE };
Set<String> jmxMetrics = readJmxMetricsWithRetry();

View File

@ -20,8 +20,6 @@ package org.apache.hadoop.hbase.master;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -87,27 +85,9 @@ public class TestBalancer {
ServerManager serverManager = master.getServerManager();
Map<TableName, Map<ServerName, List<RegionInfo>>> assignments =
assignmentManager.getRegionStates()
.getAssignmentsForBalancer(tableStateManager, serverManager.getOnlineServersList(), true);
.getAssignmentsForBalancer(tableStateManager, serverManager.getOnlineServersList());
assertFalse(assignments.containsKey(disableTableName));
assertTrue(assignments.containsKey(tableName));
assertFalse(assignments.get(tableName).containsKey(sn1));
assignments = assignmentManager.getRegionStates()
.getAssignmentsForBalancer(tableStateManager, serverManager.getOnlineServersList(), false);
Map<TableName, Map<ServerName, List<RegionInfo>>> tableNameMap = new HashMap<>();
for (Map.Entry<ServerName, List<RegionInfo>> entry : assignments
.get(HConstants.ENSEMBLE_TABLE_NAME).entrySet()) {
final ServerName serverName = entry.getKey();
for (RegionInfo regionInfo : entry.getValue()) {
Map<ServerName, List<RegionInfo>> tableResult =
tableNameMap.computeIfAbsent(regionInfo.getTable(), t -> new HashMap<>());
List<RegionInfo> serverResult =
tableResult.computeIfAbsent(serverName, s -> new ArrayList<>());
serverResult.add(regionInfo);
}
}
assertFalse(tableNameMap.containsKey(disableTableName));
assertTrue(tableNameMap.containsKey(tableName));
assertFalse(tableNameMap.get(tableName).containsKey(sn1));
}
}

View File

@ -557,7 +557,9 @@ public class BalancerTestBase {
loadBalancer.setRackManager(rackManager);
// Run the balancer.
List<RegionPlan> plans = loadBalancer.balanceCluster(serverMap);
Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
(Map) mockClusterServersWithTables(serverMap);
List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
assertNotNull(plans);
// Check to see that this actually got to a stable place.
@ -570,7 +572,8 @@ public class BalancerTestBase {
if (assertFullyBalanced) {
assertClusterAsBalanced(balancedCluster);
List<RegionPlan> secondPlans = loadBalancer.balanceCluster(serverMap);
LoadOfAllTable = (Map) mockClusterServersWithTables(serverMap);
List<RegionPlan> secondPlans = loadBalancer.balanceCluster(LoadOfAllTable);
assertNull(secondPlans);
}

View File

@ -173,7 +173,8 @@ public class LoadBalancerPerformanceEvaluation extends AbstractHBaseTool {
methodName = "balanceCluster";
LOG.info("Calling " + methodName);
watch.reset().start();
loadBalancer.balanceCluster(serverRegionMap);
loadBalancer.balanceTable(HConstants.ENSEMBLE_TABLE_NAME, serverRegionMap);
System.out.print(formatResults(methodName, watch.elapsed(TimeUnit.MILLISECONDS)));
return EXIT_SUCCESS;

View File

@ -58,7 +58,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
/**
* Base UT of RSGroupableBalancer.
*/
public class RSGroupableBalancerTestBase {
public class RSGroupableBalancerTestBase extends BalancerTestBase{
static SecureRandom rand = new SecureRandom();
static String[] groups = new String[] { RSGroupInfo.DEFAULT_GROUP, "dg2", "dg3", "dg4" };

View File

@ -37,7 +37,6 @@ import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
@ -118,13 +117,14 @@ public class TestBaseLoadBalancer extends BalancerTestBase {
public static class MockBalancer extends BaseLoadBalancer {
@Override
public List<RegionPlan> balanceCluster(Map<ServerName, List<RegionInfo>> clusterState) {
public List<RegionPlan>
balanceCluster(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
return null;
}
@Override
public List<RegionPlan> balanceCluster(TableName tableName,
Map<ServerName, List<RegionInfo>> clusterState) throws HBaseIOException {
public List<RegionPlan> balanceTable(TableName tableName,
Map<ServerName, List<RegionInfo>> loadOfOneTable) {
return null;
}
}

View File

@ -32,6 +32,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
@ -84,14 +85,22 @@ public class TestRSGroupBasedLoadBalancer extends RSGroupableBalancerTestBase {
*/
@Test
public void testBalanceCluster() throws Exception {
Map<ServerName, List<RegionInfo>> servers = mockClusterServers();
ArrayListMultimap<String, ServerAndLoad> list = convertToGroupBasedMap(servers);
LOG.info("Mock Cluster : " + printStats(list));
List<RegionPlan> plans = loadBalancer.balanceCluster(servers);
ArrayListMultimap<String, ServerAndLoad> balancedCluster = reconcile(
list, plans);
LOG.info("Mock Balance : " + printStats(balancedCluster));
assertClusterAsBalanced(balancedCluster);
// Test with/without per table balancer.
boolean[] perTableBalancerConfigs = { true, false };
for (boolean isByTable : perTableBalancerConfigs) {
Configuration conf = loadBalancer.getConf();
conf.setBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, isByTable);
loadBalancer.setConf(conf);
Map<ServerName, List<RegionInfo>> servers = mockClusterServers();
ArrayListMultimap<String, ServerAndLoad> list = convertToGroupBasedMap(servers);
LOG.info("Mock Cluster : " + printStats(list));
Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
(Map) mockClusterServersWithTables(servers);
List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
ArrayListMultimap<String, ServerAndLoad> balancedCluster = reconcile(list, plans);
LOG.info("Mock Balance : " + printStats(balancedCluster));
assertClusterAsBalanced(balancedCluster);
}
}
/**

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Size;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
@ -134,7 +135,9 @@ public class TestRSGroupBasedLoadBalancerWithStochasticLoadBalancerAsInternal
when(clusterStatus.getLiveServerMetrics()).thenReturn(serverMetricsMap);
loadBalancer.setClusterMetrics(clusterStatus);
List<RegionPlan> plans = loadBalancer.balanceCluster(clusterState);
Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
(Map) mockClusterServersWithTables(clusterState);
List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
Set<RegionInfo> regionsMoveFromServerA = new HashSet<>();
Set<ServerName> targetServers = new HashSet<>();
for(RegionPlan plan : plans) {

View File

@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
@ -50,15 +49,15 @@ import org.slf4j.LoggerFactory;
* Test the load balancer that is created by default.
*/
@Category({MasterTests.class, SmallTests.class})
public class TestDefaultLoadBalancer extends BalancerTestBase {
public class TestSimpleLoadBalancer extends BalancerTestBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestDefaultLoadBalancer.class);
HBaseClassTestRule.forClass(TestSimpleLoadBalancer.class);
private static final Logger LOG = LoggerFactory.getLogger(TestDefaultLoadBalancer.class);
private static final Logger LOG = LoggerFactory.getLogger(TestSimpleLoadBalancer.class);
private static LoadBalancer loadBalancer;
private static SimpleLoadBalancer loadBalancer;
@BeforeClass
public static void beforeAllTests() throws Exception {
@ -139,14 +138,18 @@ public class TestDefaultLoadBalancer extends BalancerTestBase {
Map<ServerName, List<RegionInfo>> clusterServers = mockClusterServers(mockCluster, 50);
List<ServerAndLoad> clusterList = convertToList(clusterServers);
clusterLoad.put(TableName.valueOf(name.getMethodName()), clusterServers);
HashMap<TableName, TreeMap<ServerName, List<RegionInfo>>> result = mockClusterServersWithTables(clusterServers);
HashMap<TableName, TreeMap<ServerName, List<RegionInfo>>> result =
mockClusterServersWithTables(clusterServers);
loadBalancer.setClusterLoad(clusterLoad);
List<RegionPlan> clusterplans = new ArrayList<>();
List<Pair<TableName, Integer>> regionAmountList = new ArrayList<>();
for(TreeMap<ServerName, List<RegionInfo>> servers : result.values()){
for (Map.Entry<TableName, TreeMap<ServerName, List<RegionInfo>>> mapEntry : result
.entrySet()) {
TableName tableName = mapEntry.getKey();
TreeMap<ServerName, List<RegionInfo>> servers = mapEntry.getValue();
List<ServerAndLoad> list = convertToList(servers);
LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list));
List<RegionPlan> partialplans = loadBalancer.balanceCluster(servers);
List<RegionPlan> partialplans = loadBalancer.balanceTable(tableName, servers);
if(partialplans != null) clusterplans.addAll(partialplans);
List<ServerAndLoad> balancedClusterPerTable = reconcile(list, partialplans, servers);
LOG.info("Mock Balance : " + printMock(balancedClusterPerTable));
@ -176,28 +179,33 @@ public class TestDefaultLoadBalancer extends BalancerTestBase {
}
@Test
public void testImpactOfBalanceClusterOverallWithClusterLoadPerTable() throws Exception {
public void testImpactOfBalanceClusterOverallWithLoadOfAllTable() throws Exception {
testImpactOfBalanceClusterOverall(true);
}
private void testImpactOfBalanceClusterOverall(boolean useClusterLoadPerTable) throws Exception {
private void testImpactOfBalanceClusterOverall(boolean useLoadOfAllTable) throws Exception {
Map<TableName, Map<ServerName, List<RegionInfo>>> clusterLoad = new TreeMap<>();
Map<ServerName, List<RegionInfo>> clusterServers = mockUniformClusterServers(mockUniformCluster);
Map<ServerName, List<RegionInfo>> clusterServers =
mockUniformClusterServers(mockUniformCluster);
List<ServerAndLoad> clusterList = convertToList(clusterServers);
clusterLoad.put(TableName.valueOf(name.getMethodName()), clusterServers);
// use overall can achieve both table and cluster level balance
HashMap<TableName, TreeMap<ServerName, List<RegionInfo>>> clusterLoadPerTable = mockClusterServersWithTables(clusterServers);
if (useClusterLoadPerTable) {
loadBalancer.setClusterLoad((Map)clusterLoadPerTable);
HashMap<TableName, TreeMap<ServerName, List<RegionInfo>>> LoadOfAllTable =
mockClusterServersWithTables(clusterServers);
if (useLoadOfAllTable) {
loadBalancer.setClusterLoad((Map) LoadOfAllTable);
} else {
loadBalancer.setClusterLoad(clusterLoad);
}
List<RegionPlan> clusterplans1 = new ArrayList<RegionPlan>();
List<Pair<TableName, Integer>> regionAmountList = new ArrayList<Pair<TableName, Integer>>();
for (TreeMap<ServerName, List<RegionInfo>> servers : clusterLoadPerTable.values()) {
for (Map.Entry<TableName, TreeMap<ServerName, List<RegionInfo>>> mapEntry : LoadOfAllTable
.entrySet()) {
TableName tableName = mapEntry.getKey();
TreeMap<ServerName, List<RegionInfo>> servers = mapEntry.getValue();
List<ServerAndLoad> list = convertToList(servers);
LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list));
List<RegionPlan> partialplans = loadBalancer.balanceCluster(servers);
List<RegionPlan> partialplans = loadBalancer.balanceTable(tableName, servers);
if (partialplans != null) clusterplans1.addAll(partialplans);
List<ServerAndLoad> balancedClusterPerTable = reconcile(list, partialplans, servers);
LOG.info("Mock Balance : " + printMock(balancedClusterPerTable));
@ -208,29 +216,34 @@ public class TestDefaultLoadBalancer extends BalancerTestBase {
}
}
List<ServerAndLoad> balancedCluster1 = reconcile(clusterList, clusterplans1, clusterServers);
assertTrue(assertClusterOverallAsBalanced(balancedCluster1, clusterLoadPerTable.keySet().size()));
assertTrue(assertClusterOverallAsBalanced(balancedCluster1, LoadOfAllTable.keySet().size()));
}
@Test
public void
testBalanceClusterOverallStrictly() throws Exception {
int[] regionNumOfTable1PerServer = {3, 3, 4, 4, 4, 4, 5, 5, 5};
int[] regionNumOfTable2PerServer = {2, 2, 2, 2, 2, 2, 2, 2, 1};
public void testBalanceClusterOverallStrictly() throws Exception {
int[] regionNumOfTable1PerServer = { 3, 3, 4, 4, 4, 4, 5, 5, 5 };
int[] regionNumOfTable2PerServer = { 2, 2, 2, 2, 2, 2, 2, 2, 1 };
TreeMap<ServerName, List<RegionInfo>> serverRegionInfo = new TreeMap<>();
List<ServerAndLoad> serverAndLoads = new ArrayList<>();
for (int i = 0; i < regionNumOfTable1PerServer.length; i++) {
ServerName serverName = ServerName.valueOf("server" + i, 1000, -1);
List<RegionInfo> regions1 = createRegions(regionNumOfTable1PerServer[i], TableName.valueOf("table1"));
List<RegionInfo> regions2 = createRegions(regionNumOfTable2PerServer[i], TableName.valueOf("table2"));
List<RegionInfo> regions1 =
createRegions(regionNumOfTable1PerServer[i], TableName.valueOf("table1"));
List<RegionInfo> regions2 =
createRegions(regionNumOfTable2PerServer[i], TableName.valueOf("table2"));
regions1.addAll(regions2);
serverRegionInfo.put(serverName, regions1);
ServerAndLoad serverAndLoad = new ServerAndLoad(serverName, regionNumOfTable1PerServer[i] + regionNumOfTable2PerServer[i]);
ServerAndLoad serverAndLoad = new ServerAndLoad(serverName,
regionNumOfTable1PerServer[i] + regionNumOfTable2PerServer[i]);
serverAndLoads.add(serverAndLoad);
}
HashMap<TableName, TreeMap<ServerName, List<RegionInfo>>> clusterLoadPerTable = mockClusterServersWithTables(serverRegionInfo);
loadBalancer.setClusterLoad((Map) clusterLoadPerTable);
List<RegionPlan> partialplans = loadBalancer.balanceCluster(clusterLoadPerTable.get(TableName.valueOf("table1")));
List<ServerAndLoad> balancedServerLoads = reconcile(serverAndLoads, partialplans, serverRegionInfo);
HashMap<TableName, TreeMap<ServerName, List<RegionInfo>>> LoadOfAllTable =
mockClusterServersWithTables(serverRegionInfo);
loadBalancer.setClusterLoad((Map) LoadOfAllTable);
List<RegionPlan> partialplans = loadBalancer.balanceTable(TableName.valueOf("table1"),
LoadOfAllTable.get(TableName.valueOf("table1")));
List<ServerAndLoad> balancedServerLoads =
reconcile(serverAndLoads, partialplans, serverRegionInfo);
for (ServerAndLoad serverAndLoad : balancedServerLoads) {
assertEquals(6, serverAndLoad.getLoad());
}

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Size;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.MockNoopMasterServices;
import org.apache.hadoop.hbase.master.RegionPlan;
@ -172,19 +173,19 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
// serverC : 0,0,0
// so should move two regions from serverA to serverB & serverC
serverMetricsMap = new TreeMap<>();
serverMetricsMap.put(serverA, mockServerMetricsWithCpRequests(serverA,
regionsOnServerA, 1000));
serverMetricsMap.put(serverA, mockServerMetricsWithCpRequests(serverA, regionsOnServerA, 1000));
serverMetricsMap.put(serverB, mockServerMetricsWithCpRequests(serverB, regionsOnServerB, 0));
serverMetricsMap.put(serverC, mockServerMetricsWithCpRequests(serverC, regionsOnServerC, 0));
clusterStatus = mock(ClusterMetrics.class);
when(clusterStatus.getLiveServerMetrics()).thenReturn(serverMetricsMap);
loadBalancer.setClusterMetrics(clusterStatus);
List<RegionPlan> plans = loadBalancer.balanceCluster(clusterState);
List<RegionPlan> plans =
loadBalancer.balanceTable(HConstants.ENSEMBLE_TABLE_NAME, clusterState);
Set<RegionInfo> regionsMoveFromServerA = new HashSet<>();
Set<ServerName> targetServers = new HashSet<>();
for(RegionPlan plan : plans) {
if(plan.getSource().equals(serverA)) {
for (RegionPlan plan : plans) {
if (plan.getSource().equals(serverA)) {
regionsMoveFromServerA.add(plan.getRegionInfo());
targetServers.add(plan.getDestination());
}
@ -251,8 +252,10 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
loadBalancer.setConf(conf);
for (int[] mockCluster : clusterStateMocks) {
Map<ServerName, List<RegionInfo>> servers = mockClusterServers(mockCluster);
List<RegionPlan> plans = loadBalancer.balanceCluster(servers);
assertNull(plans);
Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
(Map) mockClusterServersWithTables(servers);
List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
assertTrue(plans == null || plans.isEmpty());
}
}
} finally {
@ -451,7 +454,7 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
List<ServerAndLoad> list = convertToList(serverMap);
List<RegionPlan> plans = loadBalancer.balanceCluster(serverMap);
List<RegionPlan> plans = loadBalancer.balanceTable(HConstants.ENSEMBLE_TABLE_NAME, serverMap);
assertNotNull(plans);
// Apply the plan to the mock cluster.
@ -465,7 +468,7 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
serverMap.put(deadSn, new ArrayList<>(0));
plans = loadBalancer.balanceCluster(serverMap);
plans = loadBalancer.balanceTable(HConstants.ENSEMBLE_TABLE_NAME, serverMap);
assertNull(plans);
}

View File

@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.testclassification.LargeTests;
@ -58,11 +59,15 @@ public class TestStochasticLoadBalancerBalanceCluster extends BalancerTestBase {
Map<ServerName, List<RegionInfo>> servers = mockClusterServers(mockCluster);
List<ServerAndLoad> list = convertToList(servers);
LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list));
List<RegionPlan> plans = loadBalancer.balanceCluster(servers);
Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
(Map) mockClusterServersWithTables(servers);
List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
List<ServerAndLoad> balancedCluster = reconcile(list, plans, servers);
LOG.info("Mock Balance : " + printMock(balancedCluster));
assertClusterAsBalanced(balancedCluster);
List<RegionPlan> secondPlans = loadBalancer.balanceCluster(servers);
LoadOfAllTable = (Map) mockClusterServersWithTables(servers);
List<RegionPlan> secondPlans = loadBalancer.balanceCluster(LoadOfAllTable);
assertNull(secondPlans);
for (Map.Entry<ServerName, List<RegionInfo>> entry : servers.entrySet()) {
returnRegions(entry.getValue());

View File

@ -31,6 +31,7 @@ import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
@ -148,7 +149,8 @@ public class TestStochasticLoadBalancerHeterogeneousCost extends BalancerTestBas
TestStochasticLoadBalancerHeterogeneousCostRules.createRulesFile(RULES_FILE);
final Map<ServerName, List<RegionInfo>> serverMap =
this.createServerMap(numNodes, numRegions, numRegionsPerServer, 1, 1);
final List<RegionPlan> plans = BalancerTestBase.loadBalancer.balanceCluster(serverMap);
final List<RegionPlan> plans =
BalancerTestBase.loadBalancer.balanceTable(HConstants.ENSEMBLE_TABLE_NAME, serverMap);
// As we disabled all the other cost functions, balancing only according to
// the heterogeneous cost function should return nothing.
assertNull(plans);
@ -172,7 +174,8 @@ public class TestStochasticLoadBalancerHeterogeneousCost extends BalancerTestBas
BalancerTestBase.loadBalancer.setRackManager(rackManager);
// Run the balancer.
final List<RegionPlan> plans = BalancerTestBase.loadBalancer.balanceCluster(serverMap);
final List<RegionPlan> plans =
BalancerTestBase.loadBalancer.balanceTable(HConstants.ENSEMBLE_TABLE_NAME, serverMap);
assertNotNull(plans);
// Check to see that this actually got to a stable place.
@ -185,7 +188,7 @@ public class TestStochasticLoadBalancerHeterogeneousCost extends BalancerTestBas
if (assertFullyBalanced) {
final List<RegionPlan> secondPlans =
BalancerTestBase.loadBalancer.balanceCluster(serverMap);
BalancerTestBase.loadBalancer.balanceTable(HConstants.ENSEMBLE_TABLE_NAME, serverMap);
assertNull(secondPlans);
// create external cost function to retrieve limit

View File

@ -30,6 +30,7 @@ import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
@ -150,7 +151,8 @@ public class TestStochasticLoadBalancerRegionReplica extends BalancerTestBase {
// until the step above s1 holds two replicas of a region
regions = randomRegions(1);
map.put(s2, regions);
assertTrue(loadBalancer.needsBalance(new Cluster(map, null, null, null)));
assertTrue(loadBalancer.needsBalance(HConstants.ENSEMBLE_TABLE_NAME,
new Cluster(map, null, null, null)));
// check for the case where there are two hosts on the same rack and there are two racks
// and both the replicas are on the same rack
map.clear();
@ -162,7 +164,8 @@ public class TestStochasticLoadBalancerRegionReplica extends BalancerTestBase {
// add another server so that the cluster has some host on another rack
map.put(ServerName.valueOf("host2", 1000, 11111), randomRegions(1));
assertTrue(
loadBalancer.needsBalance(new Cluster(map, null, null, new ForTestRackManagerOne())));
loadBalancer.needsBalance(HConstants.ENSEMBLE_TABLE_NAME,
new Cluster(map, null, null, new ForTestRackManagerOne())));
}
@Test