HBASE-20791 RSGroupBasedLoadBalancer#setClusterMetrics should pass ClusterMetrics to its internalBalancer (chenxu) - files missing from previous commit

This commit is contained in:
tedyu 2018-06-28 01:22:07 -07:00
parent c23e61f20d
commit 65d84df005
2 changed files with 18 additions and 426 deletions

View File

@ -93,11 +93,17 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
@Override
public void setConf(Configuration conf) {
this.config = conf;
if(internalBalancer != null) {
internalBalancer.setConf(conf);
}
}
@Override
public void setClusterMetrics(ClusterMetrics sm) {
this.clusterStatus = sm;
if (internalBalancer != null) {
internalBalancer.setClusterMetrics(sm);
}
}
@Override
@ -358,8 +364,7 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
}
private Pair<Map<ServerName, List<RegionInfo>>, List<RegionPlan>> correctAssignments(
Map<ServerName, List<RegionInfo>> existingAssignments)
throws HBaseIOException{
Map<ServerName, List<RegionInfo>> existingAssignments) throws HBaseIOException{
// To return
Map<ServerName, List<RegionInfo>> correctAssignments = new TreeMap<>();
List<RegionPlan> regionPlansForMisplacedRegions = new ArrayList<>();
@ -423,7 +428,9 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
StochasticLoadBalancer.class, LoadBalancer.class);
internalBalancer = ReflectionUtils.newInstance(balancerKlass, config);
internalBalancer.setMasterServices(masterServices);
internalBalancer.setClusterMetrics(clusterStatus);
if(clusterStatus != null) {
internalBalancer.setClusterMetrics(clusterStatus);
}
internalBalancer.setConf(config);
internalBalancer.initialize();
}

View File

