HBASE-26371 Prioritize meta region move over other region moves in region_mover (#3767)

Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
Viraj Jasani 2021-10-20 23:05:12 +05:30 committed by GitHub
parent 9e73ea878d
commit 856d08e863
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 69 additions and 24 deletions

View File

@ -38,7 +38,9 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@ -284,21 +286,51 @@ public class RegionMover extends AbstractHBaseTool implements Closeable {
*/
public boolean load() throws ExecutionException, InterruptedException, TimeoutException {
ExecutorService loadPool = Executors.newFixedThreadPool(1);
Future<Boolean> loadTask = loadPool.submit(() -> {
Future<Boolean> loadTask = loadPool.submit(getMetaRegionMovePlan());
boolean isMetaMoved = waitTaskToFinish(loadPool, loadTask, "loading");
if (!isMetaMoved) {
return false;
}
loadPool = Executors.newFixedThreadPool(1);
loadTask = loadPool.submit(getNonMetaRegionsMovePlan());
return waitTaskToFinish(loadPool, loadTask, "loading");
}
private Callable<Boolean> getMetaRegionMovePlan() {
return getRegionsMovePlan(true);
}
private Callable<Boolean> getNonMetaRegionsMovePlan() {
return getRegionsMovePlan(false);
}
private Callable<Boolean> getRegionsMovePlan(boolean moveMetaRegion) {
return () -> {
try {
List<RegionInfo> regionsToMove = readRegionsFromFile(filename);
if (regionsToMove.isEmpty()) {
LOG.info("No regions to load.Exiting");
return true;
}
loadRegions(regionsToMove);
Optional<RegionInfo> metaRegion = getMetaRegionInfoIfToBeMoved(regionsToMove);
if (moveMetaRegion) {
if (metaRegion.isPresent()) {
loadRegions(Collections.singletonList(metaRegion.get()));
}
} else {
metaRegion.ifPresent(regionsToMove::remove);
loadRegions(regionsToMove);
}
} catch (Exception e) {
LOG.error("Error while loading regions to " + hostname, e);
return false;
}
return true;
});
return waitTaskToFinish(loadPool, loadTask, "loading");
};
}
private Optional<RegionInfo> getMetaRegionInfoIfToBeMoved(List<RegionInfo> regionsToMove) {
return regionsToMove.stream().filter(RegionInfo::isMetaRegion).findFirst();
}
private void loadRegions(List<RegionInfo> regionsToMove)
@ -472,30 +504,43 @@ public class RegionMover extends AbstractHBaseTool implements Closeable {
}
LOG.info("Moving {} regions from {} to {} servers using {} threads .Ack Mode: {}",
regionsToMove.size(), this.hostname, regionServers.size(), this.maxthreads, ack);
final ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads);
List<Future<Boolean>> taskList = new ArrayList<>();
int serverIndex = 0;
for (RegionInfo regionToMove : regionsToMove) {
if (ack) {
Future<Boolean> task = moveRegionsPool.submit(
new MoveWithAck(conn, regionToMove, server, regionServers.get(serverIndex),
movedRegions));
taskList.add(task);
} else {
Future<Boolean> task = moveRegionsPool.submit(
new MoveWithoutAck(admin, regionToMove, server, regionServers.get(serverIndex),
movedRegions));
taskList.add(task);
}
serverIndex = (serverIndex + 1) % regionServers.size();
Optional<RegionInfo> metaRegion = getMetaRegionInfoIfToBeMoved(regionsToMove);
if (metaRegion.isPresent()) {
RegionInfo meta = metaRegion.get();
submitRegionMovesWhileUnloading(server, regionServers, movedRegions,
Collections.singletonList(meta));
regionsToMove.remove(meta);
}
moveRegionsPool.shutdown();
long timeoutInSeconds = regionsToMove.size() * admin.getConfiguration()
.getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds);
submitRegionMovesWhileUnloading(server, regionServers, movedRegions, regionsToMove);
}
}
private void submitRegionMovesWhileUnloading(ServerName server, List<ServerName> regionServers,
List<RegionInfo> movedRegions, List<RegionInfo> regionsToMove) throws Exception {
final ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads);
List<Future<Boolean>> taskList = new ArrayList<>();
int serverIndex = 0;
for (RegionInfo regionToMove : regionsToMove) {
if (ack) {
Future<Boolean> task = moveRegionsPool.submit(
new MoveWithAck(conn, regionToMove, server, regionServers.get(serverIndex),
movedRegions));
taskList.add(task);
} else {
Future<Boolean> task = moveRegionsPool.submit(
new MoveWithoutAck(admin, regionToMove, server, regionServers.get(serverIndex),
movedRegions));
taskList.add(task);
}
serverIndex = (serverIndex + 1) % regionServers.size();
}
moveRegionsPool.shutdown();
long timeoutInSeconds = regionsToMove.size() * admin.getConfiguration()
.getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds);
}
private boolean waitTaskToFinish(ExecutorService pool, Future<Boolean> task, String operation)
throws TimeoutException, InterruptedException, ExecutionException {
pool.shutdown();