From 91aceeb28de2c9f2d118db9c834f7c99d393128f Mon Sep 17 00:00:00 2001 From: Guanghao Zhang Date: Mon, 8 Oct 2018 10:44:56 +0800 Subject: [PATCH] HBASE-21251 Refactor RegionMover --- .../apache/hadoop/hbase/util/RegionMover.java | 914 +++++++----------- 1 file changed, 346 insertions(+), 568 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionMover.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionMover.java index 54c71a10450..24c8fc339b1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionMover.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionMover.java @@ -19,14 +19,17 @@ package org.apache.hadoop.hbase.util; -import java.io.BufferedReader; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.Closeable; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; -import java.io.FileReader; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; @@ -41,24 +44,23 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Predicate; + +import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ClusterMetrics.Option; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.RegionInfo; -import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; -import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,7 +78,7 @@ import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; * {@link #load()} or {@link #unload()} methods for the desired operations. */ @InterfaceAudience.Public -public class RegionMover extends AbstractHBaseTool { +public class RegionMover extends AbstractHBaseTool implements Closeable { public static final String MOVE_RETRIES_MAX_KEY = "hbase.move.retries.max"; public static final String MOVE_WAIT_MAX_KEY = "hbase.move.wait.max"; public static final String SERVERSTART_WAIT_MAX_KEY = "hbase.serverstart.wait.max"; @@ -93,8 +95,10 @@ public class RegionMover extends AbstractHBaseTool { private String filename; private String excludeFile; private int port; + private Connection conn; + private Admin admin; - private RegionMover(RegionMoverBuilder builder) { + private RegionMover(RegionMoverBuilder builder) throws IOException { this.hostname = builder.hostname; this.filename = builder.filename; this.excludeFile = builder.excludeFile; @@ -103,11 +107,19 @@ public class RegionMover extends AbstractHBaseTool { this.port = builder.port; this.timeout = builder.timeout; setConf(builder.conf); + this.conn = ConnectionFactory.createConnection(conf); + this.admin = conn.getAdmin(); } private RegionMover() { } + @Override + public void close() { + IOUtils.closeQuietly(this.admin); + IOUtils.closeQuietly(this.conn); + } + /** * Builder for Region mover. Use the {@link #build()} method to create RegionMover object. Has * {@link #filename(String)}, {@link #excludeFile(String)}, {@link #maxthreads(int)}, @@ -121,7 +133,6 @@ public class RegionMover extends AbstractHBaseTool { private String filename; private String excludeFile = null; private String defaultDir = System.getProperty("java.io.tmpdir"); - @VisibleForTesting final int port; private final Configuration conf; @@ -220,75 +231,175 @@ public class RegionMover extends AbstractHBaseTool { * using load and unload methods * @return RegionMover object */ - public RegionMover build() { + public RegionMover build() throws IOException { return new RegionMover(this); } } + /** + * Move Regions and make sure that they are up on the target server.If a region movement fails we + * exit as failure + */ + private class MoveWithAck implements Callable { + private RegionInfo region; + private ServerName targetServer; + private List movedRegions; + private ServerName sourceServer; + + public MoveWithAck(RegionInfo regionInfo, ServerName sourceServer, + ServerName targetServer, List movedRegions) { + this.region = regionInfo; + this.targetServer = targetServer; + this.movedRegions = movedRegions; + this.sourceServer = sourceServer; + } + + @Override + public Boolean call() throws IOException, InterruptedException { + boolean moved = false; + int count = 0; + int retries = admin.getConfiguration().getInt(MOVE_RETRIES_MAX_KEY, DEFAULT_MOVE_RETRIES_MAX); + int maxWaitInSeconds = + admin.getConfiguration().getInt(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX); + long startTime = EnvironmentEdgeManager.currentTime(); + boolean sameServer = true; + // Assert we can scan the region in its current location + isSuccessfulScan(region); + LOG.info("Moving region:" + region.getEncodedName() + " from " + sourceServer + " to " + + targetServer); + while (count < retries && sameServer) { + if (count > 0) { + LOG.info("Retry " + Integer.toString(count) + " of maximum " + Integer.toString(retries)); + } + count = count + 1; + admin.move(region.getEncodedNameAsBytes(), Bytes.toBytes(targetServer.getServerName())); + long maxWait = startTime + (maxWaitInSeconds * 1000); + while (EnvironmentEdgeManager.currentTime() < maxWait) { + sameServer = isSameServer(region, sourceServer); + if (!sameServer) { + break; + } + Thread.sleep(100); + } + } + if (sameServer) { + LOG.error("Region: " + region.getRegionNameAsString() + " stuck on " + this.sourceServer + + ",newServer=" + this.targetServer); + } else { + isSuccessfulScan(region); + LOG.info("Moved Region " + + region.getRegionNameAsString() + + " cost:" + + String.format("%.3f", + (float) (EnvironmentEdgeManager.currentTime() - startTime) / 1000)); + moved = true; + movedRegions.add(region); + } + return moved; + } + } + + /** + * Move Regions without Acknowledging.Usefule in case of RS shutdown as we might want to shut the + * RS down anyways and not abort on a stuck region. Improves movement performance + */ + private class MoveWithoutAck implements Callable { + private RegionInfo region; + private ServerName targetServer; + private List movedRegions; + private ServerName sourceServer; + + public MoveWithoutAck(RegionInfo regionInfo, ServerName sourceServer, + ServerName targetServer, List movedRegions) { + this.region = regionInfo; + this.targetServer = targetServer; + this.movedRegions = movedRegions; + this.sourceServer = sourceServer; + } + + @Override + public Boolean call() { + try { + LOG.info("Moving region:" + region.getEncodedName() + " from " + sourceServer + " to " + + targetServer); + admin.move(region.getEncodedNameAsBytes(), Bytes.toBytes(targetServer.getServerName())); + LOG.info("Moved " + region.getEncodedName() + " from " + sourceServer + " to " + + targetServer); + } catch (Exception e) { + LOG.error("Error Moving Region:" + region.getEncodedName(), e); + } finally { + // we add region to the moved regions list in No Ack Mode since this is best effort + movedRegions.add(region); + } + return true; + } + } + /** * Loads the specified {@link #hostname} with regions listed in the {@link #filename} RegionMover * Object has to be created using {@link #RegionMover(RegionMoverBuilder)} * @return true if loading succeeded, false otherwise - * @throws ExecutionException - * @throws InterruptedException if the loader thread was interrupted - * @throws TimeoutException */ public boolean load() throws ExecutionException, InterruptedException, TimeoutException { ExecutorService loadPool = Executors.newFixedThreadPool(1); - Future loadTask = loadPool.submit(new Load(this)); - loadPool.shutdown(); - try { - if (!loadPool.awaitTermination((long) this.timeout, TimeUnit.SECONDS)) { - LOG.warn("Timed out before finishing the loading operation. Timeout:" + this.timeout - + "sec"); - loadPool.shutdownNow(); - } - } catch (InterruptedException e) { - loadPool.shutdownNow(); - Thread.currentThread().interrupt(); - } - try { - return loadTask.get(5, TimeUnit.SECONDS); - } catch (InterruptedException e) { - LOG.warn("Interrupted while loading Regions on " + this.hostname, e); - throw e; - } catch (ExecutionException e) { - LOG.error("Error while loading regions on RegionServer " + this.hostname, e); - throw e; - } - } - - private class Load implements Callable { - - private RegionMover rm; - - public Load(RegionMover rm) { - this.rm = rm; - } - - @Override - public Boolean call() throws IOException { - Connection conn = ConnectionFactory.createConnection(rm.conf); + Future loadTask = loadPool.submit(() -> { try { - List regionsToMove = readRegionsFromFile(rm.filename); + List regionsToMove = readRegionsFromFile(filename); if (regionsToMove.isEmpty()) { LOG.info("No regions to load.Exiting"); return true; } - Admin admin = conn.getAdmin(); - try { - loadRegions(admin, rm.hostname, rm.port, regionsToMove, rm.ack); - } finally { - admin.close(); - } + loadRegions(regionsToMove); } catch (Exception e) { - LOG.error("Error while loading regions to " + rm.hostname, e); + LOG.error("Error while loading regions to " + hostname, e); return false; - } finally { - conn.close(); } return true; + }); + return waitTaskToFinish(loadPool, loadTask, "loading"); + } + + private void loadRegions(List regionsToMove) + throws Exception { + ServerName server = getTargetServer(); + List movedRegions = Collections.synchronizedList(new ArrayList<>()); + LOG.info( + "Moving " + regionsToMove.size() + " regions to " + server + " using " + this.maxthreads + + " threads.Ack mode:" + this.ack); + + ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads); + List> taskList = new ArrayList<>(); + int counter = 0; + while (counter < regionsToMove.size()) { + RegionInfo region = regionsToMove.get(counter); + ServerName currentServer = getServerNameForRegion(region); + if (currentServer == null) { + LOG.warn( + "Could not get server for Region:" + region.getRegionNameAsString() + " moving on"); + counter++; + continue; + } else if (server.equals(currentServer)) { + LOG.info( + "Region " + region.getRegionNameAsString() + " is already on target server=" + server); + counter++; + continue; + } + if (ack) { + Future task = + moveRegionsPool.submit(new MoveWithAck(region, currentServer, server, movedRegions)); + taskList.add(task); + } else { + Future task = + moveRegionsPool.submit(new MoveWithoutAck(region, currentServer, server, movedRegions)); + taskList.add(task); + } + counter++; } + + moveRegionsPool.shutdown(); + long timeoutInSeconds = regionsToMove.size() * admin.getConfiguration() + .getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX); + waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds); } /** @@ -297,144 +408,106 @@ public class RegionMover extends AbstractHBaseTool { * server,hence it is best effort.We do not unload regions to hostnames given in * {@link #excludeFile}. * @return true if unloading succeeded, false otherwise - * @throws InterruptedException if the unloader thread was interrupted - * @throws ExecutionException - * @throws TimeoutException */ public boolean unload() throws InterruptedException, ExecutionException, TimeoutException { deleteFile(this.filename); ExecutorService unloadPool = Executors.newFixedThreadPool(1); - Future unloadTask = unloadPool.submit(new Unload(this)); - unloadPool.shutdown(); - try { - if (!unloadPool.awaitTermination((long) this.timeout, TimeUnit.SECONDS)) { - LOG.warn("Timed out before finishing the unloading operation. Timeout:" + this.timeout - + "sec"); - unloadPool.shutdownNow(); - } - } catch (InterruptedException e) { - unloadPool.shutdownNow(); - Thread.currentThread().interrupt(); - } - try { - return unloadTask.get(5, TimeUnit.SECONDS); - } catch (InterruptedException e) { - LOG.warn("Interrupted while unloading Regions from " + this.hostname, e); - throw e; - } catch (ExecutionException e) { - LOG.error("Error while unloading regions from RegionServer " + this.hostname, e); - throw e; - } - } - - private class Unload implements Callable { - - List movedRegions = Collections.synchronizedList(new ArrayList()); - private RegionMover rm; - - public Unload(RegionMover rm) { - this.rm = rm; - } - - @Override - public Boolean call() throws IOException { - Connection conn = ConnectionFactory.createConnection(rm.conf); + Future unloadTask = unloadPool.submit(() -> { + List movedRegions = Collections.synchronizedList(new ArrayList<>()); try { - Admin admin = conn.getAdmin(); // Get Online RegionServers - ArrayList regionServers = getServers(admin); - if (LOG.isDebugEnabled()) { - LOG.debug("Online region servers:" + regionServers.toString()); - } + List regionServers = new ArrayList<>(); + regionServers.addAll( + admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics() + .keySet()); // Remove the host Region server from target Region Servers list - String server = stripServer(regionServers, hostname, port); + ServerName server = stripServer(regionServers, hostname, port); // Remove RS present in the exclude file - stripExcludes(regionServers, rm.excludeFile); - stripMaster(regionServers, admin); - unloadRegions(admin, server, regionServers, rm.ack, movedRegions); + stripExcludes(regionServers); + stripMaster(regionServers); + if (regionServers.isEmpty()) { + LOG.warn("No Regions were moved - no servers available"); + return false; + } + unloadRegions(server, regionServers, movedRegions); } catch (Exception e) { LOG.error("Error while unloading regions ", e); return false; } finally { - try { - conn.close(); - } catch (IOException e) { - // ignore - } if (movedRegions != null) { - writeFile(rm.filename, movedRegions); + writeFile(filename, movedRegions); } } return true; + }); + return waitTaskToFinish(unloadPool, unloadTask, "unloading"); + } + + private void unloadRegions(ServerName server, List regionServers, + List movedRegions) throws Exception { + while (true) { + List regionsToMove = admin.getRegions(server); + regionsToMove.removeAll(movedRegions); + if (regionsToMove.isEmpty()) { + LOG.info("No Regions to move....Quitting now"); + break; + } + int counter = 0; + LOG.info("Moving " + regionsToMove.size() + " regions from " + this.hostname + " to " + + regionServers.size() + " servers using " + this.maxthreads + " threads .Ack Mode:" + + ack); + ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads); + List> taskList = new ArrayList<>(); + int serverIndex = 0; + while (counter < regionsToMove.size()) { + if (ack) { + Future task = moveRegionsPool.submit( + new MoveWithAck(regionsToMove.get(counter), server, regionServers.get(serverIndex), + movedRegions)); + taskList.add(task); + } else { + Future task = moveRegionsPool.submit( + new MoveWithoutAck(regionsToMove.get(counter), server, regionServers.get(serverIndex), + movedRegions)); + taskList.add(task); + } + counter++; + serverIndex = (serverIndex + 1) % regionServers.size(); + } + moveRegionsPool.shutdown(); + long timeoutInSeconds = regionsToMove.size() * admin.getConfiguration() + .getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX); + waitMoveTasksToFinish(moveRegionsPool, taskList, timeoutInSeconds); } } - private void loadRegions(Admin admin, String hostname, int port, - List regionsToMove, boolean ack) throws Exception { - String server = null; - List movedRegions = Collections.synchronizedList(new ArrayList()); - int maxWaitInSeconds = - admin.getConfiguration().getInt(SERVERSTART_WAIT_MAX_KEY, DEFAULT_SERVERSTART_WAIT_MAX); - long maxWait = EnvironmentEdgeManager.currentTime() + maxWaitInSeconds * 1000; - while ((EnvironmentEdgeManager.currentTime() < maxWait) && (server == null)) { - try { - ArrayList regionServers = getServers(admin); - // Remove the host Region server from target Region Servers list - server = stripServer(regionServers, hostname, port); - if (server != null) { - break; - } - } catch (IOException e) { - LOG.warn("Could not get list of region servers", e); - } catch (Exception e) { - LOG.info("hostname=" + hostname + " is not up yet, waiting"); - } - try { - Thread.sleep(500); - } catch (InterruptedException e) { - LOG.error("Interrupted while waiting for " + hostname + " to be up.Quitting now", e); - throw e; + private boolean waitTaskToFinish(ExecutorService pool, Future task, String operation) + throws TimeoutException, InterruptedException, ExecutionException { + pool.shutdown(); + try { + if (!pool.awaitTermination((long) this.timeout, TimeUnit.SECONDS)) { + LOG.warn( + "Timed out before finishing the " + operation + " operation. Timeout: " + this.timeout + + "sec"); + pool.shutdownNow(); } + } catch (InterruptedException e) { + pool.shutdownNow(); + Thread.currentThread().interrupt(); } - if (server == null) { - LOG.error("Host:" + hostname + " is not up.Giving up."); - throw new Exception("Host to load regions not online"); + try { + return task.get(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOG.warn("Interrupted while " + operation + " Regions on " + this.hostname, e); + throw e; + } catch (ExecutionException e) { + LOG.error("Error while " + operation + " regions on RegionServer " + this.hostname, e); + throw e; } - LOG.info("Moving " + regionsToMove.size() + " regions to " + server + " using " - + this.maxthreads + " threads.Ack mode:" + this.ack); - ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads); - List> taskList = new ArrayList<>(); - int counter = 0; - while (counter < regionsToMove.size()) { - RegionInfo region = regionsToMove.get(counter); - String currentServer = getServerNameForRegion(admin, region); - if (currentServer == null) { - LOG.warn("Could not get server for Region:" + region.getEncodedName() + " moving on"); - counter++; - continue; - } else if (server.equals(currentServer)) { - LOG.info("Region " + region.getRegionNameAsString() + - " is already on target server=" + server); - counter++; - continue; - } - if (ack) { - Future task = - moveRegionsPool.submit(new MoveWithAck(admin, region, currentServer, server, - movedRegions)); - taskList.add(task); - } else { - Future task = - moveRegionsPool.submit(new MoveWithoutAck(admin, region, currentServer, server, - movedRegions)); - taskList.add(task); - } - counter++; - } - moveRegionsPool.shutdown(); - long timeoutInSeconds = - regionsToMove.size() - * admin.getConfiguration().getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX); + } + + private void waitMoveTasksToFinish(ExecutorService moveRegionsPool, + List> taskList, long timeoutInSeconds) throws Exception { try { if (!moveRegionsPool.awaitTermination(timeoutInSeconds, TimeUnit.SECONDS)) { moveRegionsPool.shutdownNow(); @@ -457,189 +530,41 @@ public class RegionMover extends AbstractHBaseTool { LOG.error("Got Exception From Thread While moving region " + e.getMessage(), e); throw e; } catch (CancellationException e) { - LOG.error("Thread for moving region cancelled. Timeout for cancellation:" - + timeoutInSeconds + "secs", e); + LOG.error("Thread for moving region cancelled. Timeout for cancellation:" + timeoutInSeconds + + "secs", e); throw e; } } } - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DLS_DEAD_LOCAL_STORE", - justification="FB is wrong; its size is read") - private void unloadRegions(Admin admin, String server, ArrayList regionServers, - boolean ack, List movedRegions) throws Exception { - List regionsToMove = new ArrayList<>();// FindBugs: DLS_DEAD_LOCAL_STORE - regionsToMove = getRegions(this.conf, server); - if (regionsToMove.isEmpty()) { - LOG.info("No Regions to move....Quitting now"); - return; - } else if (regionServers.isEmpty()) { - LOG.warn("No Regions were moved - no servers available"); - throw new Exception("No online region servers"); - } - while (true) { - regionsToMove = getRegions(this.conf, server); - regionsToMove.removeAll(movedRegions); - if (regionsToMove.isEmpty()) { - break; - } - int counter = 0; - LOG.info("Moving " + regionsToMove.size() + " regions from " + this.hostname + " to " - + regionServers.size() + " servers using " + this.maxthreads + " threads .Ack Mode:" - + ack); - ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads); - List> taskList = new ArrayList<>(); - int serverIndex = 0; - while (counter < regionsToMove.size()) { - if (ack) { - Future task = - moveRegionsPool.submit(new MoveWithAck(admin, regionsToMove.get(counter), server, - regionServers.get(serverIndex), movedRegions)); - taskList.add(task); + private ServerName getTargetServer() throws Exception { + ServerName server = null; + int maxWaitInSeconds = + admin.getConfiguration().getInt(SERVERSTART_WAIT_MAX_KEY, DEFAULT_SERVERSTART_WAIT_MAX); + long maxWait = EnvironmentEdgeManager.currentTime() + maxWaitInSeconds * 1000; + while (EnvironmentEdgeManager.currentTime() < maxWait) { + try { + List regionServers = new ArrayList<>(); + regionServers.addAll( + admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics() + .keySet()); + // Remove the host Region server from target Region Servers list + server = stripServer(regionServers, hostname, port); + if (server != null) { + break; } else { - Future task = - moveRegionsPool.submit(new MoveWithoutAck(admin, regionsToMove.get(counter), server, - regionServers.get(serverIndex), movedRegions)); - taskList.add(task); - } - counter++; - serverIndex = (serverIndex + 1) % regionServers.size(); - } - moveRegionsPool.shutdown(); - long timeoutInSeconds = - regionsToMove.size() - * admin.getConfiguration().getLong(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX); - try { - if (!moveRegionsPool.awaitTermination(timeoutInSeconds, TimeUnit.SECONDS)) { - moveRegionsPool.shutdownNow(); - } - } catch (InterruptedException e) { - moveRegionsPool.shutdownNow(); - Thread.currentThread().interrupt(); - } - for (Future future : taskList) { - try { - // if even after shutdownNow threads are stuck we wait for 5 secs max - if (!future.get(5, TimeUnit.SECONDS)) { - LOG.error("Was Not able to move region....Exiting Now"); - throw new Exception("Could not move region Exception"); - } - } catch (InterruptedException e) { - LOG.error("Interrupted while waiting for Thread to Complete " + e.getMessage(), e); - throw e; - } catch (ExecutionException e) { - LOG.error("Got Exception From Thread While moving region " + e.getMessage(), e); - throw e; - } catch (CancellationException e) { - LOG.error("Thread for moving region cancelled. Timeout for cancellation:" - + timeoutInSeconds + "secs", e); - throw e; + LOG.warn("Server " + hostname + ":" + port + " is not up yet, waiting"); } + } catch (IOException e) { + LOG.warn("Could not get list of region servers", e); } + Thread.sleep(500); } - } - - /** - * Move Regions and make sure that they are up on the target server.If a region movement fails we - * exit as failure - */ - private class MoveWithAck implements Callable { - private Admin admin; - private RegionInfo region; - private String targetServer; - private List movedRegions; - private String sourceServer; - - public MoveWithAck(Admin admin, RegionInfo regionInfo, String sourceServer, - String targetServer, List movedRegions) { - this.admin = admin; - this.region = regionInfo; - this.targetServer = targetServer; - this.movedRegions = movedRegions; - this.sourceServer = sourceServer; - } - - @Override - public Boolean call() throws IOException, InterruptedException { - boolean moved = false; - int count = 0; - int retries = admin.getConfiguration().getInt(MOVE_RETRIES_MAX_KEY, DEFAULT_MOVE_RETRIES_MAX); - int maxWaitInSeconds = - admin.getConfiguration().getInt(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX); - long startTime = EnvironmentEdgeManager.currentTime(); - boolean sameServer = true; - // Assert we can scan the region in its current location - isSuccessfulScan(admin, region); - LOG.info("Moving region:" + region.getEncodedName() + " from " + sourceServer + " to " - + targetServer); - while (count < retries && sameServer) { - if (count > 0) { - LOG.info("Retry " + Integer.toString(count) + " of maximum " + Integer.toString(retries)); - } - count = count + 1; - admin.move(region.getEncodedNameAsBytes(), Bytes.toBytes(targetServer)); - long maxWait = startTime + (maxWaitInSeconds * 1000); - while (EnvironmentEdgeManager.currentTime() < maxWait) { - sameServer = isSameServer(admin, region, sourceServer); - if (!sameServer) { - break; - } - Thread.sleep(100); - } - } - if (sameServer) { - LOG.error("Region: " + region.getRegionNameAsString() + " stuck on " + this.sourceServer - + ",newServer=" + this.targetServer); - } else { - isSuccessfulScan(admin, region); - LOG.info("Moved Region " - + region.getRegionNameAsString() - + " cost:" - + String.format("%.3f", - (float) (EnvironmentEdgeManager.currentTime() - startTime) / 1000)); - moved = true; - movedRegions.add(region); - } - return moved; - } - } - - /** - * Move Regions without Acknowledging.Usefule in case of RS shutdown as we might want to shut the - * RS down anyways and not abort on a stuck region. Improves movement performance - */ - private static class MoveWithoutAck implements Callable { - private Admin admin; - private RegionInfo region; - private String targetServer; - private List movedRegions; - private String sourceServer; - - public MoveWithoutAck(Admin admin, RegionInfo regionInfo, String sourceServer, - String targetServer, List movedRegions) { - this.admin = admin; - this.region = regionInfo; - this.targetServer = targetServer; - this.movedRegions = movedRegions; - this.sourceServer = sourceServer; - } - - @Override - public Boolean call() { - try { - LOG.info("Moving region:" + region.getEncodedName() + " from " + sourceServer + " to " - + targetServer); - admin.move(region.getEncodedNameAsBytes(), Bytes.toBytes(targetServer)); - LOG.info("Moved " + region.getEncodedName() + " from " + sourceServer + " to " - + targetServer); - } catch (Exception e) { - LOG.error("Error Moving Region:" + region.getEncodedName(), e); - } finally { - // we add region to the moved regions list in No Ack Mode since this is best effort - movedRegions.add(region); - } - return true; + if (server == null) { + LOG.error("Server " + hostname + ":" + port + " is not up. Giving up."); + throw new Exception("Server " + hostname + ":" + port + " to load regions not online"); } + return server; } private List readRegionsFromFile(String filename) throws IOException { @@ -648,11 +573,8 @@ public class RegionMover extends AbstractHBaseTool { if (!f.exists()) { return regions; } - FileInputStream fis = null; - DataInputStream dis = null; - try { - fis = new FileInputStream(f); - dis = new DataInputStream(fis); + try (DataInputStream dis = new DataInputStream( + new BufferedInputStream(new FileInputStream(f)))) { int numRegions = dis.readInt(); int index = 0; while (index < numRegions) { @@ -662,80 +584,66 @@ public class RegionMover extends AbstractHBaseTool { } catch (IOException e) { LOG.error("Error while reading regions from file:" + filename, e); throw e; - } finally { - if (dis != null) { - dis.close(); - } - if (fis != null) { - fis.close(); - } } return regions; } - /** - * Get online regions of the passed server - * @param conf - * @param server - * @return List of Regions online on the server - * @throws IOException - */ - private List getRegions(Configuration conf, String server) throws IOException { - Connection conn = ConnectionFactory.createConnection(conf); - try { - return conn.getAdmin().getRegions(ServerName.valueOf(server)); - } finally { - conn.close(); - } - } - /** * Write the number of regions moved in the first line followed by regions moved in subsequent * lines - * @param filename - * @param movedRegions - * @throws IOException */ private void writeFile(String filename, List movedRegions) throws IOException { - FileOutputStream fos = null; - DataOutputStream dos = null; - try { - fos = new FileOutputStream(filename); - dos = new DataOutputStream(fos); + try (DataOutputStream dos = new DataOutputStream( + new BufferedOutputStream(new FileOutputStream(filename)))) { dos.writeInt(movedRegions.size()); for (RegionInfo region : movedRegions) { Bytes.writeByteArray(dos, RegionInfo.toByteArray(region)); } } catch (IOException e) { - LOG.error("ERROR: Was Not able to write regions moved to output file but moved " - + movedRegions.size() + " regions", e); + LOG.error( + "ERROR: Was Not able to write regions moved to output file but moved " + movedRegions + .size() + " regions", e); throw e; - } finally { - if (dos != null) { - dos.close(); - } - if (fos != null) { - fos.close(); + } + } + + private void deleteFile(String filename) { + File f = new File(filename); + if (f.exists()) { + f.delete(); + } + } + + /** + * @return List of servers from the exclude file in format 'hostname:port'. + */ + private List readExcludes(String excludeFile) throws IOException { + List excludeServers = new ArrayList<>(); + if (excludeFile == null) { + return excludeServers; + } else { + try { + Files.readAllLines(Paths.get(excludeFile)).stream().map(String::trim) + .filter(((Predicate) String::isEmpty).negate()).map(String::toLowerCase) + .forEach(excludeServers::add); + } catch (IOException e) { + LOG.warn("Exception while reading excludes file, continuing anyways", e); } + return excludeServers; } } /** * Excludes the servername whose hostname and port portion matches the list given in exclude file - * @param regionServers - * @param excludeFile - * @throws IOException */ - private void stripExcludes(ArrayList regionServers, String excludeFile) - throws IOException { + private void stripExcludes(List regionServers) throws IOException { if (excludeFile != null) { - ArrayList excludes = readExcludes(excludeFile); - Iterator i = regionServers.iterator(); + List excludes = readExcludes(excludeFile); + Iterator i = regionServers.iterator(); while (i.hasNext()) { - String rs = i.next(); - String rsPort = - rs.split(ServerName.SERVERNAME_SEPARATOR)[0] + ":" - + rs.split(ServerName.SERVERNAME_SEPARATOR)[1]; + String rs = i.next().getServerName(); + String rsPort = rs.split(ServerName.SERVERNAME_SEPARATOR)[0].toLowerCase() + ":" + rs + .split(ServerName.SERVERNAME_SEPARATOR)[1]; if (excludes.contains(rsPort)) { i.remove(); } @@ -747,127 +655,42 @@ public class RegionMover extends AbstractHBaseTool { /** * Exclude master from list of RSs to move regions to - * @param regionServers - * @param admin - * @throws IOException */ - private void stripMaster(ArrayList regionServers, Admin admin) throws IOException { + private void stripMaster(List regionServers) throws IOException { ServerName master = admin.getClusterMetrics(EnumSet.of(Option.MASTER)).getMasterName(); - String masterHostname = master.getHostname(); - int masterPort = master.getPort(); - try { - stripServer(regionServers, masterHostname, masterPort); - } catch (Exception e) { - LOG.warn("Could not remove master from list of RS", e); - } - } - - /** - * @return List of servers from the exclude file in format 'hostname:port'. - */ - private ArrayList readExcludes(String excludeFile) throws IOException { - ArrayList excludeServers = new ArrayList<>(); - if (excludeFile == null) { - return excludeServers; - } else { - File f = new File(excludeFile); - String line; - BufferedReader br = null; - try { - br = new BufferedReader(new FileReader(f)); - while ((line = br.readLine()) != null) { - line = line.trim(); - if (!line.equals("")) { - excludeServers.add(line); - } - } - } catch (IOException e) { - LOG.warn("Exception while reading excludes file,continuing anyways", e); - } finally { - if (br != null) { - br.close(); - } - } - return excludeServers; - } + stripServer(regionServers, master.getHostname(), master.getPort()); } /** * Remove the servername whose hostname and port portion matches from the passed array of servers. * Returns as side-effect the servername removed. - * @param regionServers - * @param hostname - * @param port * @return server removed from list of Region Servers - * @throws Exception */ - private String stripServer(ArrayList regionServers, String hostname, int port) - throws Exception { - String server = null; + private ServerName stripServer(List regionServers, String hostname, int port) { + ServerName server = null; String portString = Integer.toString(port); - Iterator i = regionServers.iterator(); - int noOfRs = regionServers.size(); + Iterator i = regionServers.iterator(); while (i.hasNext()) { server = i.next(); - String[] splitServer = server.split(ServerName.SERVERNAME_SEPARATOR); + String[] splitServer = server.getServerName().split(ServerName.SERVERNAME_SEPARATOR); if (splitServer[0].equalsIgnoreCase(hostname) && splitServer[1].equals(portString)) { i.remove(); return server; } } - if (regionServers.size() >= noOfRs) { - throw new Exception("Server " + hostname + ":" + Integer.toString(port) - + " is not in list of online servers(Offline/Incorrect)"); - } return server; } - /** - * Get Arraylist of Servers in the cluster - * @param admin - * @return ArrayList of online region servers - * @throws IOException - */ - private ArrayList getServers(Admin admin) throws IOException { - ArrayList serverInfo = new ArrayList<>( - admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics().keySet()); - ArrayList regionServers = new ArrayList<>(serverInfo.size()); - for (ServerName server : serverInfo) { - regionServers.add(server.getServerName().toLowerCase()); - } - return regionServers; - } - - private void deleteFile(String filename) { - File f = new File(filename); - if (f.exists()) { - f.delete(); - } - } - /** * Tries to scan a row from passed region - * @param admin - * @param region - * @throws IOException */ - private void isSuccessfulScan(Admin admin, RegionInfo region) throws IOException { - Scan scan = new Scan(region.getStartKey()); - scan.setBatch(1); - scan.setCaching(1); - scan.setFilter(new FirstKeyOnlyFilter()); - try { - Table table = admin.getConnection().getTable(region.getTable()); - try { - ResultScanner scanner = table.getScanner(scan); - try { - scanner.next(); - } finally { - scanner.close(); - } - } finally { - table.close(); - } + private void isSuccessfulScan(RegionInfo region) throws IOException { + Scan scan = new Scan().withStartRow(region.getStartKey()).setRaw(true).setOneRowLimit() + .setMaxResultSize(1L).setCaching(1).setFilter(new FirstKeyOnlyFilter()) + .setCacheBlocks(false); + try (Table table = conn.getTable(region.getTable()); + ResultScanner scanner = table.getScanner(scan)) { + scanner.next(); } catch (IOException e) { LOG.error("Could not scan region:" + region.getEncodedName(), e); throw e; @@ -876,15 +699,11 @@ public class RegionMover extends AbstractHBaseTool { /** * Returns true if passed region is still on serverName when we look at hbase:meta. - * @param admin - * @param region - * @param serverName * @return true if region is hosted on serverName otherwise false - * @throws IOException */ - private boolean isSameServer(Admin admin, RegionInfo region, String serverName) + private boolean isSameServer(RegionInfo region, ServerName serverName) throws IOException { - String serverForRegion = getServerNameForRegion(admin, region); + ServerName serverForRegion = getServerNameForRegion(region); if (serverForRegion != null && serverForRegion.equals(serverName)) { return true; } @@ -894,55 +713,13 @@ public class RegionMover extends AbstractHBaseTool { /** * Get servername that is up in hbase:meta hosting the given region. this is hostname + port + * startcode comma-delimited. Can return null - * @param admin - * @param region * @return regionServer hosting the given region - * @throws IOException */ - private String getServerNameForRegion(Admin admin, RegionInfo region) throws IOException { - String server = null; + private ServerName getServerNameForRegion(RegionInfo region) throws IOException { if (!admin.isTableEnabled(region.getTable())) { return null; } - if (region.isMetaRegion()) { - ZKWatcher zkw = new ZKWatcher(admin.getConfiguration(), "region_mover", null); - MetaTableLocator locator = new MetaTableLocator(); - int maxWaitInSeconds = - admin.getConfiguration().getInt(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX); - try { - server = locator.waitMetaRegionLocation(zkw, maxWaitInSeconds * 1000).toString(); - } catch (InterruptedException e) { - LOG.error("Interrupted while waiting for location of Meta", e); - } finally { - if (zkw != null) { - zkw.close(); - } - } - } else { - Table table = admin.getConnection().getTable(TableName.META_TABLE_NAME); - try { - Get get = new Get(region.getRegionName()); - get.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER); - get.addColumn(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER); - Result result = table.get(get); - if (result != null) { - byte[] servername = - result.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER); - byte[] startcode = - result.getValue(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER); - if (servername != null) { - server = Bytes.toString(servername).replaceFirst(":", ",").toLowerCase() + "," + - Bytes.toLong(startcode); - } - } - } catch (IOException e) { - LOG.error("Could not get Server Name for region:" + region.getEncodedName(), e); - throw e; - } finally { - table.close(); - } - } - return server; + return MetaTableAccessor.getRegionLocation(conn, region).getServerName(); } @Override @@ -950,18 +727,18 @@ public class RegionMover extends AbstractHBaseTool { this.addRequiredOptWithArg("r", "regionserverhost", "region server |"); this.addRequiredOptWithArg("o", "operation", "Expected: load/unload"); this.addOptWithArg("m", "maxthreads", - "Define the maximum number of threads to use to unload and reload the regions"); + "Define the maximum number of threads to use to unload and reload the regions"); this.addOptWithArg("x", "excludefile", - "File with per line to exclude as unload targets; default excludes only " - + "target host; useful for rack decommisioning."); + "File with per line to exclude as unload targets; default excludes only " + + "target host; useful for rack decommisioning."); this.addOptWithArg("f", "filename", - "File to save regions list into unloading, or read from loading; " - + "default /tmp/"); + "File to save regions list into unloading, or read from loading; " + + "default /tmp/"); this.addOptNoArg("n", "noack", - "Turn on No-Ack mode(default: false) which won't check if region is online on target " - + "RegionServer, hence best effort. This is more performant in unloading and loading " - + "but might lead to region being unavailable for some time till master reassigns it " - + "in case the move failed"); + "Turn on No-Ack mode(default: false) which won't check if region is online on target " + + "RegionServer, hence best effort. This is more performant in unloading and loading " + + "but might lead to region being unavailable for some time till master reassigns it " + + "in case the move failed"); this.addOptWithArg("t", "timeout", "timeout in seconds after which the tool will exit " + "irrespective of whether it finished or not;default Integer.MAX_VALUE"); } @@ -991,14 +768,15 @@ public class RegionMover extends AbstractHBaseTool { @Override protected int doWork() throws Exception { boolean success; - RegionMover rm = rmbuilder.build(); - if (loadUnload.equalsIgnoreCase("load")) { - success = rm.load(); - } else if (loadUnload.equalsIgnoreCase("unload")) { - success = rm.unload(); - } else { - printUsage(); - success = false; + try (RegionMover rm = rmbuilder.build()) { + if (loadUnload.equalsIgnoreCase("load")) { + success = rm.load(); + } else if (loadUnload.equalsIgnoreCase("unload")) { + success = rm.unload(); + } else { + printUsage(); + success = false; + } } return (success ? 0 : 1); } @@ -1006,4 +784,4 @@ public class RegionMover extends AbstractHBaseTool { public static void main(String[] args) { new RegionMover().doStaticMain(args); } -} +} \ No newline at end of file