@ -21,9 +21,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -32,75 +29,47 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
import org.apache.hadoop.hbase.rsgroup.RSGroupInfoManager;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
//TODO use stochastic based load balancer instead
/**
* Test RSGroupBasedLoadBalancer with SimpleLoadBalancer as internal balancer
*/
@Category(SmallTests.class)
public class TestRSGroupBasedLoadBalancer {
public class TestRSGroupBasedLoadBalancer extends RSGroupableBalancerTestBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRSGroupBasedLoadBalancer.class);
private static final Logger LOG = LoggerFactory.getLogger(TestRSGroupBasedLoadBalancer.class);
private static RSGroupBasedLoadBalancer loadBalancer;
private static SecureRandom rand;
static String[] groups = new String[] { RSGroupInfo.DEFAULT_GROUP, "dg2", "dg3", "dg4" };
static TableName table0 = TableName.valueOf("dt0");
static TableName[] tables =
new TableName[] { TableName.valueOf("dt1"),
TableName.valueOf("dt2"),
TableName.valueOf("dt3"),
TableName.valueOf("dt4")};
static List<ServerName> servers;
static Map<String, RSGroupInfo> groupMap;
static Map<TableName, String> tableMap;
static List<HTableDescriptor> tableDescs;
int[] regionAssignment = new int[] { 2, 5, 7, 10, 4, 3, 1 };
static int regionId = 0;
@BeforeClass
public static void beforeAllTests() throws Exception {
rand = new SecureRandom();
servers = generateServers(7);
groupMap = constructGroupInfo(servers, groups);
tableMap = new HashMap<>();
tableDescs = constructTableDesc();
tableDescs = constructTableDesc(true);
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.regions.slop", "0");
conf.set("hbase.rsgroup.grouploadbalancer.class", SimpleLoadBalancer.class.getCanonicalName());
@ -129,62 +98,6 @@ public class TestRSGroupBasedLoadBalancer {
assertClusterAsBalanced(balancedCluster);
}
/**
* Invariant is that all servers of a group have load between floor(avg) and
* ceiling(avg) number of regions.
*/
private void assertClusterAsBalanced(
ArrayListMultimap<String, ServerAndLoad> groupLoadMap) {
for (String gName : groupLoadMap.keySet()) {
List<ServerAndLoad> groupLoad = groupLoadMap.get(gName);
int numServers = groupLoad.size();
int numRegions = 0;
int maxRegions = 0;
int minRegions = Integer.MAX_VALUE;
for (ServerAndLoad server : groupLoad) {
int nr = server.getLoad();
if (nr > maxRegions) {
maxRegions = nr;
}
if (nr < minRegions) {
minRegions = nr;
}
numRegions += nr;
}
if (maxRegions - minRegions < 2) {
// less than 2 between max and min, can't balance
return;
}
int min = numRegions / numServers;
int max = numRegions % numServers == 0 ? min : min + 1;
for (ServerAndLoad server : groupLoad) {
assertTrue(server.getLoad() <= max);
assertTrue(server.getLoad() >= min);
}
}
}
/**
* All regions have an assignment.
*/
private void assertImmediateAssignment(List<RegionInfo> regions,
List<ServerName> servers,
Map<RegionInfo, ServerName> assignments)
throws IOException {
for (RegionInfo region : regions) {
assertTrue(assignments.containsKey(region));
ServerName server = assignments.get(region);
TableName tableName = region.getTable();
String groupName = getMockedGroupInfoManager().getRSGroupOfTable(tableName);
assertTrue(StringUtils.isNotEmpty(groupName));
RSGroupInfo gInfo = getMockedGroupInfoManager().getRSGroup(groupName);
assertTrue("Region is not correctly assigned to group servers.",
gInfo.containsServer(server.getAddress()));
}
}
/**
* Tests the bulk assignment used during cluster startup.
*
@ -235,6 +148,7 @@ public class TestRSGroupBasedLoadBalancer {
Set<RegionInfo> misplacedRegions = loadBalancer.getMisplacedRegions(inputForTest);
assertFalse(misplacedRegions.contains(ri));
}
/**
* Test the cluster startup bulk assignment which attempts to retain assignment info.
*/
@ -284,333 +198,4 @@ public class TestRSGroupBasedLoadBalancer {
.roundRobinAssignment(regions, onlineServers);
assertEquals(bogusRegion, assignments.get(LoadBalancer.BOGUS_SERVER_NAME).size());
}
/**
* Asserts a valid retained assignment plan.
* <p>
* Must meet the following conditions:
* <ul>
* <li>Every input region has an assignment, and to an online server
* <li>If a region had an existing assignment to a server with the same
* address a a currently online server, it will be assigned to it
* </ul>
*/
private void assertRetainedAssignment(
Map<RegionInfo, ServerName> existing, List<ServerName> servers,
Map<ServerName, List<RegionInfo>> assignment)
throws FileNotFoundException, IOException {
// Verify condition 1, every region assigned, and to online server
Set<ServerName> onlineServerSet = new TreeSet<>(servers);
Set<RegionInfo> assignedRegions = new TreeSet<>(RegionInfo.COMPARATOR);
for (Map.Entry<ServerName, List<RegionInfo>> a : assignment.entrySet()) {
assertTrue(
"Region assigned to server that was not listed as online",
onlineServerSet.contains(a.getKey()));
for (RegionInfo r : a.getValue()) {
assignedRegions.add(r);
}
}
assertEquals(existing.size(), assignedRegions.size());
// Verify condition 2, every region must be assigned to correct server.
Set<String> onlineHostNames = new TreeSet<>();
for (ServerName s : servers) {
onlineHostNames.add(s.getHostname());
}
for (Map.Entry<ServerName, List<RegionInfo>> a : assignment.entrySet()) {
ServerName currentServer = a.getKey();
for (RegionInfo r : a.getValue()) {
ServerName oldAssignedServer = existing.get(r);
TableName tableName = r.getTable();
String groupName =
getMockedGroupInfoManager().getRSGroupOfTable(tableName);
assertTrue(StringUtils.isNotEmpty(groupName));
RSGroupInfo gInfo = getMockedGroupInfoManager().getRSGroup(
groupName);
assertTrue(
"Region is not correctly assigned to group servers.",
gInfo.containsServer(currentServer.getAddress()));
if (oldAssignedServer != null
&& onlineHostNames.contains(oldAssignedServer
.getHostname())) {
// this region was previously assigned somewhere, and that
// host is still around, then the host must have been is a
// different group.
if (!oldAssignedServer.getAddress().equals(currentServer.getAddress())) {
assertFalse(gInfo.containsServer(oldAssignedServer.getAddress()));
}
}
}
}
}
private String printStats(
ArrayListMultimap<String, ServerAndLoad> groupBasedLoad) {
StringBuffer sb = new StringBuffer();
sb.append("\n");
for (String groupName : groupBasedLoad.keySet()) {
sb.append("Stats for group: " + groupName);
sb.append("\n");
sb.append(groupMap.get(groupName).getServers());
sb.append("\n");
List<ServerAndLoad> groupLoad = groupBasedLoad.get(groupName);
int numServers = groupLoad.size();
int totalRegions = 0;
sb.append("Per Server Load: \n");
for (ServerAndLoad sLoad : groupLoad) {
sb.append("Server :" + sLoad.getServerName() + " Load : "
+ sLoad.getLoad() + "\n");
totalRegions += sLoad.getLoad();
}
sb.append(" Group Statistics : \n");
float average = (float) totalRegions / numServers;
int max = (int) Math.ceil(average);
int min = (int) Math.floor(average);
sb.append("[srvr=" + numServers + " rgns=" + totalRegions + " avg="
+ average + " max=" + max + " min=" + min + "]");
sb.append("\n");
sb.append("===============================");
sb.append("\n");
}
return sb.toString();
}
private ArrayListMultimap<String, ServerAndLoad> convertToGroupBasedMap(
final Map<ServerName, List<RegionInfo>> serversMap) throws IOException {
ArrayListMultimap<String, ServerAndLoad> loadMap = ArrayListMultimap
.create();
for (RSGroupInfo gInfo : getMockedGroupInfoManager().listRSGroups()) {
Set<Address> groupServers = gInfo.getServers();
for (Address hostPort : groupServers) {
ServerName actual = null;
for(ServerName entry: servers) {
if(entry.getAddress().equals(hostPort)) {
actual = entry;
break;
}
}
List<RegionInfo> regions = serversMap.get(actual);
assertTrue("No load for " + actual, regions != null);
loadMap.put(gInfo.getName(),
new ServerAndLoad(actual, regions.size()));
}
}
return loadMap;
}
private ArrayListMultimap<String, ServerAndLoad> reconcile(
ArrayListMultimap<String, ServerAndLoad> previousLoad,
List<RegionPlan> plans) {
ArrayListMultimap<String, ServerAndLoad> result = ArrayListMultimap
.create();
result.putAll(previousLoad);
if (plans != null) {
for (RegionPlan plan : plans) {
ServerName source = plan.getSource();
updateLoad(result, source, -1);
ServerName destination = plan.getDestination();
updateLoad(result, destination, +1);
}
}
return result;
}
private void updateLoad(
ArrayListMultimap<String, ServerAndLoad> previousLoad,
final ServerName sn, final int diff) {
for (String groupName : previousLoad.keySet()) {
ServerAndLoad newSAL = null;
ServerAndLoad oldSAL = null;
for (ServerAndLoad sal : previousLoad.get(groupName)) {
if (ServerName.isSameAddress(sn, sal.getServerName())) {
oldSAL = sal;
newSAL = new ServerAndLoad(sn, sal.getLoad() + diff);
break;
}
}
if (newSAL != null) {
previousLoad.remove(groupName, oldSAL);
previousLoad.put(groupName, newSAL);
break;
}
}
}
private Map<ServerName, List<RegionInfo>> mockClusterServers() throws IOException {
assertTrue(servers.size() == regionAssignment.length);
Map<ServerName, List<RegionInfo>> assignment = new TreeMap<>();
for (int i = 0; i < servers.size(); i++) {
int numRegions = regionAssignment[i];
List<RegionInfo> regions = assignedRegions(numRegions, servers.get(i));
assignment.put(servers.get(i), regions);
}
return assignment;
}
/**
* Generate a list of regions evenly distributed between the tables.
*
* @param numRegions The number of regions to be generated.
* @return List of RegionInfo.
*/
private List<RegionInfo> randomRegions(int numRegions) {
List<RegionInfo> regions = new ArrayList<>(numRegions);
byte[] start = new byte[16];
byte[] end = new byte[16];
rand.nextBytes(start);
rand.nextBytes(end);
int regionIdx = rand.nextInt(tables.length);
for (int i = 0; i < numRegions; i++) {
Bytes.putInt(start, 0, numRegions << 1);
Bytes.putInt(end, 0, (numRegions << 1) + 1);
int tableIndex = (i + regionIdx) % tables.length;
regions.add(RegionInfoBuilder.newBuilder(tables[tableIndex])
.setStartKey(start)
.setEndKey(end)
.setSplit(false)
.setRegionId(regionId++)
.build());
}
return regions;
}
/**
* Generate assigned regions to a given server using group information.
*
* @param numRegions the num regions to generate
* @param sn the servername
* @return the list of regions
* @throws java.io.IOException Signals that an I/O exception has occurred.
*/
private List<RegionInfo> assignedRegions(int numRegions, ServerName sn) throws IOException {
List<RegionInfo> regions = new ArrayList<>(numRegions);
byte[] start = new byte[16];
byte[] end = new byte[16];
Bytes.putInt(start, 0, numRegions << 1);
Bytes.putInt(end, 0, (numRegions << 1) + 1);
for (int i = 0; i < numRegions; i++) {
TableName tableName = getTableName(sn);
regions.add(RegionInfoBuilder.newBuilder(tableName)
.setStartKey(start)
.setEndKey(end)
.setSplit(false)
.setRegionId(regionId++)
.build());
}
return regions;
}
private static List<ServerName> generateServers(int numServers) {
List<ServerName> servers = new ArrayList<>(numServers);
for (int i = 0; i < numServers; i++) {
String host = "server" + rand.nextInt(100000);
int port = rand.nextInt(60000);
servers.add(ServerName.valueOf(host, port, -1));
}
return servers;
}
/**
* Construct group info, with each group having at least one server.
*
* @param servers the servers
* @param groups the groups
* @return the map
*/
private static Map<String, RSGroupInfo> constructGroupInfo(
List<ServerName> servers, String[] groups) {
assertTrue(servers != null);
assertTrue(servers.size() >= groups.length);
int index = 0;
Map<String, RSGroupInfo> groupMap = new HashMap<>();
for (String grpName : groups) {
RSGroupInfo RSGroupInfo = new RSGroupInfo(grpName);
RSGroupInfo.addServer(servers.get(index).getAddress());
groupMap.put(grpName, RSGroupInfo);
index++;
}
while (index < servers.size()) {
int grpIndex = rand.nextInt(groups.length);
groupMap.get(groups[grpIndex]).addServer(
servers.get(index).getAddress());
index++;
}
return groupMap;
}
/**
* Construct table descriptors evenly distributed between the groups.
*
* @return the list
*/
private static List<HTableDescriptor> constructTableDesc() {
List<HTableDescriptor> tds = Lists.newArrayList();
int index = rand.nextInt(groups.length);
for (int i = 0; i < tables.length; i++) {
HTableDescriptor htd = new HTableDescriptor(tables[i]);
int grpIndex = (i + index) % groups.length ;
String groupName = groups[grpIndex];
tableMap.put(tables[i], groupName);
tds.add(htd);
}
tableMap.put(table0, "");
tds.add(new HTableDescriptor(table0));
return tds;
}
private static MasterServices getMockedMaster() throws IOException {
TableDescriptors tds = Mockito.mock(TableDescriptors.class);
Mockito.when(tds.get(tables[0])).thenReturn(tableDescs.get(0));
Mockito.when(tds.get(tables[1])).thenReturn(tableDescs.get(1));
Mockito.when(tds.get(tables[2])).thenReturn(tableDescs.get(2));
Mockito.when(tds.get(tables[3])).thenReturn(tableDescs.get(3));
MasterServices services = Mockito.mock(HMaster.class);
Mockito.when(services.getTableDescriptors()).thenReturn(tds);
AssignmentManager am = Mockito.mock(AssignmentManager.class);
Mockito.when(services.getAssignmentManager()).thenReturn(am);
return services;
}
private static RSGroupInfoManager getMockedGroupInfoManager() throws IOException {
RSGroupInfoManager gm = Mockito.mock(RSGroupInfoManager.class);
Mockito.when(gm.getRSGroup(groups[0])).thenReturn(
groupMap.get(groups[0]));
Mockito.when(gm.getRSGroup(groups[1])).thenReturn(
groupMap.get(groups[1]));
Mockito.when(gm.getRSGroup(groups[2])).thenReturn(
groupMap.get(groups[2]));
Mockito.when(gm.getRSGroup(groups[3])).thenReturn(
groupMap.get(groups[3]));
Mockito.when(gm.listRSGroups()).thenReturn(
Lists.newLinkedList(groupMap.values()));
Mockito.when(gm.isOnline()).thenReturn(true);
Mockito.when(gm.getRSGroupOfTable(Mockito.any()))
.thenAnswer(new Answer<String>() {
@Override
public String answer(InvocationOnMock invocation) throws Throwable {
return tableMap.get(invocation.getArgument(0));
}
});
return gm;
}
private TableName getTableName(ServerName sn) throws IOException {
TableName tableName = null;
RSGroupInfoManager gm = getMockedGroupInfoManager();
RSGroupInfo groupOfServer = null;
for(RSGroupInfo gInfo : gm.listRSGroups()){
if(gInfo.containsServer(sn.getAddress())){
groupOfServer = gInfo;
break;
}
}
for(HTableDescriptor desc : tableDescs){
if(gm.getRSGroupOfTable(desc.getTableName()).endsWith(groupOfServer.getName())){
tableName = desc.getTableName();
}
}
return tableName;
}
}