Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
parent
42ff3ac22e
commit
5c4a053f2a
|
@ -37,7 +37,9 @@ import java.util.HashSet;
|
|||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CancellationException;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
@ -281,21 +283,51 @@ public class RegionMover extends AbstractHBaseTool implements Closeable {
|
|||
*/
|
||||
public boolean load() throws ExecutionException, InterruptedException, TimeoutException {
|
||||
ExecutorService loadPool = Executors.newFixedThreadPool(1);
|
||||
Future<Boolean> loadTask = loadPool.submit(() -> {
|
||||
Future<Boolean> loadTask = loadPool.submit(getMetaRegionMovePlan());
|
||||
boolean isMetaMoved = waitTaskToFinish(loadPool, loadTask, "loading");
|
||||
if (!isMetaMoved) {
|
||||
return false;
|
||||
}
|
||||
loadPool = Executors.newFixedThreadPool(1);
|
||||
loadTask = loadPool.submit(getNonMetaRegionsMovePlan());
|
||||
return waitTaskToFinish(loadPool, loadTask, "loading");
|
||||
}
|
||||
|
||||
private Callable<Boolean> getMetaRegionMovePlan() {
|
||||
return getRegionsMovePlan(true);
|
||||
}
|
||||
|
||||
private Callable<Boolean> getNonMetaRegionsMovePlan() {
|
||||
return getRegionsMovePlan(false);
|
||||
}
|
||||
|
||||
private Callable<Boolean> getRegionsMovePlan(boolean moveMetaRegion) {
|
||||
return () -> {
|
||||
try {
|
||||
List<RegionInfo> regionsToMove = readRegionsFromFile(filename);
|
||||
if (regionsToMove.isEmpty()) {
|
||||
LOG.info("No regions to load.Exiting");
|
||||
return true;
|
||||
}
|
||||
loadRegions(regionsToMove);
|
||||
Optional<RegionInfo> metaRegion = getMetaRegionInfoIfToBeMoved(regionsToMove);
|
||||
if (moveMetaRegion) {
|
||||
if (metaRegion.isPresent()) {
|
||||
loadRegions(Collections.singletonList(metaRegion.get()));
|
||||
}
|
||||
} else {
|
||||
metaRegion.ifPresent(regionsToMove::remove);
|
||||
loadRegions(regionsToMove);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error while loading regions to " + hostname, e);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
});
|
||||
return waitTaskToFinish(loadPool, loadTask, "loading");
|
||||
};
|
||||
}
|
||||
|
||||
private Optional<RegionInfo> getMetaRegionInfoIfToBeMoved(List<RegionInfo> regionsToMove) {
|
||||
return regionsToMove.stream().filter(RegionInfo::isMetaRegion).findFirst();
|
||||
}
|
||||
|
||||
private void loadRegions(List<RegionInfo> regionsToMove)
|
||||
|
@ -449,30 +481,43 @@ public class RegionMover extends AbstractHBaseTool implements Closeable {
|
|||
}
|
||||
LOG.info("Moving {} regions from {} to {} servers using {} threads .Ack Mode: {}",
|
||||
regionsToMove.size(), this.hostname, regionServers.size(), this.maxthreads, ack);
|
||||
final ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads);
|
||||
List<Future<Boolean>> taskList = new ArrayList<>();
|
||||
int serverIndex = 0;
|
||||
for (RegionInfo regionToMove : regionsToMove) {
|
||||
if (ack) {
|
||||
Future<Boolean> task = moveRegionsPool.submit(
|
||||
new MoveWithAck(conn, regionToMove, server, regionServers.get(serverIndex),
|
||||
movedRegions));
|
||||
taskList.add(task);
|
||||
} else {
|
||||
Future<Boolean> task = moveRegionsPool.submit(
|
||||
new MoveWithoutAck(admin, regionToMove, server, regionServers.get(serverIndex),
|
||||
movedRegions));
|
||||
taskList.add(task);
|
||||
}
|
||||
serverIndex = (serverIndex + 1) % regionServers.size();
|
||||
|
||||
Optional<RegionInfo> metaRegion = getMetaRegionInfoIfToBeMoved(regionsToMove);
|
||||
if (metaRegion.isPresent()) {
|
||||
RegionInfo meta = metaRegion.get();
|
||||
submitRegionMovesWhileUnloading(server, regionServers, movedRegions,
|
||||
Collections.singletonList(meta));
|
||||
regionsToMove.remove(meta);
|
||||
}
|
||||
moveRegionsPool.shutdown();
|
||||
long timeoutInSeconds = regionsToMove.size() * admin.getConfiguration()
|
||||
.getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
|
||||
waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds);
|
||||
submitRegionMovesWhileUnloading(server, regionServers, movedRegions, regionsToMove);
|
||||
}
|
||||
}
|
||||
|
||||
private void submitRegionMovesWhileUnloading(ServerName server, List<ServerName> regionServers,
|
||||
List<RegionInfo> movedRegions, List<RegionInfo> regionsToMove) throws Exception {
|
||||
final ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads);
|
||||
List<Future<Boolean>> taskList = new ArrayList<>();
|
||||
int serverIndex = 0;
|
||||
for (RegionInfo regionToMove : regionsToMove) {
|
||||
if (ack) {
|
||||
Future<Boolean> task = moveRegionsPool.submit(
|
||||
new MoveWithAck(conn, regionToMove, server, regionServers.get(serverIndex),
|
||||
movedRegions));
|
||||
taskList.add(task);
|
||||
} else {
|
||||
Future<Boolean> task = moveRegionsPool.submit(
|
||||
new MoveWithoutAck(admin, regionToMove, server, regionServers.get(serverIndex),
|
||||
movedRegions));
|
||||
taskList.add(task);
|
||||
}
|
||||
serverIndex = (serverIndex + 1) % regionServers.size();
|
||||
}
|
||||
moveRegionsPool.shutdown();
|
||||
long timeoutInSeconds = regionsToMove.size() * admin.getConfiguration()
|
||||
.getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
|
||||
waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds);
|
||||
}
|
||||
|
||||
private boolean waitTaskToFinish(ExecutorService pool, Future<Boolean> task, String operation)
|
||||
throws TimeoutException, InterruptedException, ExecutionException {
|
||||
pool.shutdown();
|
||||
|
|
Loading…
Reference in New Issue