HBASE-23949 refactor loadBalancer implements for rsgroup balance by table to achieve overallbalanced (#1324)
Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
parent
88ac6db032
commit
0c545cf4d4
|
@ -453,16 +453,9 @@ public class RSGroupAdminServer implements RSGroupAdmin {
|
|||
}
|
||||
|
||||
//We balance per group instead of per table
|
||||
List<RegionPlan> plans = new ArrayList<>();
|
||||
for(Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> tableMap:
|
||||
getRSGroupAssignmentsByTable(groupName).entrySet()) {
|
||||
LOG.info("Creating partial plan for table {} : {}", tableMap.getKey(), tableMap.getValue());
|
||||
List<RegionPlan> partialPlans = balancer.balanceCluster(tableMap.getValue());
|
||||
LOG.info("Partial plan for table {} : {}", tableMap.getKey(), partialPlans);
|
||||
if (partialPlans != null) {
|
||||
plans.addAll(partialPlans);
|
||||
}
|
||||
}
|
||||
Map<TableName, Map<ServerName, List<RegionInfo>>> assignmentsByTable =
|
||||
getRSGroupAssignmentsByTable(groupName);
|
||||
List<RegionPlan> plans = balancer.balanceCluster(assignmentsByTable);
|
||||
boolean balancerRan = !plans.isEmpty();
|
||||
if (balancerRan) {
|
||||
LOG.info("RSGroup balance {} starting with plan count: {}", groupName, plans.size());
|
||||
|
|
|
@ -22,8 +22,6 @@ import java.io.IOException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
@ -32,7 +30,6 @@ import java.util.TreeMap;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.ClusterMetrics;
|
||||
import org.apache.hadoop.hbase.HBaseIOException;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
|
@ -42,6 +39,7 @@ import org.apache.hadoop.hbase.master.MasterServices;
|
|||
import org.apache.hadoop.hbase.master.RegionPlan;
|
||||
import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer;
|
||||
import org.apache.hadoop.hbase.net.Address;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -110,50 +108,45 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
|
|||
this.masterServices = masterServices;
|
||||
}
|
||||
|
||||
/**
|
||||
* Override to balance by RSGroup
|
||||
* not invoke {@link #balanceTable(TableName, Map)}
|
||||
*/
|
||||
@Override
|
||||
public List<RegionPlan> balanceCluster(TableName tableName, Map<ServerName, List<RegionInfo>>
|
||||
clusterState) throws HBaseIOException {
|
||||
return balanceCluster(clusterState);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<RegionPlan> balanceCluster(Map<ServerName, List<RegionInfo>> clusterState)
|
||||
throws HBaseIOException {
|
||||
public List<RegionPlan> balanceCluster(
|
||||
Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) throws IOException {
|
||||
if (!isOnline()) {
|
||||
throw new ConstraintException(RSGroupInfoManager.RSGROUP_TABLE_NAME +
|
||||
" is not online, unable to perform balance");
|
||||
}
|
||||
|
||||
Map<ServerName,List<RegionInfo>> correctedState = correctAssignments(clusterState);
|
||||
List<RegionPlan> regionPlans = new ArrayList<>();
|
||||
|
||||
List<RegionInfo> misplacedRegions = correctedState.get(LoadBalancer.BOGUS_SERVER_NAME);
|
||||
for (RegionInfo regionInfo : misplacedRegions) {
|
||||
ServerName serverName = findServerForRegion(clusterState, regionInfo);
|
||||
regionPlans.add(new RegionPlan(regionInfo, serverName, null));
|
||||
}
|
||||
// Calculate correct assignments and a list of RegionPlan for mis-placed regions
|
||||
Pair<Map<TableName, Map<ServerName, List<RegionInfo>>>, List<RegionPlan>>
|
||||
correctedStateAndRegionPlans = correctAssignments(loadOfAllTable);
|
||||
Map<TableName, Map<ServerName, List<RegionInfo>>> correctedLoadOfAllTable =
|
||||
correctedStateAndRegionPlans.getFirst();
|
||||
List<RegionPlan> regionPlans = correctedStateAndRegionPlans.getSecond();
|
||||
// Add RegionPlan for the regions which have been placed according to the region server group
|
||||
// assignment into the movement list
|
||||
try {
|
||||
// Record which region servers have been processed,so as to skip them after processed
|
||||
HashSet<ServerName> processedServers = new HashSet<>();
|
||||
|
||||
// For each rsgroup
|
||||
for (RSGroupInfo rsgroup : rsGroupInfoManager.listRSGroups()) {
|
||||
Map<ServerName, List<RegionInfo>> groupClusterState = new HashMap<>();
|
||||
Map<TableName, Map<ServerName, List<RegionInfo>>> groupClusterLoad = new HashMap<>();
|
||||
for (ServerName server : clusterState.keySet()) { // for each region server
|
||||
if (!processedServers.contains(server) // server is not processed yet
|
||||
&& rsgroup.containsServer(server.getAddress())) { // server belongs to this rsgroup
|
||||
List<RegionInfo> regionsOnServer = correctedState.get(server);
|
||||
groupClusterState.put(server, regionsOnServer);
|
||||
|
||||
processedServers.add(server);
|
||||
Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfTablesInGroup = new HashMap<>();
|
||||
for (Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> entry : correctedLoadOfAllTable
|
||||
.entrySet()) {
|
||||
TableName tableName = entry.getKey();
|
||||
String targetRSGroupName = rsGroupInfoManager.getRSGroupOfTable(tableName);
|
||||
if (targetRSGroupName == null) {
|
||||
targetRSGroupName = RSGroupInfo.DEFAULT_GROUP;
|
||||
}
|
||||
if (targetRSGroupName.equals(rsgroup.getName())) {
|
||||
loadOfTablesInGroup.put(tableName, entry.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
groupClusterLoad.put(HConstants.ENSEMBLE_TABLE_NAME, groupClusterState);
|
||||
this.internalBalancer.setClusterLoad(groupClusterLoad);
|
||||
List<RegionPlan> groupPlans = this.internalBalancer
|
||||
.balanceCluster(groupClusterState);
|
||||
List<RegionPlan> groupPlans = null;
|
||||
if (!loadOfTablesInGroup.isEmpty()) {
|
||||
LOG.info("Start Generate Balance plan for group: " + rsgroup.getName());
|
||||
groupPlans = this.internalBalancer.balanceCluster(loadOfTablesInGroup);
|
||||
}
|
||||
if (groupPlans != null) {
|
||||
regionPlans.addAll(groupPlans);
|
||||
}
|
||||
|
@ -296,47 +289,45 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
|
|||
return finalList;
|
||||
}
|
||||
|
||||
private ServerName findServerForRegion(
|
||||
Map<ServerName, List<RegionInfo>> existingAssignments, RegionInfo region) {
|
||||
for (Map.Entry<ServerName, List<RegionInfo>> entry : existingAssignments.entrySet()) {
|
||||
if (entry.getValue().contains(region)) {
|
||||
return entry.getKey();
|
||||
}
|
||||
}
|
||||
|
||||
throw new IllegalStateException("Could not find server for region "
|
||||
+ region.getShortNameToLog());
|
||||
}
|
||||
|
||||
private Map<ServerName, List<RegionInfo>> correctAssignments(
|
||||
Map<ServerName, List<RegionInfo>> existingAssignments) throws HBaseIOException{
|
||||
Map<ServerName, List<RegionInfo>> correctAssignments = new TreeMap<>();
|
||||
correctAssignments.put(LoadBalancer.BOGUS_SERVER_NAME, new LinkedList<>());
|
||||
for (Map.Entry<ServerName, List<RegionInfo>> assignments : existingAssignments.entrySet()){
|
||||
ServerName sName = assignments.getKey();
|
||||
correctAssignments.put(sName, new LinkedList<>());
|
||||
List<RegionInfo> regions = assignments.getValue();
|
||||
for (RegionInfo region : regions) {
|
||||
RSGroupInfo targetRSGInfo = null;
|
||||
try {
|
||||
String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable());
|
||||
if (groupName == null) {
|
||||
LOG.debug("Group not found for table " + region.getTable() + ", using default");
|
||||
groupName = RSGroupInfo.DEFAULT_GROUP;
|
||||
}
|
||||
targetRSGInfo = rsGroupInfoManager.getRSGroup(groupName);
|
||||
} catch (IOException exp) {
|
||||
LOG.debug("RSGroup information null for region of table " + region.getTable(),
|
||||
exp);
|
||||
private Pair<Map<TableName, Map<ServerName, List<RegionInfo>>>, List<RegionPlan>>
|
||||
correctAssignments(Map<TableName, Map<ServerName, List<RegionInfo>>> existingAssignments)
|
||||
throws IOException {
|
||||
// To return
|
||||
Map<TableName, Map<ServerName, List<RegionInfo>>> correctAssignments = new HashMap<>();
|
||||
List<RegionPlan> regionPlansForMisplacedRegions = new ArrayList<>();
|
||||
for (Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> assignments : existingAssignments
|
||||
.entrySet()) {
|
||||
TableName tableName = assignments.getKey();
|
||||
Map<ServerName, List<RegionInfo>> clusterLoad = assignments.getValue();
|
||||
Map<ServerName, List<RegionInfo>> correctServerRegion = new TreeMap<>();
|
||||
RSGroupInfo targetRSGInfo = null;
|
||||
try {
|
||||
String groupName = rsGroupInfoManager.getRSGroupOfTable(tableName);
|
||||
if (groupName == null) {
|
||||
LOG.debug("Group not found for table " + tableName + ", using default");
|
||||
groupName = RSGroupInfo.DEFAULT_GROUP;
|
||||
}
|
||||
if ((targetRSGInfo == null) || (!targetRSGInfo.containsServer(sName.getAddress()))) {
|
||||
correctAssignments.get(LoadBalancer.BOGUS_SERVER_NAME).add(region);
|
||||
targetRSGInfo = rsGroupInfoManager.getRSGroup(groupName);
|
||||
} catch (IOException exp) {
|
||||
LOG.debug("RSGroup information null for region of table " + tableName, exp);
|
||||
}
|
||||
for (Map.Entry<ServerName, List<RegionInfo>> serverRegionMap : clusterLoad.entrySet()) {
|
||||
ServerName currentHostServer = serverRegionMap.getKey();
|
||||
List<RegionInfo> regionInfoList = serverRegionMap.getValue();
|
||||
if (targetRSGInfo == null
|
||||
|| !targetRSGInfo.containsServer(currentHostServer.getAddress())) {
|
||||
regionInfoList.forEach(regionInfo -> {
|
||||
regionPlansForMisplacedRegions.add(new RegionPlan(regionInfo, currentHostServer, null));
|
||||
});
|
||||
} else {
|
||||
correctAssignments.get(sName).add(region);
|
||||
correctServerRegion.put(currentHostServer, regionInfoList);
|
||||
}
|
||||
}
|
||||
correctAssignments.put(tableName, correctServerRegion);
|
||||
}
|
||||
return correctAssignments;
|
||||
// Return correct assignments and region movement plan for mis-placed regions together
|
||||
return new Pair<Map<TableName, Map<ServerName, List<RegionInfo>>>, List<RegionPlan>>(
|
||||
correctAssignments, regionPlansForMisplacedRegions);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -382,9 +373,6 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
|
|||
return this.rsGroupInfoManager.isOnline();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setClusterLoad(Map<TableName, Map<ServerName, List<RegionInfo>>> clusterLoad) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void regionOnline(RegionInfo regionInfo, ServerName sn) {
|
||||
|
@ -421,4 +409,38 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
|
|||
public void updateBalancerStatus(boolean status) {
|
||||
internalBalancer.updateBalancerStatus(status);
|
||||
}
|
||||
|
||||
/**
|
||||
* can achieve table balanced rather than overall balanced
|
||||
*/
|
||||
@Override
|
||||
public List<RegionPlan> balanceTable(TableName tableName,
|
||||
Map<ServerName, List<RegionInfo>> loadOfOneTable) {
|
||||
if (!isOnline()) {
|
||||
LOG.error(RSGroupInfoManager.class.getSimpleName()
|
||||
+ " is not online, unable to perform balanceTable");
|
||||
return null;
|
||||
}
|
||||
Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfThisTable = new HashMap<>();
|
||||
loadOfThisTable.put(tableName, loadOfOneTable);
|
||||
Pair<Map<TableName, Map<ServerName, List<RegionInfo>>>, List<RegionPlan>>
|
||||
correctedStateAndRegionPlans;
|
||||
// Calculate correct assignments and a list of RegionPlan for mis-placed regions
|
||||
try {
|
||||
correctedStateAndRegionPlans = correctAssignments(loadOfThisTable);
|
||||
} catch (IOException e) {
|
||||
LOG.error("get correct assignments and mis-placed regions error ", e);
|
||||
return null;
|
||||
}
|
||||
Map<TableName, Map<ServerName, List<RegionInfo>>> correctedLoadOfThisTable =
|
||||
correctedStateAndRegionPlans.getFirst();
|
||||
List<RegionPlan> regionPlans = correctedStateAndRegionPlans.getSecond();
|
||||
List<RegionPlan> tablePlans =
|
||||
this.internalBalancer.balanceTable(tableName, correctedLoadOfThisTable.get(tableName));
|
||||
|
||||
if (tablePlans != null) {
|
||||
regionPlans.addAll(tablePlans);
|
||||
}
|
||||
return regionPlans;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -57,7 +57,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
|||
/**
|
||||
* Base UT of RSGroupableBalancer.
|
||||
*/
|
||||
public class RSGroupableBalancerTestBase {
|
||||
public class RSGroupableBalancerTestBase extends BalancerTestBase{
|
||||
|
||||
static SecureRandom rand = new SecureRandom();
|
||||
static String[] groups = new String[] {RSGroupInfo.DEFAULT_GROUP, "dg2", "dg3", "dg4"};
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.commons.lang3.StringUtils;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
|
@ -83,14 +84,22 @@ public class TestRSGroupBasedLoadBalancer extends RSGroupableBalancerTestBase {
|
|||
*/
|
||||
@Test
|
||||
public void testBalanceCluster() throws Exception {
|
||||
Map<ServerName, List<RegionInfo>> servers = mockClusterServers();
|
||||
ArrayListMultimap<String, ServerAndLoad> list = convertToGroupBasedMap(servers);
|
||||
LOG.info("Mock Cluster : " + printStats(list));
|
||||
List<RegionPlan> plans = loadBalancer.balanceCluster(servers);
|
||||
ArrayListMultimap<String, ServerAndLoad> balancedCluster = reconcile(
|
||||
list, plans);
|
||||
LOG.info("Mock Balance : " + printStats(balancedCluster));
|
||||
assertClusterAsBalanced(balancedCluster);
|
||||
// Test with/without per table balancer.
|
||||
boolean[] perTableBalancerConfigs = { true, false };
|
||||
for (boolean isByTable : perTableBalancerConfigs) {
|
||||
Configuration conf = loadBalancer.getConf();
|
||||
conf.setBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, isByTable);
|
||||
loadBalancer.setConf(conf);
|
||||
Map<ServerName, List<RegionInfo>> servers = mockClusterServers();
|
||||
ArrayListMultimap<String, ServerAndLoad> list = convertToGroupBasedMap(servers);
|
||||
LOG.info("Mock Cluster : " + printStats(list));
|
||||
Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
|
||||
(Map) mockClusterServersWithTables(servers);
|
||||
List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
|
||||
ArrayListMultimap<String, ServerAndLoad> balancedCluster = reconcile(list, plans);
|
||||
LOG.info("Mock Balance : " + printStats(balancedCluster));
|
||||
assertClusterAsBalanced(balancedCluster);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue;
|
|||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
|
@ -32,11 +33,11 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.hbase.ClusterMetrics;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HBaseIOException;
|
||||
import org.apache.hadoop.hbase.RegionMetrics;
|
||||
import org.apache.hadoop.hbase.ServerMetrics;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.Size;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.master.RegionPlan;
|
||||
import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
|
||||
|
@ -97,7 +98,7 @@ public class TestRSGroupBasedLoadBalancerWithStochasticLoadBalancerAsInternal
|
|||
* Test HBASE-20791
|
||||
*/
|
||||
@Test
|
||||
public void testBalanceCluster() throws HBaseIOException {
|
||||
public void testBalanceCluster() throws IOException {
|
||||
// mock cluster State
|
||||
Map<ServerName, List<RegionInfo>> clusterState = new HashMap<ServerName, List<RegionInfo>>();
|
||||
ServerName serverA = servers.get(0);
|
||||
|
@ -133,7 +134,9 @@ public class TestRSGroupBasedLoadBalancerWithStochasticLoadBalancerAsInternal
|
|||
when(clusterStatus.getLiveServerMetrics()).thenReturn(serverMetricsMap);
|
||||
loadBalancer.setClusterMetrics(clusterStatus);
|
||||
|
||||
List<RegionPlan> plans = loadBalancer.balanceCluster(clusterState);
|
||||
Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
|
||||
(Map) mockClusterServersWithTables(clusterState);
|
||||
List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
|
||||
Set<RegionInfo> regionsMoveFromServerA = new HashSet<>();
|
||||
Set<ServerName> targetServers = new HashSet<>();
|
||||
for(RegionPlan plan : plans) {
|
||||
|
|
|
@ -87,10 +87,11 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer implements Favored
|
|||
}
|
||||
|
||||
@Override
|
||||
public List<RegionPlan> balanceCluster(Map<ServerName, List<RegionInfo>> clusterState) {
|
||||
//TODO. Look at is whether Stochastic loadbalancer can be integrated with this
|
||||
public List<RegionPlan> balanceTable(TableName tableName,
|
||||
Map<ServerName, List<RegionInfo>> loadOfOneTable) {
|
||||
// TODO. Look at is whether Stochastic loadbalancer can be integrated with this
|
||||
List<RegionPlan> plans = new ArrayList<>();
|
||||
//perform a scan of the meta to get the latest updates (if any)
|
||||
// perform a scan of the meta to get the latest updates (if any)
|
||||
SnapshotOfRegionAssignmentFromMeta snaphotOfRegionAssignment =
|
||||
new SnapshotOfRegionAssignmentFromMeta(super.services.getConnection());
|
||||
try {
|
||||
|
@ -99,43 +100,44 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer implements Favored
|
|||
LOG.warn("Not running balancer since exception was thrown " + ie);
|
||||
return plans;
|
||||
}
|
||||
// This is not used? Findbugs says so: Map<ServerName, ServerName> serverNameToServerNameWithoutCode = new HashMap<>();
|
||||
// This is not used? Findbugs says so: Map<ServerName, ServerName>
|
||||
// serverNameToServerNameWithoutCode = new HashMap<>();
|
||||
Map<ServerName, ServerName> serverNameWithoutCodeToServerName = new HashMap<>();
|
||||
ServerManager serverMgr = super.services.getServerManager();
|
||||
for (ServerName sn: serverMgr.getOnlineServersList()) {
|
||||
for (ServerName sn : serverMgr.getOnlineServersList()) {
|
||||
ServerName s = ServerName.valueOf(sn.getHostname(), sn.getPort(), ServerName.NON_STARTCODE);
|
||||
// FindBugs complains about useless store! serverNameToServerNameWithoutCode.put(sn, s);
|
||||
serverNameWithoutCodeToServerName.put(s, sn);
|
||||
}
|
||||
for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
|
||||
for (Map.Entry<ServerName, List<RegionInfo>> entry : loadOfOneTable.entrySet()) {
|
||||
ServerName currentServer = entry.getKey();
|
||||
//get a server without the startcode for the currentServer
|
||||
// get a server without the startcode for the currentServer
|
||||
ServerName currentServerWithoutStartCode = ServerName.valueOf(currentServer.getHostname(),
|
||||
currentServer.getPort(), ServerName.NON_STARTCODE);
|
||||
currentServer.getPort(), ServerName.NON_STARTCODE);
|
||||
List<RegionInfo> list = entry.getValue();
|
||||
for (RegionInfo region : list) {
|
||||
if(!FavoredNodesManager.isFavoredNodeApplicable(region)) {
|
||||
if (!FavoredNodesManager.isFavoredNodeApplicable(region)) {
|
||||
continue;
|
||||
}
|
||||
List<ServerName> favoredNodes = fnm.getFavoredNodes(region);
|
||||
if (favoredNodes == null || favoredNodes.get(0).equals(currentServerWithoutStartCode)) {
|
||||
continue; //either favorednodes does not exist or we are already on the primary node
|
||||
continue; // either favorednodes does not exist or we are already on the primary node
|
||||
}
|
||||
ServerName destination = null;
|
||||
//check whether the primary is available
|
||||
// check whether the primary is available
|
||||
destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(0));
|
||||
if (destination == null) {
|
||||
//check whether the region is on secondary/tertiary
|
||||
if (currentServerWithoutStartCode.equals(favoredNodes.get(1)) ||
|
||||
currentServerWithoutStartCode.equals(favoredNodes.get(2))) {
|
||||
// check whether the region is on secondary/tertiary
|
||||
if (currentServerWithoutStartCode.equals(favoredNodes.get(1))
|
||||
|| currentServerWithoutStartCode.equals(favoredNodes.get(2))) {
|
||||
continue;
|
||||
}
|
||||
//the region is currently on none of the favored nodes
|
||||
//get it on one of them if possible
|
||||
ServerMetrics l1 = super.services.getServerManager().getLoad(
|
||||
serverNameWithoutCodeToServerName.get(favoredNodes.get(1)));
|
||||
ServerMetrics l2 = super.services.getServerManager().getLoad(
|
||||
serverNameWithoutCodeToServerName.get(favoredNodes.get(2)));
|
||||
// the region is currently on none of the favored nodes
|
||||
// get it on one of them if possible
|
||||
ServerMetrics l1 = super.services.getServerManager()
|
||||
.getLoad(serverNameWithoutCodeToServerName.get(favoredNodes.get(1)));
|
||||
ServerMetrics l2 = super.services.getServerManager()
|
||||
.getLoad(serverNameWithoutCodeToServerName.get(favoredNodes.get(2)));
|
||||
if (l1 != null && l2 != null) {
|
||||
if (l1.getRegionMetrics().size() > l2.getRegionMetrics().size()) {
|
||||
destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(2));
|
||||
|
@ -435,9 +437,4 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer implements Favored
|
|||
fnm.updateFavoredNodes(regionFNMap);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<RegionPlan> balanceCluster(TableName tableName,
|
||||
Map<ServerName, List<RegionInfo>> clusterState) throws HBaseIOException {
|
||||
return balanceCluster(clusterState);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -40,7 +40,6 @@ import java.util.HashMap;
|
|||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
@ -1763,26 +1762,17 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
}
|
||||
}
|
||||
|
||||
boolean isByTable = getConfiguration().getBoolean("hbase.master.loadbalance.bytable", false);
|
||||
Map<TableName, Map<ServerName, List<RegionInfo>>> assignments =
|
||||
this.assignmentManager.getRegionStates()
|
||||
.getAssignmentsForBalancer(tableStateManager, this.serverManager.getOnlineServersList(),
|
||||
isByTable);
|
||||
.getAssignmentsForBalancer(tableStateManager, this.serverManager.getOnlineServersList());
|
||||
for (Map<ServerName, List<RegionInfo>> serverMap : assignments.values()) {
|
||||
serverMap.keySet().removeAll(this.serverManager.getDrainingServersList());
|
||||
}
|
||||
|
||||
//Give the balancer the current cluster state.
|
||||
this.balancer.setClusterMetrics(getClusterMetricsWithoutCoprocessor());
|
||||
this.balancer.setClusterLoad(assignments);
|
||||
|
||||
List<RegionPlan> plans = new ArrayList<>();
|
||||
for (Entry<TableName, Map<ServerName, List<RegionInfo>>> e : assignments.entrySet()) {
|
||||
List<RegionPlan> partialPlans = this.balancer.balanceCluster(e.getKey(), e.getValue());
|
||||
if (partialPlans != null) {
|
||||
plans.addAll(partialPlans);
|
||||
}
|
||||
}
|
||||
List<RegionPlan> plans = this.balancer.balanceCluster(assignments);
|
||||
|
||||
if (skipRegionManagementAction("balancer")) {
|
||||
// make one last check that the cluster isn't shutting down before proceeding.
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
import edu.umd.cs.findbugs.annotations.Nullable;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import org.apache.hadoop.conf.Configurable;
|
||||
|
@ -70,12 +71,6 @@ public interface LoadBalancer extends Configurable, Stoppable, ConfigurationObse
|
|||
*/
|
||||
void setClusterMetrics(ClusterMetrics st);
|
||||
|
||||
/**
|
||||
* Pass RegionStates and allow balancer to set the current cluster load.
|
||||
* @param ClusterLoad
|
||||
*/
|
||||
void setClusterLoad(Map<TableName, Map<ServerName, List<RegionInfo>>> ClusterLoad);
|
||||
|
||||
/**
|
||||
* Set the master service.
|
||||
* @param masterServices
|
||||
|
@ -83,22 +78,25 @@ public interface LoadBalancer extends Configurable, Stoppable, ConfigurationObse
|
|||
void setMasterServices(MasterServices masterServices);
|
||||
|
||||
/**
|
||||
* Perform the major balance operation
|
||||
* @param tableName
|
||||
* @param clusterState
|
||||
* @return List of plans
|
||||
* Perform the major balance operation for cluster, will invoke {@link #balanceTable} to do
|
||||
* actual balance. Normally not need override this method, except SimpleLoadBalancer and
|
||||
* RSGroupBasedLoadBalancer.
|
||||
* @param loadOfAllTable region load of servers for all table
|
||||
* @return a list of regions to be moved, including source and destination, or null if cluster is
|
||||
* already balanced
|
||||
*/
|
||||
List<RegionPlan> balanceCluster(TableName tableName, Map<ServerName,
|
||||
List<RegionInfo>> clusterState) throws HBaseIOException;
|
||||
List<RegionPlan> balanceCluster(Map<TableName,
|
||||
Map<ServerName, List<RegionInfo>>> loadOfAllTable) throws IOException;
|
||||
|
||||
/**
|
||||
* Perform the major balance operation
|
||||
* @param clusterState
|
||||
* Perform the major balance operation for table, all class implement of {@link LoadBalancer}
|
||||
* should override this method
|
||||
* @param tableName the table to be balanced
|
||||
* @param loadOfOneTable region load of servers for the specific one table
|
||||
* @return List of plans
|
||||
*/
|
||||
List<RegionPlan> balanceCluster(Map<ServerName,
|
||||
List<RegionInfo>> clusterState) throws HBaseIOException;
|
||||
|
||||
List<RegionPlan> balanceTable(TableName tableName,
|
||||
Map<ServerName, List<RegionInfo>> loadOfOneTable);
|
||||
/**
|
||||
* Perform a Round Robin assignment of regions.
|
||||
* @param regions
|
||||
|
|
|
@ -32,7 +32,6 @@ import java.util.concurrent.ConcurrentSkipListMap;
|
|||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
|
@ -538,61 +537,42 @@ public class RegionStates {
|
|||
* wants to iterate this exported list. We need to synchronize on regions
|
||||
* since all access to this.servers is under a lock on this.regions.
|
||||
*
|
||||
* @param isByTable If <code>true</code>, return the assignments by table. If <code>false</code>,
|
||||
* return the assignments which aggregate the server-load to the cluster level.
|
||||
* @return A clone of current assignments.
|
||||
*/
|
||||
public Map<TableName, Map<ServerName, List<RegionInfo>>> getAssignmentsForBalancer(
|
||||
TableStateManager tableStateManager, List<ServerName> onlineServers, boolean isByTable) {
|
||||
TableStateManager tableStateManager, List<ServerName> onlineServers) {
|
||||
final Map<TableName, Map<ServerName, List<RegionInfo>>> result = new HashMap<>();
|
||||
if (isByTable) {
|
||||
for (RegionStateNode node : regionsMap.values()) {
|
||||
if (isTableDisabled(tableStateManager, node.getTable())) {
|
||||
continue;
|
||||
}
|
||||
if (node.getRegionInfo().isSplitParent()) {
|
||||
continue;
|
||||
}
|
||||
Map<ServerName, List<RegionInfo>> tableResult =
|
||||
result.computeIfAbsent(node.getTable(), t -> new HashMap<>());
|
||||
final ServerName serverName = node.getRegionLocation();
|
||||
if (serverName == null) {
|
||||
LOG.info("Skipping, no server for " + node);
|
||||
continue;
|
||||
}
|
||||
List<RegionInfo> serverResult =
|
||||
tableResult.computeIfAbsent(serverName, s -> new ArrayList<>());
|
||||
serverResult.add(node.getRegionInfo());
|
||||
for (RegionStateNode node : regionsMap.values()) {
|
||||
if (isTableDisabled(tableStateManager, node.getTable())) {
|
||||
continue;
|
||||
}
|
||||
// Add online servers with no assignment for the table.
|
||||
for (Map<ServerName, List<RegionInfo>> table : result.values()) {
|
||||
for (ServerName serverName : onlineServers) {
|
||||
table.putIfAbsent(serverName, new ArrayList<>());
|
||||
}
|
||||
if (node.getRegionInfo().isSplitParent()) {
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
final HashMap<ServerName, List<RegionInfo>> ensemble = new HashMap<>(serverMap.size());
|
||||
Map<ServerName, List<RegionInfo>> tableResult =
|
||||
result.computeIfAbsent(node.getTable(), t -> new HashMap<>());
|
||||
final ServerName serverName = node.getRegionLocation();
|
||||
if (serverName == null) {
|
||||
LOG.info("Skipping, no server for " + node);
|
||||
continue;
|
||||
}
|
||||
List<RegionInfo> serverResult =
|
||||
tableResult.computeIfAbsent(serverName, s -> new ArrayList<>());
|
||||
serverResult.add(node.getRegionInfo());
|
||||
}
|
||||
// Add online servers with no assignment for the table.
|
||||
for (Map<ServerName, List<RegionInfo>> table : result.values()) {
|
||||
for (ServerName serverName : onlineServers) {
|
||||
ServerStateNode serverNode = serverMap.get(serverName);
|
||||
if (serverNode != null) {
|
||||
ensemble.put(serverNode.getServerName(),
|
||||
serverNode.getRegionInfoList().stream()
|
||||
.filter(region -> !isTableDisabled(tableStateManager, region.getTable()))
|
||||
.filter(region -> !region.isSplitParent()).collect(Collectors.toList()));
|
||||
} else {
|
||||
ensemble.put(serverName, new ArrayList<>());
|
||||
}
|
||||
table.computeIfAbsent(serverName, key -> new ArrayList<>());
|
||||
}
|
||||
// Use a fake table name to represent the whole cluster's assignments
|
||||
result.put(HConstants.ENSEMBLE_TABLE_NAME, ensemble);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private boolean isTableDisabled(final TableStateManager tableStateManager,
|
||||
final TableName tableName) {
|
||||
return tableStateManager
|
||||
.isTableState(tableName, TableState.State.DISABLED, TableState.State.DISABLING);
|
||||
final TableName tableName) {
|
||||
return tableStateManager.isTableState(tableName, TableState.State.DISABLED,
|
||||
TableState.State.DISABLING);
|
||||
}
|
||||
|
||||
// ==========================================================================
|
||||
|
|
|
@ -83,6 +83,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
|||
|
||||
protected RegionLocationFinder regionFinder;
|
||||
protected boolean useRegionFinder;
|
||||
protected boolean isByTable = false;
|
||||
|
||||
private static class DefaultRackManager extends RackManager {
|
||||
@Override
|
||||
|
@ -1047,6 +1048,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
|||
if (useRegionFinder) {
|
||||
regionFinder.setConf(conf);
|
||||
}
|
||||
this.isByTable = conf.getBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, isByTable);
|
||||
// Print out base configs. Don't print overallSlop since it for simple balancer exclusively.
|
||||
LOG.info("slop={}, systemTablesOnMaster={}",
|
||||
this.slop, this.onlySystemTablesOnMaster);
|
||||
|
@ -1150,10 +1152,6 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setClusterLoad(Map<TableName, Map<ServerName, List<RegionInfo>>> clusterLoad){
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setMasterServices(MasterServices masterServices) {
|
||||
|
@ -1184,7 +1182,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
|||
this.rackManager = rackManager;
|
||||
}
|
||||
|
||||
protected boolean needsBalance(Cluster c) {
|
||||
protected boolean needsBalance(TableName tableName, Cluster c) {
|
||||
ClusterLoadState cs = new ClusterLoadState(c.clusterState);
|
||||
if (cs.getNumServers() < MIN_SERVER_BALANCE) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
|
@ -1639,6 +1637,42 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
|||
}
|
||||
}
|
||||
|
||||
private Map<ServerName, List<RegionInfo>> toEnsumbleTableLoad(
|
||||
Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable) {
|
||||
Map<ServerName, List<RegionInfo>> returnMap = new TreeMap<>();
|
||||
for (Map<ServerName, List<RegionInfo>> serverNameListMap : LoadOfAllTable.values()) {
|
||||
serverNameListMap.forEach((serverName, regionInfoList) -> {
|
||||
List<RegionInfo> regionInfos =
|
||||
returnMap.computeIfAbsent(serverName, k -> new ArrayList<>());
|
||||
regionInfos.addAll(regionInfoList);
|
||||
});
|
||||
}
|
||||
return returnMap;
|
||||
}
|
||||
|
||||
@Override
|
||||
public abstract List<RegionPlan> balanceTable(TableName tableName,
|
||||
Map<ServerName, List<RegionInfo>> loadOfOneTable);
|
||||
|
||||
@Override
|
||||
public List<RegionPlan>
|
||||
balanceCluster(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
|
||||
if (isByTable) {
|
||||
List<RegionPlan> result = new ArrayList<>();
|
||||
loadOfAllTable.forEach((tableName, loadOfOneTable) -> {
|
||||
LOG.info("Start Generate Balance plan for table: " + tableName);
|
||||
List<RegionPlan> partialPlans = balanceTable(tableName, loadOfOneTable);
|
||||
if (partialPlans != null) {
|
||||
result.addAll(partialPlans);
|
||||
}
|
||||
});
|
||||
return result;
|
||||
} else {
|
||||
LOG.info("Start Generate Balance plan for cluster.");
|
||||
return balanceTable(HConstants.ENSEMBLE_TABLE_NAME, toEnsumbleTableLoad(loadOfAllTable));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onConfigurationChange(Configuration conf) {
|
||||
}
|
||||
|
|
|
@ -34,6 +34,7 @@ import java.util.Set;
|
|||
import org.apache.hadoop.hbase.HBaseIOException;
|
||||
import org.apache.hadoop.hbase.ServerMetrics;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper;
|
||||
import org.apache.hadoop.hbase.favored.FavoredNodesManager;
|
||||
|
@ -694,8 +695,8 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
|
|||
* implementation. For the misplaced regions, we assign a bogus server to it and AM takes care.
|
||||
*/
|
||||
@Override
|
||||
public synchronized List<RegionPlan> balanceCluster(Map<ServerName,
|
||||
List<RegionInfo>> clusterState) {
|
||||
public synchronized List<RegionPlan> balanceTable(TableName tableName,
|
||||
Map<ServerName, List<RegionInfo>> loadOfOneTable) {
|
||||
|
||||
if (this.services != null) {
|
||||
|
||||
|
@ -703,7 +704,7 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
|
|||
Map<ServerName, List<RegionInfo>> correctAssignments = new HashMap<>();
|
||||
int misplacedRegions = 0;
|
||||
|
||||
for (Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
|
||||
for (Entry<ServerName, List<RegionInfo>> entry : loadOfOneTable.entrySet()) {
|
||||
ServerName current = entry.getKey();
|
||||
List<RegionInfo> regions = Lists.newArrayList();
|
||||
correctAssignments.put(current, regions);
|
||||
|
@ -731,13 +732,13 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
|
|||
}
|
||||
}
|
||||
LOG.debug("Found misplaced regions: " + misplacedRegions + ", not on favored nodes.");
|
||||
List<RegionPlan> regionPlansFromBalance = super.balanceCluster(correctAssignments);
|
||||
List<RegionPlan> regionPlansFromBalance = super.balanceTable(tableName, correctAssignments);
|
||||
if (regionPlansFromBalance != null) {
|
||||
regionPlans.addAll(regionPlansFromBalance);
|
||||
}
|
||||
return regionPlans;
|
||||
} else {
|
||||
return super.balanceCluster(clusterState);
|
||||
return super.balanceTable(tableName, loadOfOneTable);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,7 +29,6 @@ import java.util.Random;
|
|||
import java.util.TreeMap;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseIOException;
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
|
@ -62,7 +61,7 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
|
|||
private RegionInfoComparator riComparator = new RegionInfoComparator();
|
||||
private RegionPlan.RegionPlanComparator rpComparator = new RegionPlan.RegionPlanComparator();
|
||||
private float avgLoadOverall;
|
||||
private List<ServerAndLoad> serverLoadList;
|
||||
private List<ServerAndLoad> serverLoadList = new ArrayList<>();
|
||||
|
||||
/**
|
||||
* Stores additional per-server information about the regions added/removed
|
||||
|
@ -106,14 +105,19 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setClusterLoad(Map<TableName, Map<ServerName, List<RegionInfo>>> clusterLoad) {
|
||||
serverLoadList = new ArrayList<>();
|
||||
/**
|
||||
* Pass RegionStates and allow balancer to set the current cluster load.
|
||||
*/
|
||||
void setClusterLoad(Map<TableName, Map<ServerName, List<RegionInfo>>> clusterLoad) {
|
||||
serverLoadList.clear();
|
||||
Map<ServerName, Integer> server2LoadMap = new HashMap<>();
|
||||
float sum = 0;
|
||||
for (Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> clusterEntry : clusterLoad.entrySet()) {
|
||||
for (Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> clusterEntry : clusterLoad
|
||||
.entrySet()) {
|
||||
for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterEntry.getValue().entrySet()) {
|
||||
if (entry.getKey().equals(masterServerName)) continue; // we shouldn't include master as potential assignee
|
||||
if (entry.getKey().equals(masterServerName)) {
|
||||
continue; // we shouldn't include master as potential assignee
|
||||
}
|
||||
int regionNum = entry.getValue().size();
|
||||
server2LoadMap.compute(entry.getKey(), (k, v) -> v == null ? regionNum : regionNum + v);
|
||||
sum += regionNum;
|
||||
|
@ -243,34 +247,35 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
|
|||
* Does this mean we need HeapSize on HMaster? Or just careful monitor?
|
||||
* (current thinking is we will hold all assignments in memory)
|
||||
*
|
||||
* @param clusterMap Map of regionservers and their load/region information to
|
||||
* @param loadOfOneTable Map of regionservers and their load/region information to
|
||||
* a list of their most loaded regions
|
||||
* @return a list of regions to be moved, including source and destination,
|
||||
* or null if cluster is already balanced
|
||||
*/
|
||||
@Override
|
||||
public List<RegionPlan> balanceCluster(
|
||||
Map<ServerName, List<RegionInfo>> clusterMap) {
|
||||
List<RegionPlan> regionsToReturn = balanceMasterRegions(clusterMap);
|
||||
if (regionsToReturn != null || clusterMap == null || clusterMap.size() <= 1) {
|
||||
public List<RegionPlan> balanceTable(TableName tableName,
|
||||
Map<ServerName, List<RegionInfo>> loadOfOneTable) {
|
||||
List<RegionPlan> regionsToReturn = balanceMasterRegions(loadOfOneTable);
|
||||
if (regionsToReturn != null || loadOfOneTable == null || loadOfOneTable.size() <= 1) {
|
||||
return regionsToReturn;
|
||||
}
|
||||
if (masterServerName != null && clusterMap.containsKey(masterServerName)) {
|
||||
if (clusterMap.size() <= 2) {
|
||||
if (masterServerName != null && loadOfOneTable.containsKey(masterServerName)) {
|
||||
if (loadOfOneTable.size() <= 2) {
|
||||
return null;
|
||||
}
|
||||
clusterMap = new HashMap<>(clusterMap);
|
||||
clusterMap.remove(masterServerName);
|
||||
loadOfOneTable = new HashMap<>(loadOfOneTable);
|
||||
loadOfOneTable.remove(masterServerName);
|
||||
}
|
||||
|
||||
long startTime = System.currentTimeMillis();
|
||||
|
||||
// construct a Cluster object with clusterMap and rest of the
|
||||
// argument as defaults
|
||||
Cluster c = new Cluster(clusterMap, null, this.regionFinder, this.rackManager);
|
||||
if (!this.needsBalance(c) && !this.overallNeedsBalance()) return null;
|
||||
|
||||
ClusterLoadState cs = new ClusterLoadState(clusterMap);
|
||||
Cluster c = new Cluster(loadOfOneTable, null, this.regionFinder, this.rackManager);
|
||||
if (!this.needsBalance(tableName, c) && !this.overallNeedsBalance()) {
|
||||
return null;
|
||||
}
|
||||
ClusterLoadState cs = new ClusterLoadState(loadOfOneTable);
|
||||
int numServers = cs.getNumServers();
|
||||
NavigableMap<ServerAndLoad, List<RegionInfo>> serversByLoad = cs.getServersByLoad();
|
||||
int numRegions = cs.getNumRegions();
|
||||
|
@ -440,7 +445,7 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
|
|||
", numServers=" + numServers + ", serversOverloaded=" + serversOverloaded +
|
||||
", serversUnderloaded=" + serversUnderloaded);
|
||||
StringBuilder sb = new StringBuilder();
|
||||
for (Map.Entry<ServerName, List<RegionInfo>> e: clusterMap.entrySet()) {
|
||||
for (Map.Entry<ServerName, List<RegionInfo>> e: loadOfOneTable.entrySet()) {
|
||||
if (sb.length() > 0) sb.append(", ");
|
||||
sb.append(e.getKey().toString());
|
||||
sb.append(" ");
|
||||
|
@ -594,10 +599,14 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
|
|||
regionsToReturn.add(rp);
|
||||
}
|
||||
|
||||
/**
|
||||
* Override to invoke {@link #setClusterLoad} before balance, We need clusterLoad of all regions
|
||||
* on every server to achieve overall balanced
|
||||
*/
|
||||
@Override
|
||||
public List<RegionPlan> balanceCluster(TableName tableName,
|
||||
Map<ServerName, List<RegionInfo>> clusterState) throws HBaseIOException {
|
||||
LOG.debug("Start Generate Balance plan for table: " + tableName);
|
||||
return balanceCluster(clusterState);
|
||||
public synchronized List<RegionPlan>
|
||||
balanceCluster(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
|
||||
setClusterLoad(loadOfAllTable);
|
||||
return super.balanceCluster(loadOfAllTable);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -34,7 +34,6 @@ import java.util.stream.Collectors;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.ClusterMetrics;
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.RegionMetrics;
|
||||
import org.apache.hadoop.hbase.ServerMetrics;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
|
@ -159,8 +158,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||
private RackLocalityCostFunction rackLocalityCost;
|
||||
private RegionReplicaHostCostFunction regionReplicaHostCostFunction;
|
||||
private RegionReplicaRackCostFunction regionReplicaRackCostFunction;
|
||||
private boolean isByTable = false;
|
||||
private TableName tableName = null;
|
||||
|
||||
/**
|
||||
* The constructor that pass a MetricsStochasticBalancer to BaseLoadBalancer to replace its
|
||||
|
@ -184,7 +181,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||
runMaxSteps = conf.getBoolean(RUN_MAX_STEPS_KEY, runMaxSteps);
|
||||
|
||||
numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember);
|
||||
isByTable = conf.getBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, isByTable);
|
||||
minCostNeedBalance = conf.getFloat(MIN_COST_NEED_BALANCE_KEY, minCostNeedBalance);
|
||||
if (localityCandidateGenerator == null) {
|
||||
localityCandidateGenerator = new LocalityBasedCandidateGenerator(services);
|
||||
|
@ -316,7 +312,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected boolean needsBalance(Cluster cluster) {
|
||||
protected boolean needsBalance(TableName tableName, Cluster cluster) {
|
||||
ClusterLoadState cs = new ClusterLoadState(cluster.clusterState);
|
||||
if (cs.getNumServers() < MIN_SERVER_BALANCE) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
|
@ -358,13 +354,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized List<RegionPlan> balanceCluster(TableName tableName, Map<ServerName,
|
||||
List<RegionInfo>> clusterState) {
|
||||
this.tableName = tableName;
|
||||
return balanceCluster(clusterState);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
Cluster.Action nextAction(Cluster cluster) {
|
||||
return candidateGenerators.get(RANDOM.nextInt(candidateGenerators.size()))
|
||||
|
@ -376,19 +365,19 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||
* should always approach the optimal state given enough steps.
|
||||
*/
|
||||
@Override
|
||||
public synchronized List<RegionPlan> balanceCluster(Map<ServerName,
|
||||
List<RegionInfo>> clusterState) {
|
||||
List<RegionPlan> plans = balanceMasterRegions(clusterState);
|
||||
if (plans != null || clusterState == null || clusterState.size() <= 1) {
|
||||
public synchronized List<RegionPlan> balanceTable(TableName tableName, Map<ServerName,
|
||||
List<RegionInfo>> loadOfOneTable) {
|
||||
List<RegionPlan> plans = balanceMasterRegions(loadOfOneTable);
|
||||
if (plans != null || loadOfOneTable == null || loadOfOneTable.size() <= 1) {
|
||||
return plans;
|
||||
}
|
||||
|
||||
if (masterServerName != null && clusterState.containsKey(masterServerName)) {
|
||||
if (clusterState.size() <= 2) {
|
||||
if (masterServerName != null && loadOfOneTable.containsKey(masterServerName)) {
|
||||
if (loadOfOneTable.size() <= 2) {
|
||||
return null;
|
||||
}
|
||||
clusterState = new HashMap<>(clusterState);
|
||||
clusterState.remove(masterServerName);
|
||||
loadOfOneTable = new HashMap<>(loadOfOneTable);
|
||||
loadOfOneTable.remove(masterServerName);
|
||||
}
|
||||
|
||||
// On clusters with lots of HFileLinks or lots of reference files,
|
||||
|
@ -404,13 +393,13 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||
//The clusterState that is given to this method contains the state
|
||||
//of all the regions in the table(s) (that's true today)
|
||||
// Keep track of servers to iterate through them.
|
||||
Cluster cluster = new Cluster(clusterState, loads, finder, rackManager);
|
||||
Cluster cluster = new Cluster(loadOfOneTable, loads, finder, rackManager);
|
||||
|
||||
long startTime = EnvironmentEdgeManager.currentTime();
|
||||
|
||||
initCosts(cluster);
|
||||
|
||||
if (!needsBalance(cluster)) {
|
||||
if (!needsBalance(tableName, cluster)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
|
|
@ -133,7 +133,7 @@ public class TestStochasticBalancerJmxMetrics extends BalancerTestBase {
|
|||
|
||||
TableName tableName = HConstants.ENSEMBLE_TABLE_NAME;
|
||||
Map<ServerName, List<RegionInfo>> clusterState = mockClusterServers(mockCluster_ensemble);
|
||||
loadBalancer.balanceCluster(tableName, clusterState);
|
||||
loadBalancer.balanceTable(tableName, clusterState);
|
||||
|
||||
String[] tableNames = new String[] { tableName.getNameAsString() };
|
||||
String[] functionNames = loadBalancer.getCostFunctionNames();
|
||||
|
@ -169,17 +169,17 @@ public class TestStochasticBalancerJmxMetrics extends BalancerTestBase {
|
|||
// table 1
|
||||
TableName tableName = TableName.valueOf(TABLE_NAME_1);
|
||||
Map<ServerName, List<RegionInfo>> clusterState = mockClusterServers(mockCluster_pertable_1);
|
||||
loadBalancer.balanceCluster(tableName, clusterState);
|
||||
loadBalancer.balanceTable(tableName, clusterState);
|
||||
|
||||
// table 2
|
||||
tableName = TableName.valueOf(TABLE_NAME_2);
|
||||
clusterState = mockClusterServers(mockCluster_pertable_2);
|
||||
loadBalancer.balanceCluster(tableName, clusterState);
|
||||
loadBalancer.balanceTable(tableName, clusterState);
|
||||
|
||||
// table hbase:namespace
|
||||
tableName = TableName.valueOf(TABLE_NAME_NAMESPACE);
|
||||
clusterState = mockClusterServers(mockCluster_pertable_namespace);
|
||||
loadBalancer.balanceCluster(tableName, clusterState);
|
||||
loadBalancer.balanceTable(tableName, clusterState);
|
||||
|
||||
String[] tableNames = new String[] { TABLE_NAME_1, TABLE_NAME_2, TABLE_NAME_NAMESPACE };
|
||||
Set<String> jmxMetrics = readJmxMetricsWithRetry();
|
||||
|
|
|
@ -20,8 +20,6 @@ package org.apache.hadoop.hbase.master;
|
|||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
|
@ -87,27 +85,9 @@ public class TestBalancer {
|
|||
ServerManager serverManager = master.getServerManager();
|
||||
Map<TableName, Map<ServerName, List<RegionInfo>>> assignments =
|
||||
assignmentManager.getRegionStates()
|
||||
.getAssignmentsForBalancer(tableStateManager, serverManager.getOnlineServersList(), true);
|
||||
.getAssignmentsForBalancer(tableStateManager, serverManager.getOnlineServersList());
|
||||
assertFalse(assignments.containsKey(disableTableName));
|
||||
assertTrue(assignments.containsKey(tableName));
|
||||
assertFalse(assignments.get(tableName).containsKey(sn1));
|
||||
|
||||
assignments = assignmentManager.getRegionStates()
|
||||
.getAssignmentsForBalancer(tableStateManager, serverManager.getOnlineServersList(), false);
|
||||
Map<TableName, Map<ServerName, List<RegionInfo>>> tableNameMap = new HashMap<>();
|
||||
for (Map.Entry<ServerName, List<RegionInfo>> entry : assignments
|
||||
.get(HConstants.ENSEMBLE_TABLE_NAME).entrySet()) {
|
||||
final ServerName serverName = entry.getKey();
|
||||
for (RegionInfo regionInfo : entry.getValue()) {
|
||||
Map<ServerName, List<RegionInfo>> tableResult =
|
||||
tableNameMap.computeIfAbsent(regionInfo.getTable(), t -> new HashMap<>());
|
||||
List<RegionInfo> serverResult =
|
||||
tableResult.computeIfAbsent(serverName, s -> new ArrayList<>());
|
||||
serverResult.add(regionInfo);
|
||||
}
|
||||
}
|
||||
assertFalse(tableNameMap.containsKey(disableTableName));
|
||||
assertTrue(tableNameMap.containsKey(tableName));
|
||||
assertFalse(tableNameMap.get(tableName).containsKey(sn1));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -557,7 +557,9 @@ public class BalancerTestBase {
|
|||
|
||||
loadBalancer.setRackManager(rackManager);
|
||||
// Run the balancer.
|
||||
List<RegionPlan> plans = loadBalancer.balanceCluster(serverMap);
|
||||
Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
|
||||
(Map) mockClusterServersWithTables(serverMap);
|
||||
List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
|
||||
assertNotNull(plans);
|
||||
|
||||
// Check to see that this actually got to a stable place.
|
||||
|
@ -570,7 +572,8 @@ public class BalancerTestBase {
|
|||
|
||||
if (assertFullyBalanced) {
|
||||
assertClusterAsBalanced(balancedCluster);
|
||||
List<RegionPlan> secondPlans = loadBalancer.balanceCluster(serverMap);
|
||||
LoadOfAllTable = (Map) mockClusterServersWithTables(serverMap);
|
||||
List<RegionPlan> secondPlans = loadBalancer.balanceCluster(LoadOfAllTable);
|
||||
assertNull(secondPlans);
|
||||
}
|
||||
|
||||
|
|
|
@ -173,7 +173,8 @@ public class LoadBalancerPerformanceEvaluation extends AbstractHBaseTool {
|
|||
methodName = "balanceCluster";
|
||||
LOG.info("Calling " + methodName);
|
||||
watch.reset().start();
|
||||
loadBalancer.balanceCluster(serverRegionMap);
|
||||
|
||||
loadBalancer.balanceTable(HConstants.ENSEMBLE_TABLE_NAME, serverRegionMap);
|
||||
System.out.print(formatResults(methodName, watch.elapsed(TimeUnit.MILLISECONDS)));
|
||||
|
||||
return EXIT_SUCCESS;
|
||||
|
|
|
@ -37,7 +37,6 @@ import org.apache.commons.lang3.ArrayUtils;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HBaseIOException;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
|
@ -117,13 +116,14 @@ public class TestBaseLoadBalancer extends BalancerTestBase {
|
|||
|
||||
public static class MockBalancer extends BaseLoadBalancer {
|
||||
@Override
|
||||
public List<RegionPlan> balanceCluster(Map<ServerName, List<RegionInfo>> clusterState) {
|
||||
public List<RegionPlan>
|
||||
balanceCluster(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<RegionPlan> balanceCluster(TableName tableName,
|
||||
Map<ServerName, List<RegionInfo>> clusterState) throws HBaseIOException {
|
||||
public List<RegionPlan> balanceTable(TableName tableName,
|
||||
Map<ServerName, List<RegionInfo>> loadOfOneTable) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
|
|||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.master.LoadBalancer;
|
||||
import org.apache.hadoop.hbase.master.RegionPlan;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
|
@ -50,15 +49,15 @@ import org.slf4j.LoggerFactory;
|
|||
* Test the load balancer that is created by default.
|
||||
*/
|
||||
@Category({MasterTests.class, SmallTests.class})
|
||||
public class TestDefaultLoadBalancer extends BalancerTestBase {
|
||||
public class TestSimpleLoadBalancer extends BalancerTestBase {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestDefaultLoadBalancer.class);
|
||||
HBaseClassTestRule.forClass(TestSimpleLoadBalancer.class);
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestDefaultLoadBalancer.class);
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestSimpleLoadBalancer.class);
|
||||
|
||||
private static LoadBalancer loadBalancer;
|
||||
private static SimpleLoadBalancer loadBalancer;
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeAllTests() throws Exception {
|
||||
|
@ -129,6 +128,7 @@ public class TestDefaultLoadBalancer extends BalancerTestBase {
|
|||
*
|
||||
* Invariant is that all servers should be hosting either floor(average) or
|
||||
* ceiling(average) at both table level and cluster level
|
||||
*
|
||||
*/
|
||||
@Test
|
||||
public void testBalanceClusterOverall() throws Exception {
|
||||
|
@ -138,14 +138,17 @@ public class TestDefaultLoadBalancer extends BalancerTestBase {
|
|||
List<ServerAndLoad> clusterList = convertToList(clusterServers);
|
||||
clusterLoad.put(TableName.valueOf(name.getMethodName()), clusterServers);
|
||||
HashMap<TableName, TreeMap<ServerName, List<RegionInfo>>> result =
|
||||
mockClusterServersWithTables(clusterServers);
|
||||
mockClusterServersWithTables(clusterServers);
|
||||
loadBalancer.setClusterLoad(clusterLoad);
|
||||
List<RegionPlan> clusterplans = new ArrayList<>();
|
||||
List<Pair<TableName, Integer>> regionAmountList = new ArrayList<>();
|
||||
for(TreeMap<ServerName, List<RegionInfo>> servers : result.values()){
|
||||
for (Map.Entry<TableName, TreeMap<ServerName, List<RegionInfo>>> mapEntry : result
|
||||
.entrySet()) {
|
||||
TableName tableName = mapEntry.getKey();
|
||||
TreeMap<ServerName, List<RegionInfo>> servers = mapEntry.getValue();
|
||||
List<ServerAndLoad> list = convertToList(servers);
|
||||
LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list));
|
||||
List<RegionPlan> partialplans = loadBalancer.balanceCluster(servers);
|
||||
List<RegionPlan> partialplans = loadBalancer.balanceTable(tableName, servers);
|
||||
if(partialplans != null) clusterplans.addAll(partialplans);
|
||||
List<ServerAndLoad> balancedClusterPerTable = reconcile(list, partialplans, servers);
|
||||
LOG.info("Mock Balance : " + printMock(balancedClusterPerTable));
|
||||
|
@ -175,28 +178,33 @@ public class TestDefaultLoadBalancer extends BalancerTestBase {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testImpactOfBalanceClusterOverallWithClusterLoadPerTable() throws Exception {
|
||||
public void testImpactOfBalanceClusterOverallWithLoadOfAllTable() throws Exception {
|
||||
testImpactOfBalanceClusterOverall(true);
|
||||
}
|
||||
|
||||
private void testImpactOfBalanceClusterOverall(boolean useClusterLoadPerTable) throws Exception {
|
||||
private void testImpactOfBalanceClusterOverall(boolean useLoadOfAllTable) throws Exception {
|
||||
Map<TableName, Map<ServerName, List<RegionInfo>>> clusterLoad = new TreeMap<>();
|
||||
Map<ServerName, List<RegionInfo>> clusterServers = mockUniformClusterServers(mockUniformCluster);
|
||||
Map<ServerName, List<RegionInfo>> clusterServers =
|
||||
mockUniformClusterServers(mockUniformCluster);
|
||||
List<ServerAndLoad> clusterList = convertToList(clusterServers);
|
||||
clusterLoad.put(TableName.valueOf(name.getMethodName()), clusterServers);
|
||||
// use overall can achieve both table and cluster level balance
|
||||
HashMap<TableName, TreeMap<ServerName, List<RegionInfo>>> clusterLoadPerTable = mockClusterServersWithTables(clusterServers);
|
||||
if (useClusterLoadPerTable) {
|
||||
loadBalancer.setClusterLoad((Map)clusterLoadPerTable);
|
||||
HashMap<TableName, TreeMap<ServerName, List<RegionInfo>>> LoadOfAllTable =
|
||||
mockClusterServersWithTables(clusterServers);
|
||||
if (useLoadOfAllTable) {
|
||||
loadBalancer.setClusterLoad((Map) LoadOfAllTable);
|
||||
} else {
|
||||
loadBalancer.setClusterLoad(clusterLoad);
|
||||
}
|
||||
List<RegionPlan> clusterplans1 = new ArrayList<RegionPlan>();
|
||||
List<Pair<TableName, Integer>> regionAmountList = new ArrayList<Pair<TableName, Integer>>();
|
||||
for (TreeMap<ServerName, List<RegionInfo>> servers : clusterLoadPerTable.values()) {
|
||||
for (Map.Entry<TableName, TreeMap<ServerName, List<RegionInfo>>> mapEntry : LoadOfAllTable
|
||||
.entrySet()) {
|
||||
TableName tableName = mapEntry.getKey();
|
||||
TreeMap<ServerName, List<RegionInfo>> servers = mapEntry.getValue();
|
||||
List<ServerAndLoad> list = convertToList(servers);
|
||||
LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list));
|
||||
List<RegionPlan> partialplans = loadBalancer.balanceCluster(servers);
|
||||
List<RegionPlan> partialplans = loadBalancer.balanceTable(tableName, servers);
|
||||
if (partialplans != null) clusterplans1.addAll(partialplans);
|
||||
List<ServerAndLoad> balancedClusterPerTable = reconcile(list, partialplans, servers);
|
||||
LOG.info("Mock Balance : " + printMock(balancedClusterPerTable));
|
||||
|
@ -207,29 +215,34 @@ public class TestDefaultLoadBalancer extends BalancerTestBase {
|
|||
}
|
||||
}
|
||||
List<ServerAndLoad> balancedCluster1 = reconcile(clusterList, clusterplans1, clusterServers);
|
||||
assertTrue(assertClusterOverallAsBalanced(balancedCluster1, clusterLoadPerTable.keySet().size()));
|
||||
assertTrue(assertClusterOverallAsBalanced(balancedCluster1, LoadOfAllTable.keySet().size()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void
|
||||
testBalanceClusterOverallStrictly() throws Exception {
|
||||
int[] regionNumOfTable1PerServer = {3, 3, 4, 4, 4, 4, 5, 5, 5};
|
||||
int[] regionNumOfTable2PerServer = {2, 2, 2, 2, 2, 2, 2, 2, 1};
|
||||
public void testBalanceClusterOverallStrictly() throws Exception {
|
||||
int[] regionNumOfTable1PerServer = { 3, 3, 4, 4, 4, 4, 5, 5, 5 };
|
||||
int[] regionNumOfTable2PerServer = { 2, 2, 2, 2, 2, 2, 2, 2, 1 };
|
||||
TreeMap<ServerName, List<RegionInfo>> serverRegionInfo = new TreeMap<>();
|
||||
List<ServerAndLoad> serverAndLoads = new ArrayList<>();
|
||||
for (int i = 0; i < regionNumOfTable1PerServer.length; i++) {
|
||||
ServerName serverName = ServerName.valueOf("server" + i, 1000, -1);
|
||||
List<RegionInfo> regions1 = createRegions(regionNumOfTable1PerServer[i], TableName.valueOf("table1"));
|
||||
List<RegionInfo> regions2 = createRegions(regionNumOfTable2PerServer[i], TableName.valueOf("table2"));
|
||||
List<RegionInfo> regions1 =
|
||||
createRegions(regionNumOfTable1PerServer[i], TableName.valueOf("table1"));
|
||||
List<RegionInfo> regions2 =
|
||||
createRegions(regionNumOfTable2PerServer[i], TableName.valueOf("table2"));
|
||||
regions1.addAll(regions2);
|
||||
serverRegionInfo.put(serverName, regions1);
|
||||
ServerAndLoad serverAndLoad = new ServerAndLoad(serverName, regionNumOfTable1PerServer[i] + regionNumOfTable2PerServer[i]);
|
||||
ServerAndLoad serverAndLoad = new ServerAndLoad(serverName,
|
||||
regionNumOfTable1PerServer[i] + regionNumOfTable2PerServer[i]);
|
||||
serverAndLoads.add(serverAndLoad);
|
||||
}
|
||||
HashMap<TableName, TreeMap<ServerName, List<RegionInfo>>> clusterLoadPerTable = mockClusterServersWithTables(serverRegionInfo);
|
||||
loadBalancer.setClusterLoad((Map) clusterLoadPerTable);
|
||||
List<RegionPlan> partialplans = loadBalancer.balanceCluster(clusterLoadPerTable.get(TableName.valueOf("table1")));
|
||||
List<ServerAndLoad> balancedServerLoads = reconcile(serverAndLoads, partialplans, serverRegionInfo);
|
||||
HashMap<TableName, TreeMap<ServerName, List<RegionInfo>>> LoadOfAllTable =
|
||||
mockClusterServersWithTables(serverRegionInfo);
|
||||
loadBalancer.setClusterLoad((Map) LoadOfAllTable);
|
||||
List<RegionPlan> partialplans = loadBalancer.balanceTable(TableName.valueOf("table1"),
|
||||
LoadOfAllTable.get(TableName.valueOf("table1")));
|
||||
List<ServerAndLoad> balancedServerLoads =
|
||||
reconcile(serverAndLoads, partialplans, serverRegionInfo);
|
||||
for (ServerAndLoad serverAndLoad : balancedServerLoads) {
|
||||
assertEquals(6, serverAndLoad.getLoad());
|
||||
}
|
|
@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.RegionMetrics;
|
|||
import org.apache.hadoop.hbase.ServerMetrics;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.Size;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.master.MockNoopMasterServices;
|
||||
import org.apache.hadoop.hbase.master.RegionPlan;
|
||||
|
@ -170,8 +171,10 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
|
|||
loadBalancer.setConf(conf);
|
||||
for (int[] mockCluster : clusterStateMocks) {
|
||||
Map<ServerName, List<RegionInfo>> servers = mockClusterServers(mockCluster);
|
||||
List<RegionPlan> plans = loadBalancer.balanceCluster(servers);
|
||||
assertNull(plans);
|
||||
Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
|
||||
(Map) mockClusterServersWithTables(servers);
|
||||
List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
|
||||
assertTrue(plans == null || plans.isEmpty());
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
|
@ -363,7 +366,7 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
|
|||
List<ServerAndLoad> list = convertToList(serverMap);
|
||||
|
||||
|
||||
List<RegionPlan> plans = loadBalancer.balanceCluster(serverMap);
|
||||
List<RegionPlan> plans = loadBalancer.balanceTable(HConstants.ENSEMBLE_TABLE_NAME, serverMap);
|
||||
assertNotNull(plans);
|
||||
|
||||
// Apply the plan to the mock cluster.
|
||||
|
@ -377,7 +380,7 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
|
|||
|
||||
serverMap.put(deadSn, new ArrayList<>(0));
|
||||
|
||||
plans = loadBalancer.balanceCluster(serverMap);
|
||||
plans = loadBalancer.balanceTable(HConstants.ENSEMBLE_TABLE_NAME, serverMap);
|
||||
assertNull(plans);
|
||||
}
|
||||
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.master.RegionPlan;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
|
@ -58,11 +59,15 @@ public class TestStochasticLoadBalancerBalanceCluster extends BalancerTestBase {
|
|||
Map<ServerName, List<RegionInfo>> servers = mockClusterServers(mockCluster);
|
||||
List<ServerAndLoad> list = convertToList(servers);
|
||||
LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list));
|
||||
List<RegionPlan> plans = loadBalancer.balanceCluster(servers);
|
||||
|
||||
Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
|
||||
(Map) mockClusterServersWithTables(servers);
|
||||
List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
|
||||
List<ServerAndLoad> balancedCluster = reconcile(list, plans, servers);
|
||||
LOG.info("Mock Balance : " + printMock(balancedCluster));
|
||||
assertClusterAsBalanced(balancedCluster);
|
||||
List<RegionPlan> secondPlans = loadBalancer.balanceCluster(servers);
|
||||
LoadOfAllTable = (Map) mockClusterServersWithTables(servers);
|
||||
List<RegionPlan> secondPlans = loadBalancer.balanceCluster(LoadOfAllTable);
|
||||
assertNull(secondPlans);
|
||||
for (Map.Entry<ServerName, List<RegionInfo>> entry : servers.entrySet()) {
|
||||
returnRegions(entry.getValue());
|
||||
|
|
|
@ -31,6 +31,7 @@ import java.util.concurrent.ThreadLocalRandom;
|
|||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
|
||||
|
@ -148,7 +149,8 @@ public class TestStochasticLoadBalancerHeterogeneousCost extends BalancerTestBas
|
|||
TestStochasticLoadBalancerHeterogeneousCostRules.createRulesFile(RULES_FILE);
|
||||
final Map<ServerName, List<RegionInfo>> serverMap =
|
||||
this.createServerMap(numNodes, numRegions, numRegionsPerServer, 1, 1);
|
||||
final List<RegionPlan> plans = BalancerTestBase.loadBalancer.balanceCluster(serverMap);
|
||||
final List<RegionPlan> plans =
|
||||
BalancerTestBase.loadBalancer.balanceTable(HConstants.ENSEMBLE_TABLE_NAME, serverMap);
|
||||
// As we disabled all the other cost functions, balancing only according to
|
||||
// the heterogeneous cost function should return nothing.
|
||||
assertNull(plans);
|
||||
|
@ -172,7 +174,8 @@ public class TestStochasticLoadBalancerHeterogeneousCost extends BalancerTestBas
|
|||
BalancerTestBase.loadBalancer.setRackManager(rackManager);
|
||||
|
||||
// Run the balancer.
|
||||
final List<RegionPlan> plans = BalancerTestBase.loadBalancer.balanceCluster(serverMap);
|
||||
final List<RegionPlan> plans =
|
||||
BalancerTestBase.loadBalancer.balanceTable(HConstants.ENSEMBLE_TABLE_NAME, serverMap);
|
||||
assertNotNull(plans);
|
||||
|
||||
// Check to see that this actually got to a stable place.
|
||||
|
@ -185,7 +188,7 @@ public class TestStochasticLoadBalancerHeterogeneousCost extends BalancerTestBas
|
|||
|
||||
if (assertFullyBalanced) {
|
||||
final List<RegionPlan> secondPlans =
|
||||
BalancerTestBase.loadBalancer.balanceCluster(serverMap);
|
||||
BalancerTestBase.loadBalancer.balanceTable(HConstants.ENSEMBLE_TABLE_NAME, serverMap);
|
||||
assertNull(secondPlans);
|
||||
|
||||
// create external cost function to retrieve limit
|
||||
|
|
|
@ -30,6 +30,7 @@ import java.util.TreeMap;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
|
||||
|
@ -150,7 +151,8 @@ public class TestStochasticLoadBalancerRegionReplica extends BalancerTestBase {
|
|||
// until the step above s1 holds two replicas of a region
|
||||
regions = randomRegions(1);
|
||||
map.put(s2, regions);
|
||||
assertTrue(loadBalancer.needsBalance(new Cluster(map, null, null, null)));
|
||||
assertTrue(loadBalancer.needsBalance(HConstants.ENSEMBLE_TABLE_NAME,
|
||||
new Cluster(map, null, null, null)));
|
||||
// check for the case where there are two hosts on the same rack and there are two racks
|
||||
// and both the replicas are on the same rack
|
||||
map.clear();
|
||||
|
@ -162,7 +164,8 @@ public class TestStochasticLoadBalancerRegionReplica extends BalancerTestBase {
|
|||
// add another server so that the cluster has some host on another rack
|
||||
map.put(ServerName.valueOf("host2", 1000, 11111), randomRegions(1));
|
||||
assertTrue(
|
||||
loadBalancer.needsBalance(new Cluster(map, null, null, new ForTestRackManagerOne())));
|
||||
loadBalancer.needsBalance(HConstants.ENSEMBLE_TABLE_NAME,
|
||||
new Cluster(map, null, null, new ForTestRackManagerOne())));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue