HBASE-22411 Refactor codes of moving reigons in RSGroup

Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
haxiaolin 2019-05-27 11:04:42 +08:00 committed by Guanghao Zhang
parent 78319fab87
commit dffe01432d
1 changed files with 58 additions and 113 deletions

View File

@ -46,7 +46,6 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps; import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
/** /**
@ -61,8 +60,7 @@ public class RSGroupAdminServer implements RSGroupAdmin {
private MasterServices master; private MasterServices master;
private final RSGroupInfoManager rsGroupInfoManager; private final RSGroupInfoManager rsGroupInfoManager;
public RSGroupAdminServer(MasterServices master, RSGroupInfoManager rsGroupInfoManager) public RSGroupAdminServer(MasterServices master, RSGroupInfoManager rsGroupInfoManager) {
throws IOException {
this.master = master; this.master = master;
this.rsGroupInfoManager = rsGroupInfoManager; this.rsGroupInfoManager = rsGroupInfoManager;
} }
@ -196,46 +194,38 @@ public class RSGroupAdminServer implements RSGroupAdmin {
} }
/** /**
* Moves every region from servers which are currently located on these servers, * Move every region from servers which are currently located on these servers,
* but should not be located there. * but should not be located there.
*
* @param servers the servers that will move to new group * @param servers the servers that will move to new group
* @param tables these tables will be kept on the servers, others will be moved
* @param targetGroupName the target group name * @param targetGroupName the target group name
* @throws IOException if moving the server and tables fail * @throws IOException if moving the server and tables fail
*/ */
private void moveRegionsFromServers(Set<Address> servers, Set<TableName> tables, private void moveServerRegionsFromGroup(Set<Address> servers, String targetGroupName)
String targetGroupName) throws IOException { throws IOException {
boolean foundRegionsToMove; boolean hasRegionsToMove;
RSGroupInfo targetGrp = getRSGroupInfo(targetGroupName); RSGroupInfo targetGrp = getRSGroupInfo(targetGroupName);
Set<Address> allSevers = new HashSet<>(servers); Set<Address> allSevers = new HashSet<>(servers);
do { do {
foundRegionsToMove = false; hasRegionsToMove = false;
for (Iterator<Address> iter = allSevers.iterator(); iter.hasNext();) { for (Iterator<Address> iter = allSevers.iterator(); iter.hasNext();) {
Address rs = iter.next(); Address rs = iter.next();
// Get regions that are associated with this server and filter regions by tables. // Get regions that are associated with this server and filter regions by group tables.
List<RegionInfo> regions = new ArrayList<>();
for (RegionInfo region : getRegions(rs)) { for (RegionInfo region : getRegions(rs)) {
if (!tables.contains(region.getTable())) { if (!targetGrp.containsTable(region.getTable())) {
regions.add(region); LOG.info("Moving server region {}, which do not belong to RSGroup {}",
region.getShortNameToLog(), targetGroupName);
this.master.getAssignmentManager().move(region);
if (master.getAssignmentManager().getRegionStates().
getRegionState(region).isFailedOpen()) {
continue;
}
hasRegionsToMove = true;
} }
} }
LOG.info("Moving " + regions.size() + " region(s) from " + rs + if (!hasRegionsToMove) {
" for server move to " + targetGroupName); LOG.info("Server {} has no more regions to move for RSGroup", rs.getHostname());
if (!regions.isEmpty()) {
for (RegionInfo region: regions) {
// Regions might get assigned from tables of target group so we need to filter
if (!targetGrp.containsTable(region.getTable())) {
this.master.getAssignmentManager().move(region);
if (master.getAssignmentManager().getRegionStates().
getRegionState(region).isFailedOpen()) {
continue;
}
foundRegionsToMove = true;
}
}
}
if (!foundRegionsToMove) {
iter.remove(); iter.remove();
} }
} }
@ -245,26 +235,31 @@ public class RSGroupAdminServer implements RSGroupAdmin {
LOG.warn("Sleep interrupted", e); LOG.warn("Sleep interrupted", e);
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
} while (foundRegionsToMove); } while (hasRegionsToMove);
} }
/** /**
* Moves every region of tables which should be kept on the servers, * Moves regions of tables which are not on target group servers.
* but currently they are located on other servers. *
* @param servers the regions of these servers will be kept on the servers, others will be moved
* @param tables the tables that will move to new group * @param tables the tables that will move to new group
* @param targetGroupName the target group name * @param targetGroupName the target group name
* @throws IOException if moving the region fails * @throws IOException if moving the region fails
*/ */
private void moveRegionsToServers(Set<Address> servers, Set<TableName> tables, private void moveTableRegionsToGroup(Set<TableName> tables, String targetGroupName)
String targetGroupName) throws IOException { throws IOException {
for (TableName table: tables) { RSGroupInfo targetGrp = getRSGroupInfo(targetGroupName);
LOG.info("Moving region(s) from " + table + " for table move to " + targetGroupName); for (TableName table : tables) {
if (master.getAssignmentManager().isTableDisabled(table)) {
LOG.debug("Skipping move regions because the table {} is disabled", table);
continue;
}
LOG.info("Moving region(s) for table {} to RSGroup {}", table, targetGroupName);
for (RegionInfo region : master.getAssignmentManager().getRegionStates() for (RegionInfo region : master.getAssignmentManager().getRegionStates()
.getRegionsOfTable(table)) { .getRegionsOfTable(table)) {
ServerName sn = master.getAssignmentManager().getRegionStates() ServerName sn =
.getRegionServerOfRegion(region); master.getAssignmentManager().getRegionStates().getRegionServerOfRegion(region);
if (!servers.contains(sn.getAddress())) { if (!targetGrp.containsServer(sn.getAddress())) {
LOG.info("Moving region {} to RSGroup {}", region.getShortNameToLog(), targetGroupName);
master.getAssignmentManager().move(region); master.getAssignmentManager().move(region);
} }
} }
@ -285,7 +280,8 @@ public class RSGroupAdminServer implements RSGroupAdmin {
// TODO. Why? Stuff breaks if I equate them. // TODO. Why? Stuff breaks if I equate them.
return; return;
} }
RSGroupInfo targetGrp = getAndCheckRSGroupInfo(targetGroupName); //check target group
getAndCheckRSGroupInfo(targetGroupName);
// Hold a lock on the manager instance while moving servers to prevent // Hold a lock on the manager instance while moving servers to prevent
// another writer changing our state while we are working. // another writer changing our state while we are working.
@ -326,47 +322,8 @@ public class RSGroupAdminServer implements RSGroupAdmin {
// MovedServers may be < passed in 'servers'. // MovedServers may be < passed in 'servers'.
Set<Address> movedServers = rsGroupInfoManager.moveServers(servers, srcGrp.getName(), Set<Address> movedServers = rsGroupInfoManager.moveServers(servers, srcGrp.getName(),
targetGroupName); targetGroupName);
List<Address> editableMovedServers = Lists.newArrayList(movedServers); moveServerRegionsFromGroup(movedServers, targetGroupName);
boolean foundRegionsToMove; LOG.info("Move servers done: {} => {}", srcGrp.getName(), targetGroupName);
do {
foundRegionsToMove = false;
for (Iterator<Address> iter = editableMovedServers.iterator(); iter.hasNext();) {
Address rs = iter.next();
// Get regions that are associated with this server.
List<RegionInfo> regions = getRegions(rs);
LOG.info("Moving " + regions.size() + " region(s) from " + rs +
" for server move to " + targetGroupName);
for (RegionInfo region: regions) {
// Regions might get assigned from tables of target group so we need to filter
if (targetGrp.containsTable(region.getTable())) {
continue;
}
LOG.info("Moving region " + region.getShortNameToLog());
this.master.getAssignmentManager().move(region);
if (master.getAssignmentManager().getRegionStates().
getRegionState(region).isFailedOpen()) {
// If region is in FAILED_OPEN state, it won't recover, not without
// operator intervention... in hbase-2.0.0 at least. Continue rather
// than mark region as 'foundRegionsToMove'.
continue;
}
foundRegionsToMove = true;
}
if (!foundRegionsToMove) {
iter.remove();
}
}
try {
rsGroupInfoManager.wait(1000);
} catch (InterruptedException e) {
LOG.warn("Sleep interrupted", e);
Thread.currentThread().interrupt();
}
} while (foundRegionsToMove);
LOG.info("Move server done: " + srcGrp.getName() + "=>" + targetGroupName);
} }
} }
@ -400,25 +357,14 @@ public class RSGroupAdminServer implements RSGroupAdmin {
"Source RSGroup " + srcGroup + " is same as target " + targetGroup + "Source RSGroup " + srcGroup + " is same as target " + targetGroup +
" RSGroup for table " + table); " RSGroup for table " + table);
} }
LOG.info("Moving table " + table.getNameAsString() + " to RSGroup " + targetGroup); LOG.info("Moving table {} to RSGroup {}", table.getNameAsString(), targetGroup);
} }
rsGroupInfoManager.moveTables(tables, targetGroup); rsGroupInfoManager.moveTables(tables, targetGroup);
// targetGroup is null when a table is being deleted. In this case no further // targetGroup is null when a table is being deleted. In this case no further
// action is required. // action is required.
if (targetGroup != null) { if (targetGroup != null) {
for (TableName table: tables) { moveTableRegionsToGroup(tables, targetGroup);
if (master.getAssignmentManager().isTableDisabled(table)) {
LOG.debug("Skipping move regions because the table" + table + " is disabled.");
continue;
}
for (RegionInfo region :
master.getAssignmentManager().getRegionStates().getRegionsOfTable(table)) {
LOG.info("Moving region " + region.getShortNameToLog() +
" to RSGroup " + targetGroup);
master.getAssignmentManager().move(region);
}
}
} }
} }
} }
@ -478,14 +424,14 @@ public class RSGroupAdminServer implements RSGroupAdmin {
// Only allow one balance run at at time. // Only allow one balance run at at time.
Map<String, RegionState> groupRIT = rsGroupGetRegionsInTransition(groupName); Map<String, RegionState> groupRIT = rsGroupGetRegionsInTransition(groupName);
if (groupRIT.size() > 0) { if (groupRIT.size() > 0) {
LOG.debug("Not running balancer because " + groupRIT.size() + " region(s) in transition: " + LOG.debug("Not running balancer because {} region(s) in transition: {}", groupRIT.size(),
StringUtils.abbreviate( StringUtils.abbreviate(
master.getAssignmentManager().getRegionStates().getRegionsInTransition().toString(), master.getAssignmentManager().getRegionStates().getRegionsInTransition().toString(),
256)); 256));
return false; return false;
} }
if (serverManager.areDeadServersInProgress()) { if (serverManager.areDeadServersInProgress()) {
LOG.debug("Not running balancer because processing dead regionserver(s): " + LOG.debug("Not running balancer because processing dead regionserver(s): {}",
serverManager.getDeadServers()); serverManager.getDeadServers());
return false; return false;
} }
@ -494,10 +440,9 @@ public class RSGroupAdminServer implements RSGroupAdmin {
List<RegionPlan> plans = new ArrayList<>(); List<RegionPlan> plans = new ArrayList<>();
for(Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> tableMap: for(Map.Entry<TableName, Map<ServerName, List<RegionInfo>>> tableMap:
getRSGroupAssignmentsByTable(groupName).entrySet()) { getRSGroupAssignmentsByTable(groupName).entrySet()) {
LOG.info("Creating partial plan for table " + tableMap.getKey() + ": " LOG.info("Creating partial plan for table {} : {}", tableMap.getKey(), tableMap.getValue());
+ tableMap.getValue());
List<RegionPlan> partialPlans = balancer.balanceCluster(tableMap.getValue()); List<RegionPlan> partialPlans = balancer.balanceCluster(tableMap.getValue());
LOG.info("Partial plan for table " + tableMap.getKey() + ": " + partialPlans); LOG.info("Partial plan for table {} : {}", tableMap.getKey(), partialPlans);
if (partialPlans != null) { if (partialPlans != null) {
plans.addAll(partialPlans); plans.addAll(partialPlans);
} }
@ -505,13 +450,13 @@ public class RSGroupAdminServer implements RSGroupAdmin {
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
boolean balancerRan = !plans.isEmpty(); boolean balancerRan = !plans.isEmpty();
if (balancerRan) { if (balancerRan) {
LOG.info("RSGroup balance " + groupName + " starting with plan count: " + plans.size()); LOG.info("RSGroup balance {} starting with plan count: {}", groupName, plans.size());
for (RegionPlan plan: plans) { for (RegionPlan plan: plans) {
LOG.info("balance " + plan); LOG.info("balance {}", plan);
assignmentManager.moveAsync(plan); assignmentManager.moveAsync(plan);
} }
LOG.info("RSGroup balance " + groupName + " completed after " + LOG.info("RSGroup balance {} completed after {} seconds", groupName,
(System.currentTimeMillis()-startTime) + " seconds"); (System.currentTimeMillis() - startTime));
} }
return balancerRan; return balancerRan;
} }
@ -550,13 +495,13 @@ public class RSGroupAdminServer implements RSGroupAdmin {
String srcGroup = getRSGroupOfServer(servers.iterator().next()).getName(); String srcGroup = getRSGroupOfServer(servers.iterator().next()).getName();
rsGroupInfoManager.moveServersAndTables(servers, tables, srcGroup, targetGroup); rsGroupInfoManager.moveServersAndTables(servers, tables, srcGroup, targetGroup);
//move regions which should not belong to these tables //move regions on these servers which do not belong to group tables
moveRegionsFromServers(servers, tables, targetGroup); moveServerRegionsFromGroup(servers, targetGroup);
//move regions which should belong to these servers //move regions of these tables which are not on group servers
moveRegionsToServers(servers, tables, targetGroup); moveTableRegionsToGroup(tables, targetGroup);
} }
LOG.info("Move servers and tables done. Severs :" LOG.info("Move servers and tables done. Severs: {}, Tables: {} => {}", servers, tables,
+ servers + " , Tables : " + tables + " => " + targetGroup); targetGroup);
} }
@Override @Override
@ -571,7 +516,7 @@ public class RSGroupAdminServer implements RSGroupAdmin {
//check the set of servers //check the set of servers
checkForDeadOrOnlineServers(servers); checkForDeadOrOnlineServers(servers);
rsGroupInfoManager.removeServers(servers); rsGroupInfoManager.removeServers(servers);
LOG.info("Remove decommissioned servers " + servers + " from rsgroup done."); LOG.info("Remove decommissioned servers {} from RSGroup done", servers);
} }
} }
} }
@ -621,7 +566,7 @@ public class RSGroupAdminServer implements RSGroupAdmin {
result.put(tableName, new HashMap<>()); result.put(tableName, new HashMap<>());
result.get(tableName).putAll(serverMap); result.get(tableName).putAll(serverMap);
result.get(tableName).putAll(assignments.get(tableName)); result.get(tableName).putAll(assignments.get(tableName));
LOG.debug("Adding assignments for " + tableName + ": " + assignments.get(tableName)); LOG.debug("Adding assignments for {}: {}", tableName, assignments.get(tableName));
} }
} }