HBASE-24025: Improve performance of move_servers_rsgroup by using async region move API (#1549)
This commit is contained in:
parent
8a995ae81b
commit
bdcafa895c
|
@ -33,6 +33,7 @@ import java.util.OptionalLong;
|
|||
import java.util.Set;
|
||||
import java.util.SortedSet;
|
||||
import java.util.TreeSet;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
@ -78,6 +79,7 @@ import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
|
|||
import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.FutureUtils;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
|
@ -956,84 +958,129 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
|
|||
/**
|
||||
* Move every region from servers which are currently located on these servers, but should not be
|
||||
* located there.
|
||||
* @param servers the servers that will move to new group
|
||||
* @param targetGroupName the target group name
|
||||
* @param movedServers the servers that are moved to new group
|
||||
* @param srcGrpServers all servers in the source group, excluding the movedServers
|
||||
* @param targetGroup the target group
|
||||
* @throws IOException if moving the server and tables fail
|
||||
*/
|
||||
private void moveServerRegionsFromGroup(Set<Address> servers, String targetGroupName)
|
||||
throws IOException {
|
||||
moveRegionsBetweenGroups(servers, targetGroupName, rs -> getRegions(rs), info -> {
|
||||
private void moveServerRegionsFromGroup(Set<Address> movedServers, Set<Address> srcGrpServers,
|
||||
RSGroupInfo targetGroup) throws IOException {
|
||||
moveRegionsBetweenGroups(movedServers, srcGrpServers, targetGroup, rs -> getRegions(rs),
|
||||
info -> {
|
||||
try {
|
||||
String groupName = RSGroupUtil.getRSGroupInfo(masterServices, this, info.getTable())
|
||||
.map(RSGroupInfo::getName).orElse(RSGroupInfo.DEFAULT_GROUP);
|
||||
return groupName.equals(targetGroupName);
|
||||
return groupName.equals(targetGroup.getName());
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to test group for region {} and target group {}", info, targetGroupName);
|
||||
LOG.warn("Failed to test group for region {} and target group {}", info,
|
||||
targetGroup.getName());
|
||||
return false;
|
||||
}
|
||||
}, rs -> rs.getHostname());
|
||||
});
|
||||
}
|
||||
|
||||
private <T> void moveRegionsBetweenGroups(Set<T> regionsOwners, String targetGroupName,
|
||||
Function<T, List<RegionInfo>> getRegionsInfo, Function<RegionInfo, Boolean> validation,
|
||||
Function<T, String> getOwnerName) throws IOException {
|
||||
boolean hasRegionsToMove;
|
||||
private <T> void moveRegionsBetweenGroups(Set<T> regionsOwners, Set<Address> newRegionsOwners,
|
||||
RSGroupInfo targetGrp, Function<T, List<RegionInfo>> getRegionsInfo,
|
||||
Function<RegionInfo, Boolean> validation) throws IOException {
|
||||
// Get server names corresponding to given Addresses
|
||||
List<ServerName> movedServerNames = new ArrayList<>(regionsOwners.size());
|
||||
List<ServerName> srcGrpServerNames = new ArrayList<>(newRegionsOwners.size());
|
||||
for (ServerName serverName : masterServices.getServerManager().getOnlineServers().keySet()) {
|
||||
// In case region move failed in previous attempt, regionsOwners and newRegionsOwners
|
||||
// can have the same servers. So both conditions below must be checked for every server.
|
||||
if (newRegionsOwners.contains(serverName.getAddress())) {
|
||||
srcGrpServerNames.add(serverName);
|
||||
}
|
||||
if (regionsOwners.contains(serverName.getAddress())) {
|
||||
movedServerNames.add(serverName);
|
||||
}
|
||||
}
|
||||
List<Pair<RegionInfo, Future<byte[]>>> assignmentFutures = new ArrayList<>();
|
||||
int retry = 0;
|
||||
Set<T> allOwners = new HashSet<>(regionsOwners);
|
||||
Set<String> failedRegions = new HashSet<>();
|
||||
IOException toThrow = null;
|
||||
do {
|
||||
hasRegionsToMove = false;
|
||||
for (Iterator<T> iter = allOwners.iterator(); iter.hasNext(); ) {
|
||||
T owner = iter.next();
|
||||
assignmentFutures.clear();
|
||||
failedRegions.clear();
|
||||
for (ServerName owner : movedServerNames) {
|
||||
// Get regions that are associated with this server and filter regions by group tables.
|
||||
for (RegionInfo region : getRegionsInfo.apply(owner)) {
|
||||
for (RegionInfo region : getRegionsInfo.apply((T) owner.getAddress())) {
|
||||
if (!validation.apply(region)) {
|
||||
LOG.info("Moving region {}, which do not belong to RSGroup {}",
|
||||
region.getShortNameToLog(), targetGroupName);
|
||||
region.getShortNameToLog(), targetGrp.getName());
|
||||
// Move region back to source RSGroup servers
|
||||
ServerName dest =
|
||||
masterServices.getLoadBalancer().randomAssignment(region, srcGrpServerNames);
|
||||
if (dest == null) {
|
||||
failedRegions.add(region.getRegionNameAsString());
|
||||
continue;
|
||||
}
|
||||
RegionPlan rp = new RegionPlan(region, owner, dest);
|
||||
try {
|
||||
this.masterServices.getAssignmentManager().move(region);
|
||||
failedRegions.remove(region.getRegionNameAsString());
|
||||
Future<byte[]> future = masterServices.getAssignmentManager().moveAsync(rp);
|
||||
assignmentFutures.add(Pair.newPair(region, future));
|
||||
} catch (IOException ioe) {
|
||||
failedRegions.add(region.getRegionNameAsString());
|
||||
LOG.debug("Move region {} from group failed, will retry, current retry time is {}",
|
||||
region.getShortNameToLog(), retry, ioe);
|
||||
toThrow = ioe;
|
||||
failedRegions.add(region.getRegionNameAsString());
|
||||
}
|
||||
if (masterServices.getAssignmentManager().getRegionStates().
|
||||
getRegionState(region).isFailedOpen()) {
|
||||
continue;
|
||||
}
|
||||
hasRegionsToMove = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (!hasRegionsToMove) {
|
||||
LOG.info("No more regions to move from {} to RSGroup", getOwnerName.apply(owner));
|
||||
iter.remove();
|
||||
}
|
||||
}
|
||||
|
||||
retry++;
|
||||
waitForRegionMovement(assignmentFutures, failedRegions, targetGrp.getName(), retry);
|
||||
if (failedRegions.isEmpty()) {
|
||||
LOG.info("All regions from server(s) {} moved to target group {}.", movedServerNames,
|
||||
targetGrp.getName());
|
||||
return;
|
||||
} else {
|
||||
try {
|
||||
wait(1000);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Sleep interrupted", e);
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
} while (hasRegionsToMove && retry <=
|
||||
masterServices.getConfiguration().getInt(FAILED_MOVE_MAX_RETRY, DEFAULT_MAX_RETRY_VALUE));
|
||||
retry++;
|
||||
}
|
||||
} while (!failedRegions.isEmpty() && retry <= masterServices.getConfiguration()
|
||||
.getInt(FAILED_MOVE_MAX_RETRY, DEFAULT_MAX_RETRY_VALUE));
|
||||
|
||||
// Either the max retry count has been reached or there are no more regions to move.
|
||||
if (hasRegionsToMove) {
|
||||
if (!failedRegions.isEmpty()) {
|
||||
// Log the regions that failed to move so they can be handled later.
|
||||
String msg = String
|
||||
.format("move regions for group %s failed, failed regions: %s", targetGroupName,
|
||||
.format("move regions for group %s failed, failed regions: %s", targetGrp.getName(),
|
||||
failedRegions);
|
||||
LOG.error(msg);
|
||||
throw new DoNotRetryIOException(
|
||||
msg + ", just record the last failed region's cause, more details in server log",
|
||||
toThrow);
|
||||
msg + ", just record the last failed region's cause, more details in server log", toThrow);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for all the region move to complete. Keep waiting for other region movement
|
||||
* completion even if some region movement fails.
|
||||
*/
|
||||
private void waitForRegionMovement(List<Pair<RegionInfo, Future<byte[]>>> regionMoveFutures,
|
||||
Set<String> failedRegions, String tgtGrpName, int retryCount) {
|
||||
LOG.info("Moving {} region(s) to group {}, current retry={}", regionMoveFutures.size(),
|
||||
tgtGrpName, retryCount);
|
||||
for (Pair<RegionInfo, Future<byte[]>> pair : regionMoveFutures) {
|
||||
try {
|
||||
pair.getSecond().get();
|
||||
if (masterServices.getAssignmentManager().getRegionStates().
|
||||
getRegionState(pair.getFirst()).isFailedOpen()) {
|
||||
failedRegions.add(pair.getFirst().getRegionNameAsString());
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
//Dont return form there lets wait for other regions to complete movement.
|
||||
failedRegions.add(pair.getFirst().getRegionNameAsString());
|
||||
LOG.warn("Sleep interrupted", e);
|
||||
} catch (Exception e) {
|
||||
failedRegions.add(pair.getFirst().getRegionNameAsString());
|
||||
LOG.error("Move region {} to group {} failed, will retry on next attempt",
|
||||
pair.getFirst().getShortNameToLog(), tgtGrpName, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1185,7 +1232,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
|
|||
if (StringUtils.isEmpty(targetGroupName)) {
|
||||
throw new ConstraintException("RSGroup cannot be null.");
|
||||
}
|
||||
getRSGroupInfo(targetGroupName);
|
||||
RSGroupInfo targetGroup = getRSGroupInfo(targetGroupName);
|
||||
|
||||
// Hold a lock on the manager instance while moving servers to prevent
|
||||
// another writer changing our state while we are working.
|
||||
|
@ -1230,7 +1277,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
|
|||
// MovedServers may be < passed in 'servers'.
|
||||
Set<Address> movedServers = moveServers(servers, srcGrp.getName(),
|
||||
targetGroupName);
|
||||
moveServerRegionsFromGroup(movedServers, targetGroupName);
|
||||
moveServerRegionsFromGroup(movedServers, srcGrp.getServers(), targetGroup);
|
||||
LOG.info("Move servers done: {} => {}", srcGrp.getName(), targetGroupName);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -686,4 +686,39 @@ public class TestRSGroupsAdmin2 extends TestRSGroupsBase {
|
|||
assertEquals(regionsInfo.getTable(), table2);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMoveServersToRSGroupPerformance() throws Exception {
|
||||
final RSGroupInfo newGroup = addGroup(getGroupName(name.getMethodName()), 2);
|
||||
final byte[] familyNameBytes = Bytes.toBytes("f");
|
||||
// there will be 100 regions are both the serves
|
||||
final int tableRegionCount = 200;
|
||||
// All the regions created below will be assigned to the default group.
|
||||
TEST_UTIL.createMultiRegionTable(tableName, familyNameBytes, tableRegionCount);
|
||||
TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
|
||||
@Override public boolean evaluate() throws Exception {
|
||||
List<String> regions = getTableRegionMap().get(tableName);
|
||||
if (regions == null) {
|
||||
return false;
|
||||
}
|
||||
return getTableRegionMap().get(tableName).size() >= tableRegionCount;
|
||||
}
|
||||
});
|
||||
ADMIN.setRSGroup(Sets.newHashSet(tableName), newGroup.getName());
|
||||
TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
|
||||
String rsGroup2 = "rsGroup2";
|
||||
ADMIN.addRSGroup(rsGroup2);
|
||||
|
||||
long startTime = System.currentTimeMillis();
|
||||
ADMIN.moveServersToRSGroup(Sets.newHashSet(newGroup.getServers().first()), rsGroup2);
|
||||
long timeTaken = System.currentTimeMillis() - startTime;
|
||||
String msg =
|
||||
"Should not take mote than 15000 ms to move a table with 100 regions. Time taken ="
|
||||
+ timeTaken + " ms";
|
||||
//This test case is meant to be used for verifying the performance quickly by a developer.
|
||||
//Moving 100 regions takes much less than 15000 ms. Given 15000 ms so test cases passes
|
||||
// on all environment.
|
||||
assertTrue(msg, timeTaken < 15000);
|
||||
LOG.info("Time taken to move a table with 100 region is {} ms", timeTaken);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue