From 939697b415201348ff4523321e316dfaf2206630 Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Tue, 27 Oct 2015 22:04:51 +0000 Subject: [PATCH] HBASE-13014 Java Tool For Region Moving (Abhishek Singh Chouhan) --- .../apache/hadoop/hbase/util/RegionMover.java | 988 ++++++++++++++++++ .../hadoop/hbase/util/TestRegionMover.java | 175 ++++ 2 files changed, 1163 insertions(+) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionMover.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestRegionMover.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionMover.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionMover.java new file mode 100644 index 00000000000..d2a99bd1512 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionMover.java @@ -0,0 +1,988 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.io.BufferedReader; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.FileReader; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.commons.cli.CommandLine; +import org.apache.hadoop.conf.Configuration; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; +import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; + +/** + * Tool for loading/unloading regions to/from given regionserver This tool can be run from Command + * line directly as a utility. Supports Ack/No Ack mode for loading/unloading operations.Ack mode + * acknowledges if regions are online after movement while noAck mode is best effort mode that + * improves performance but will still move on if region is stuck/not moved. Motivation behind noAck + * mode being RS shutdown where even if a Region is stuck, upon shutdown master will move it + * anyways. This can also be used by constructiong an Object using the builder and then calling + * {@link #load()} or {@link #unload()} methods for the desired operations. + */ +@InterfaceAudience.Public +public class RegionMover extends AbstractHBaseTool { + public static final String MOVE_RETRIES_MAX_KEY = "hbase.move.retries.max"; + public static final String MOVE_WAIT_MAX_KEY = "hbase.move.wait.max"; + public static final String SERVERSTART_WAIT_MAX_KEY = "hbase.serverstart.wait.max"; + public static final int DEFAULT_MOVE_RETRIES_MAX = 5; + public static final int DEFAULT_MOVE_WAIT_MAX = 60; + public static final int DEFAULT_SERVERSTART_WAIT_MAX = 180; + static final Log LOG = LogFactory.getLog(RegionMover.class); + private RegionMoverBuilder rmbuilder; + private boolean ack = true; + private int maxthreads = 1; + private int timeout; + private String loadUnload; + private String hostname; + private String filename; + private String excludeFile; + private int port; + + private RegionMover(RegionMoverBuilder builder) { + this.hostname = builder.hostname; + this.filename = builder.filename; + this.excludeFile = builder.excludeFile; + this.maxthreads = builder.maxthreads; + this.ack = builder.ack; + this.port = builder.port; + this.timeout = builder.timeout; + } + + private RegionMover() { + } + + /** + * Builder for Region mover.Use the {@link #build()} method to create {@link #RegionMover(String)} + * object Has {@link #filename(String)}, {@link #excludeFile(String)}, {@link #maxthreads(int)}, + * {@link #ack(boolean)}, {@link #timeout(int)} methods to set the corresponding options + */ + public static class RegionMoverBuilder { + private boolean ack = true; + private int maxthreads = 1; + private int timeout = Integer.MAX_VALUE; + private String hostname; + private String filename; + private String excludeFile = null; + private String defaultDir = "/tmp"; + private int port = HConstants.DEFAULT_REGIONSERVER_PORT; + + /** + * Hostname to unload regions from or load regions to Valid format: or + * + * @param hostname + */ + public RegionMoverBuilder(String hostname) { + String[] splitHostname = hostname.split(":"); + this.hostname = splitHostname[0]; + if (splitHostname.length == 2) { + this.port = Integer.parseInt(splitHostname[1]); + } + setDefaultfilename(this.hostname); + } + + private void setDefaultfilename(String hostname) { + this.filename = + defaultDir + "/" + System.getProperty("user.name") + this.hostname + ":" + + Integer.toString(this.port); + } + + /** + * Path of file where regions will be written to during unloading/read from during loading + * @param filename + * @return RegionMoverBuilder object + */ + public RegionMoverBuilder filename(String filename) { + this.filename = filename; + return this; + } + + /** + * Set the max number of threads that will be used to move regions + * @param threads + * @return RegionMoverBuilder object + */ + public RegionMoverBuilder maxthreads(int threads) { + this.maxthreads = threads; + return this; + } + + /** + * Path of file containing hostnames to be excluded during region movement Exclude file should + * have per line.Port is mandatory here as we can have many RS running on a single + * host + * @param excludefile + * @return RegionMoverBuilder object + */ + public RegionMoverBuilder excludeFile(String excludefile) { + this.excludeFile = excludefile; + return this; + } + + /** + * Set ack/noAck mode. + *

+ * In ack mode regions are acknowledged before and after moving and the move is retried + * hbase.move.retries.max times, if unsuccessful we quit with exit code 1.No Ack mode is a best + * effort mode,each region movement is tried once.This can be used during graceful shutdown as + * even if we have a stuck region,upon shutdown it'll be reassigned anyway. + *

+ * @param ack + * @return RegionMoverBuilder object + */ + public RegionMoverBuilder ack(boolean ack) { + this.ack = ack; + return this; + } + + /** + * Set the timeout for Load/Unload operation in seconds.This is a global timeout,threadpool for + * movers also have a separate time which is hbase.move.wait.max * number of regions to + * load/unload + * @param timeout in seconds + * @return RegionMoverBuilder object + */ + public RegionMoverBuilder timeout(int timeout) { + this.timeout = timeout; + return this; + } + + /** + * This method builds the appropriate RegionMover object which can then be used to load/unload + * using load and unload methods + * @return RegionMover object + */ + public RegionMover build() { + return new RegionMover(this); + } + } + + /** + * Loads the specified {@link #hostname} with regions listed in the {@link #filename} RegionMover + * Object has to be created using {@link #RegionMover(RegionMoverBuilder)} + * @return true if loading succeeded, false otherwise + * @throws ExecutionException + * @throws InterruptedException if the loader thread was interrupted + * @throws TimeoutException + */ + public boolean load() throws ExecutionException, InterruptedException, TimeoutException { + setConf(); + ExecutorService loadPool = Executors.newFixedThreadPool(1); + Future loadTask = loadPool.submit(new Load(this)); + loadPool.shutdown(); + try { + if (!loadPool.awaitTermination((long) this.timeout, TimeUnit.SECONDS)) { + LOG.warn("Timed out before finishing the loading operation. Timeout:" + this.timeout + + "sec"); + loadPool.shutdownNow(); + } + } catch (InterruptedException e) { + loadPool.shutdownNow(); + Thread.currentThread().interrupt(); + } + try { + return loadTask.get(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOG.warn("Interrupted while loading Regions on " + this.hostname, e); + throw e; + } catch (ExecutionException e) { + LOG.error("Error while loading regions on RegionServer " + this.hostname, e); + throw e; + } + } + + private class Load implements Callable { + + private RegionMover rm; + + public Load(RegionMover rm) { + this.rm = rm; + } + + @Override + public Boolean call() throws IOException { + Connection conn = ConnectionFactory.createConnection(rm.conf); + try { + List regionsToMove = readRegionsFromFile(rm.filename); + if (regionsToMove.isEmpty()) { + LOG.info("No regions to load.Exiting"); + return true; + } + Admin admin = conn.getAdmin(); + try { + loadRegions(admin, rm.hostname, rm.port, regionsToMove, rm.ack); + } finally { + admin.close(); + } + } catch (Exception e) { + LOG.error("Error while loading regions to " + rm.hostname, e); + return false; + } finally { + conn.close(); + } + return true; + } + } + + /** + * Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}.In + * noAck mode we do not make sure that region is successfully online on the target region + * server,hence it is best effort.We do not unload regions to hostnames given in + * {@link #excludeFile}. + * @return true if unloading succeeded, false otherwise + * @throws InterruptedException if the unloader thread was interrupted + * @throws ExecutionException + * @throws TimeoutException + */ + public boolean unload() throws InterruptedException, ExecutionException, TimeoutException { + setConf(); + deleteFile(this.filename); + ExecutorService unloadPool = Executors.newFixedThreadPool(1); + Future unloadTask = unloadPool.submit(new Unload(this)); + unloadPool.shutdown(); + try { + if (!unloadPool.awaitTermination((long) this.timeout, TimeUnit.SECONDS)) { + LOG.warn("Timed out before finishing the unloading operation. Timeout:" + this.timeout + + "sec"); + unloadPool.shutdownNow(); + } + } catch (InterruptedException e) { + unloadPool.shutdownNow(); + Thread.currentThread().interrupt(); + } + try { + return unloadTask.get(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOG.warn("Interrupted while unloading Regions from " + this.hostname, e); + throw e; + } catch (ExecutionException e) { + LOG.error("Error while unloading regions from RegionServer " + this.hostname, e); + throw e; + } + } + + private class Unload implements Callable { + + List movedRegions = Collections.synchronizedList(new ArrayList()); + private RegionMover rm; + + public Unload(RegionMover rm) { + this.rm = rm; + } + + @Override + public Boolean call() throws IOException { + Connection conn = ConnectionFactory.createConnection(rm.conf); + try { + Admin admin = conn.getAdmin(); + // Get Online RegionServers + ArrayList regionServers = getServers(admin); + if (LOG.isDebugEnabled()) { + LOG.debug("Online region servers:" + regionServers.toString()); + } + // Remove the host Region server from target Region Servers list + String server = stripServer(regionServers, hostname, port); + // Remove RS present in the exclude file + stripExcludes(regionServers, rm.excludeFile); + stripMaster(regionServers, admin); + unloadRegions(admin, server, regionServers, rm.ack, movedRegions); + } catch (Exception e) { + LOG.error("Error while unloading regions ", e); + return false; + } finally { + try { + conn.close(); + } catch (IOException e) { + // ignore + } + if (movedRegions != null) { + writeFile(rm.filename, movedRegions); + } + } + return true; + } + } + + /** + * Creates a new configuration if not already set and sets region mover specific overrides + */ + private void setConf() { + if (conf == null) { + conf = HBaseConfiguration.create(); + conf.setInt("hbase.client.prefetch.limit", 1); + conf.setInt("hbase.client.pause", 500); + conf.setInt("hbase.client.retries.number", 100); + } + } + + private void loadRegions(Admin admin, String hostname, int port, + List regionsToMove, boolean ack) throws Exception { + String server = null; + List movedRegions = Collections.synchronizedList(new ArrayList()); + int maxWaitInSeconds = + admin.getConfiguration().getInt(SERVERSTART_WAIT_MAX_KEY, DEFAULT_SERVERSTART_WAIT_MAX); + long maxWait = EnvironmentEdgeManager.currentTime() + maxWaitInSeconds * 1000; + while ((EnvironmentEdgeManager.currentTime() < maxWait) && (server == null)) { + try { + ArrayList regionServers = getServers(admin); + // Remove the host Region server from target Region Servers list + server = stripServer(regionServers, hostname, port); + if (server != null) { + break; + } + } catch (IOException e) { + LOG.warn("Could not get list of region servers", e); + } catch (Exception e) { + LOG.info("hostname=" + hostname + " is not up yet, waiting", e); + } + try { + Thread.sleep(500); + } catch (InterruptedException e) { + LOG.error("Interrupted while waiting for " + hostname + " to be up.Quitting now", e); + throw e; + } + } + if (server == null) { + LOG.error("Host:" + hostname + " is not up.Giving up."); + throw new Exception("Host to load regions not online"); + } + LOG.info("Moving" + regionsToMove.size() + " regions to " + server + " using " + + this.maxthreads + " threads.Ack mode:" + this.ack); + ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads); + List> taskList = new ArrayList>(); + int counter = 0; + while (counter < regionsToMove.size()) { + HRegionInfo region = regionsToMove.get(counter); + String currentServer = getServerNameForRegion(admin, region); + if (currentServer == null) { + LOG.warn("Could not get server for Region:" + region.getEncodedName() + " moving on"); + counter++; + continue; + } else if (server.equals(currentServer)) { + LOG.info("Region " + region.getRegionNameAsString() + "already on target server=" + server); + counter++; + continue; + } + if (ack) { + Future task = + moveRegionsPool.submit(new MoveWithAck(admin, region, currentServer, server, + movedRegions)); + taskList.add(task); + } else { + Future task = + moveRegionsPool.submit(new MoveWithoutAck(admin, region, currentServer, server, + movedRegions)); + taskList.add(task); + } + counter++; + } + moveRegionsPool.shutdown(); + long timeoutInSeconds = + regionsToMove.size() + * admin.getConfiguration().getInt(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX); + try { + if (!moveRegionsPool.awaitTermination(timeoutInSeconds, TimeUnit.SECONDS)) { + moveRegionsPool.shutdownNow(); + } + } catch (InterruptedException e) { + moveRegionsPool.shutdownNow(); + Thread.currentThread().interrupt(); + } + for (Future future : taskList) { + try { + // if even after shutdownNow threads are stuck we wait for 5 secs max + if (!future.get(5, TimeUnit.SECONDS)) { + LOG.error("Was Not able to move region....Exiting Now"); + throw new Exception("Could not move region Exception"); + } + } catch (InterruptedException e) { + LOG.error("Interrupted while waiting for Thread to Complete " + e.getMessage(), e); + throw e; + } catch (ExecutionException e) { + LOG.error("Got Exception From Thread While moving region " + e.getMessage(), e); + throw e; + } catch (CancellationException e) { + LOG.error("Thread for moving region cancelled. Timeout for cancellation:" + + timeoutInSeconds + "secs", e); + throw e; + } + } + } + + private void unloadRegions(Admin admin, String server, ArrayList regionServers, + boolean ack, List movedRegions) throws Exception { + List regionsToMove = new ArrayList(); + regionsToMove = getRegions(this.conf, server); + if (regionsToMove.size() == 0) { + LOG.info("No Regions to move....Quitting now"); + return; + } else if (regionServers.size() == 0) { + LOG.warn("No Regions were moved - no servers available"); + throw new Exception("No online region servers"); + } + while (true) { + regionsToMove = getRegions(this.conf, server); + regionsToMove.removeAll(movedRegions); + if (regionsToMove.size() == 0) { + break; + } + int counter = 0; + LOG.info("Moving " + regionsToMove.size() + " regions from " + this.hostname + " to " + + regionServers.size() + " servers using " + this.maxthreads + " threads .Ack Mode:" + + ack); + ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads); + List> taskList = new ArrayList>(); + int serverIndex = 0; + while (counter < regionsToMove.size()) { + if (ack) { + Future task = + moveRegionsPool.submit(new MoveWithAck(admin, regionsToMove.get(counter), server, + regionServers.get(serverIndex), movedRegions)); + taskList.add(task); + } else { + Future task = + moveRegionsPool.submit(new MoveWithoutAck(admin, regionsToMove.get(counter), server, + regionServers.get(serverIndex), movedRegions)); + taskList.add(task); + } + counter++; + serverIndex = (serverIndex + 1) % regionServers.size(); + } + moveRegionsPool.shutdown(); + long timeoutInSeconds = + regionsToMove.size() + * admin.getConfiguration().getInt(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX); + try { + if (!moveRegionsPool.awaitTermination(timeoutInSeconds, TimeUnit.SECONDS)) { + moveRegionsPool.shutdownNow(); + } + } catch (InterruptedException e) { + moveRegionsPool.shutdownNow(); + Thread.currentThread().interrupt(); + } + for (Future future : taskList) { + try { + // if even after shutdownNow threads are stuck we wait for 5 secs max + if (!future.get(5, TimeUnit.SECONDS)) { + LOG.error("Was Not able to move region....Exiting Now"); + throw new Exception("Could not move region Exception"); + } + } catch (InterruptedException e) { + LOG.error("Interrupted while waiting for Thread to Complete " + e.getMessage(), e); + throw e; + } catch (ExecutionException e) { + LOG.error("Got Exception From Thread While moving region " + e.getMessage(), e); + throw e; + } catch (CancellationException e) { + LOG.error("Thread for moving region cancelled. Timeout for cancellation:" + + timeoutInSeconds + "secs", e); + throw e; + } + } + } + } + + /** + * Move Regions and make sure that they are up on the target server.If a region movement fails we + * exit as failure + */ + private class MoveWithAck implements Callable { + private Admin admin; + private HRegionInfo region; + private String targetServer; + private List movedRegions; + private String sourceServer; + + public MoveWithAck(Admin admin, HRegionInfo regionInfo, String sourceServer, + String targetServer, List movedRegions) { + this.admin = admin; + this.region = regionInfo; + this.targetServer = targetServer; + this.movedRegions = movedRegions; + this.sourceServer = sourceServer; + } + + @Override + public Boolean call() throws IOException, InterruptedException { + boolean moved = false; + int count = 0; + int retries = admin.getConfiguration().getInt(MOVE_RETRIES_MAX_KEY, DEFAULT_MOVE_RETRIES_MAX); + int maxWaitInSeconds = + admin.getConfiguration().getInt(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX); + long startTime = EnvironmentEdgeManager.currentTime(); + boolean sameServer = true; + // Assert we can scan the region in its current location + isSuccessfulScan(admin, region); + LOG.info("Moving region:" + region.getEncodedName() + "from " + sourceServer + " to " + + targetServer); + while (count < retries && sameServer) { + if (count > 0) { + LOG.info("Retry " + Integer.toString(count) + " of maximum " + Integer.toString(retries)); + } + count = count + 1; + admin.move(region.getEncodedNameAsBytes(), Bytes.toBytes(targetServer)); + long maxWait = startTime + (maxWaitInSeconds * 1000); + while (EnvironmentEdgeManager.currentTime() < maxWait) { + sameServer = isSameServer(admin, region, sourceServer); + if (!sameServer) { + break; + } + Thread.sleep(100); + } + } + if (sameServer) { + LOG.error("Region: " + region.getRegionNameAsString() + " stuck on " + this.sourceServer + + ",newServer=" + this.targetServer); + } else { + isSuccessfulScan(admin, region); + LOG.info("Moved Region " + + region.getRegionNameAsString() + + " cost:" + + String.format("%.3f", + (float) (EnvironmentEdgeManager.currentTime() - startTime) / 1000)); + moved = true; + movedRegions.add(region); + } + return moved; + } + } + + /** + * Move Regions without Acknowledging.Usefule in case of RS shutdown as we might want to shut the + * RS down anyways and not abort on a stuck region. Improves movement performance + */ + private class MoveWithoutAck implements Callable { + private Admin admin; + private HRegionInfo region; + private String targetServer; + private List movedRegions; + private String sourceServer; + + public MoveWithoutAck(Admin admin, HRegionInfo regionInfo, String sourceServer, + String targetServer, List movedRegions) { + this.admin = admin; + this.region = regionInfo; + this.targetServer = targetServer; + this.movedRegions = movedRegions; + this.sourceServer = sourceServer; + } + + @Override + public Boolean call() { + try { + LOG.info("Moving region:" + region.getEncodedName() + "from " + sourceServer + " to " + + targetServer); + admin.move(region.getEncodedNameAsBytes(), Bytes.toBytes(targetServer)); + LOG.info("Moved " + region.getEncodedName() + " from " + sourceServer + " to " + + targetServer); + } catch (Exception e) { + LOG.error("Error Moving Region:" + region.getEncodedName(), e); + } finally { + // we add region to the moved regions list in No Ack Mode since this is best effort + movedRegions.add(region); + } + return true; + } + } + + private List readRegionsFromFile(String filename) throws IOException { + List regions = new ArrayList(); + File f = new File(filename); + if (!f.exists()) { + return regions; + } + FileInputStream fis = null; + DataInputStream dis = null; + try { + fis = new FileInputStream(f); + dis = new DataInputStream(fis); + int numRegions = dis.readInt(); + int index = 0; + while (index < numRegions) { + regions.add(HRegionInfo.parseFromOrNull(Bytes.readByteArray(dis))); + index++; + } + } catch (IOException e) { + LOG.error("Error while reading regions from file:" + filename, e); + throw e; + } finally { + if (dis != null) { + dis.close(); + } + if (fis != null) { + fis.close(); + } + } + return regions; + } + + /** + * Get online regions of the passed server + * @param conf + * @param server + * @return List of Regions online on the server + * @throws IOException + */ + private List getRegions(Configuration conf, String server) throws IOException { + Connection conn = ConnectionFactory.createConnection(conf); + try { + return conn.getAdmin().getOnlineRegions(ServerName.valueOf(server)); + } finally { + conn.close(); + } + } + + /** + * Write the number of regions moved in the first line followed by regions moved in subsequent + * lines + * @param filename + * @param movedRegions + * @throws IOException + */ + private void writeFile(String filename, List movedRegions) throws IOException { + FileOutputStream fos = null; + DataOutputStream dos = null; + try { + fos = new FileOutputStream(filename); + dos = new DataOutputStream(fos); + dos.writeInt(movedRegions.size()); + for (HRegionInfo region : movedRegions) { + Bytes.writeByteArray(dos, region.toByteArray()); + } + } catch (IOException e) { + LOG.error("ERROR: Was Not able to write regions moved to output file but moved " + + movedRegions.size() + " regions", e); + throw e; + } finally { + if (dos != null) { + dos.close(); + } + if (fos != null) { + fos.close(); + } + } + } + + /** + * Excludes the servername whose hostname and port portion matches the list given in exclude file + * @param regionServers + * @param excludeFile + * @throws IOException + */ + private void stripExcludes(ArrayList regionServers, String excludeFile) + throws IOException { + if (excludeFile != null) { + ArrayList excludes = readExcludes(excludeFile); + Iterator i = regionServers.iterator(); + while (i.hasNext()) { + String rs = i.next(); + String rsPort = + rs.split(ServerName.SERVERNAME_SEPARATOR)[0] + ":" + + rs.split(ServerName.SERVERNAME_SEPARATOR)[1]; + if (excludes.contains(rsPort)) { + i.remove(); + } + } + LOG.info("Valid Region server targets are:" + regionServers.toString()); + LOG.info("Excluded Servers are" + excludes.toString()); + } + } + + /** + * Exclude master from list of RSs to move regions to + * @param regionServers + * @param admin + * @throws Exception + */ + private void stripMaster(ArrayList regionServers, Admin admin) throws Exception { + stripServer(regionServers, admin.getClusterStatus().getMaster().getHostname(), + admin.getClusterStatus().getMaster().getPort()); + } + + /** + * Create an Arraylst of servers listed in exclude file + * @param excludeFile + * @return ArrayList of servers to be excluded in format + * @throws IOException + */ + private ArrayList readExcludes(String excludeFile) throws IOException { + ArrayList excludeServers = new ArrayList(); + if (excludeFile == null) { + return excludeServers; + } else { + File f = new File(excludeFile); + String line; + BufferedReader br = null; + try { + br = new BufferedReader(new FileReader(f)); + while ((line = br.readLine()) != null) { + line.trim(); + if (!line.equals("")) { + excludeServers.add(line); + } + } + } catch (IOException e) { + LOG.warn("Exception while reading excludes file,continuing anyways", e); + } finally { + if (br != null) { + br.close(); + } + } + return excludeServers; + } + } + + /** + * Remove the servername whose hostname and port portion matches from the passed array of servers. + * Returns as side-effect the servername removed. + * @param regionServers + * @param hostname + * @param port + * @return server removed from list of Region Servers + * @throws Exception + */ + private String stripServer(ArrayList regionServers, String hostname, int port) + throws Exception { + String server = null; + String portString = Integer.toString(port); + Iterator i = regionServers.iterator(); + int noOfRs = regionServers.size(); + while (i.hasNext()) { + server = i.next(); + String[] splitServer = server.split(ServerName.SERVERNAME_SEPARATOR); + if (splitServer[0].equals(hostname) && splitServer[1].equals(portString)) { + i.remove(); + return server; + } + } + if (regionServers.size() >= noOfRs) { + throw new Exception("Server " + hostname + ":" + Integer.toString(port) + + " is not in list of online servers(Offline/Incorrect)"); + } + return server; + } + + /** + * Get Arraylist of Servers in the cluster + * @param admin + * @return ArrayList of online region servers + * @throws IOException + */ + private ArrayList getServers(Admin admin) throws IOException { + ArrayList serverInfo = + new ArrayList(admin.getClusterStatus().getServers()); + ArrayList regionServers = new ArrayList(); + for (ServerName server : serverInfo) { + regionServers.add(server.getServerName()); + } + return regionServers; + } + + private void deleteFile(String filename) { + File f = new File(filename); + if (f.exists()) { + f.delete(); + } + } + + /** + * Tries to scan a row from passed region + * @param admin + * @param region + * @throws IOException + */ + private void isSuccessfulScan(Admin admin, HRegionInfo region) throws IOException { + Scan scan = new Scan(region.getStartKey()); + scan.setBatch(1); + scan.setCaching(1); + scan.setFilter(new FirstKeyOnlyFilter()); + try { + Table table = admin.getConnection().getTable(region.getTable()); + try { + ResultScanner scanner = table.getScanner(scan); + try { + scanner.next(); + } finally { + scanner.close(); + } + } finally { + table.close(); + } + } catch (IOException e) { + LOG.error("Could not scan region:" + region.getEncodedName(), e); + throw e; + } + } + + /** + * Returns true if passed region is still on serverName when we look at hbase:meta. + * @param admin + * @param region + * @param serverName + * @return true if region is hosted on serverName otherwise false + * @throws IOException + */ + private boolean isSameServer(Admin admin, HRegionInfo region, String serverName) + throws IOException { + String serverForRegion = getServerNameForRegion(admin, region); + if (serverForRegion != null && serverForRegion.equals(serverName)) { + return true; + } + return false; + } + + /** + * Get servername that is up in hbase:meta hosting the given region. this is hostname + port + + * startcode comma-delimited. Can return null + * @param admin + * @param region + * @return regionServer hosting the given region + * @throws IOException + */ + private String getServerNameForRegion(Admin admin, HRegionInfo region) throws IOException { + String server = null; + if (!admin.isTableEnabled(region.getTable())) { + return null; + } + if (region.isMetaRegion()) { + ZooKeeperWatcher zkw = new ZooKeeperWatcher(admin.getConfiguration(), "region_mover", null); + MetaTableLocator locator = new MetaTableLocator(); + int maxWaitInSeconds = + admin.getConfiguration().getInt(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX); + try { + server = locator.waitMetaRegionLocation(zkw, maxWaitInSeconds * 1000).toString() + ","; + } catch (InterruptedException e) { + LOG.error("Interrupted while waiting for location of Meta", e); + } finally { + if (zkw != null) { + zkw.close(); + } + } + } else { + Table table = admin.getConnection().getTable(TableName.META_TABLE_NAME); + try { + Get get = new Get(region.getRegionName()); + get.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER); + get.addColumn(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER); + Result result = table.get(get); + if (result != null) { + byte[] servername = + result.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER); + byte[] startcode = + result.getValue(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER); + if (servername != null) { + server = + Bytes.toString(servername).replaceFirst(":", ",") + "," + Bytes.toLong(startcode); + } + } + } catch (IOException e) { + LOG.error("Could not get Server Name for region:" + region.getEncodedName(), e); + throw e; + } finally { + table.close(); + } + } + return server; + } + + @Override + protected void addOptions() { + this.addRequiredOptWithArg("r", "regionserverhost", "region server |"); + this.addRequiredOptWithArg("l", "Expected: load/unload"); + this.addOptWithArg("m", "maxthreads", + "Define the maximum number of threads to use to unload and reload the regions"); + this.addOptWithArg("x", "excludefile", + "File with per line to exclude as unload targets; default excludes only " + + "target host; useful for rack decommisioning."); + this.addOptWithArg("f", "filename", + "File to save regions list into unloading, or read from loading; " + + "default /tmp/"); + this.addOptNoArg("n", "noAck", + "Enable Ack mode(default: true) which checks if region is online on target RegionServer -- " + + "Upon disabling,in case a region is stuck, it'll move on anyways"); + this.addOptWithArg("t", "timeout", "timeout in seconds after which the tool will exit " + + "irrespective of whether it finished or not;default Integer.MAX_VALUE"); + } + + @Override + protected void processOptions(CommandLine cmd) { + String hostname = cmd.getOptionValue("r"); + rmbuilder = new RegionMoverBuilder(hostname); + if (cmd.hasOption('m')) { + rmbuilder.maxthreads(Integer.parseInt(cmd.getOptionValue('m'))); + } + if (cmd.hasOption('n')) { + rmbuilder.ack(false); + } + if (cmd.hasOption('f')) { + rmbuilder.filename(cmd.getOptionValue('f')); + } + if (cmd.hasOption('x')) { + rmbuilder.excludeFile(cmd.getOptionValue('x')); + } + if (cmd.hasOption('t')) { + rmbuilder.timeout(Integer.parseInt(cmd.getOptionValue('t'))); + } + this.loadUnload = cmd.getOptionValue("l").toLowerCase(); + } + + @Override + protected int doWork() throws Exception { + RegionMover rm = rmbuilder.build(); + if (loadUnload.equalsIgnoreCase("load")) { + rm.load(); + } else if (loadUnload.equalsIgnoreCase("unload")) { + rm.unload(); + } else { + printUsage(); + System.exit(1); + } + return 0; + } + + public static void main(String[] args) { + new RegionMover().doStaticMain(args); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestRegionMover.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestRegionMover.java new file mode 100644 index 00000000000..4fd9f653449 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestRegionMover.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import static org.junit.Assert.assertEquals; + +import java.io.FileWriter; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.RegionMover.RegionMoverBuilder; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Tests for Region Mover Load/Unload functionality with and without ack mode and also to test + * exclude functionality useful for rack decommissioning + */ +@Category(MediumTests.class) +public class TestRegionMover { + + final Log LOG = LogFactory.getLog(getClass()); + protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.startMiniCluster(3); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Before + public void setUp() throws Exception { + // Create a pre-split table just to populate some regions + TableName tableName = TableName.valueOf("testRegionMover"); + Admin admin = TEST_UTIL.getHBaseAdmin(); + if (admin.tableExists(tableName)) { + TEST_UTIL.deleteTable(tableName); + } + HTableDescriptor tableDesc = new HTableDescriptor(tableName); + try { + admin.setBalancerRunning(false, true); + String startKey = "a"; + String endKey = "z"; + admin.createTable(tableDesc, startKey.getBytes(), endKey.getBytes(), 9); + } finally { + if (admin != null) { + admin.close(); + } + } + } + + @Test + public void testLoadWithAck() throws Exception { + MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); + HRegionServer regionServer = cluster.getRegionServer(0); + String rsName = regionServer.getServerName().getHostname(); + int port = regionServer.getServerName().getPort(); + int noRegions = regionServer.getNumberOfOnlineRegions(); + String rs = rsName + ":" + Integer.toString(port); + RegionMoverBuilder rmBuilder = new RegionMoverBuilder(rs).ack(true).maxthreads(8); + RegionMover rm = rmBuilder.build(); + rm.setConf(TEST_UTIL.getConfiguration()); + LOG.info("Unloading " + rs); + rm.unload(); + assertEquals(0, regionServer.getNumberOfOnlineRegions()); + LOG.info("Successfully Unloaded\nNow Loading"); + rm.load(); + assertEquals(noRegions, regionServer.getNumberOfOnlineRegions()); + } + + @Test + public void testLoadWithoutAck() throws Exception { + MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); + HRegionServer regionServer = cluster.getRegionServer(0); + String rsName = regionServer.getServerName().getHostname(); + int port = regionServer.getServerName().getPort(); + int noRegions = regionServer.getNumberOfOnlineRegions(); + String rs = rsName + ":" + Integer.toString(port); + RegionMoverBuilder rmBuilder = new RegionMoverBuilder(rs).ack(false); + RegionMover rm = rmBuilder.build(); + rm.setConf(TEST_UTIL.getConfiguration()); + LOG.info("Unloading " + rs); + rm.unload(); + assertEquals(0, regionServer.getNumberOfOnlineRegions()); + LOG.info("Successfully Unloaded\nNow Loading"); + rm.load(); + Thread.sleep(100); + assertEquals(noRegions, regionServer.getNumberOfOnlineRegions()); + } + + @Test + public void testUnloadWithoutAck() throws Exception { + MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); + HRegionServer regionServer = cluster.getRegionServer(0); + String rsName = regionServer.getServerName().getHostname(); + int port = regionServer.getServerName().getPort(); + String rs = rsName + ":" + Integer.toString(port); + RegionMoverBuilder rmBuilder = new RegionMoverBuilder(rs).ack(false); + RegionMover rm = rmBuilder.build(); + rm.setConf(TEST_UTIL.getConfiguration()); + LOG.info("Unloading " + rs); + rm.unload(); + assertEquals(0, regionServer.getNumberOfOnlineRegions()); + } + + @Test + public void testUnloadWithAck() throws Exception { + MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); + HRegionServer regionServer = cluster.getRegionServer(0); + String rsName = regionServer.getServerName().getHostname(); + int port = regionServer.getServerName().getPort(); + String rs = rsName + ":" + Integer.toString(port); + RegionMoverBuilder rmBuilder = new RegionMoverBuilder(rs).ack(true); + RegionMover rm = rmBuilder.build(); + rm.setConf(TEST_UTIL.getConfiguration()); + rm.unload(); + LOG.info("Unloading " + rs); + assertEquals(0, regionServer.getNumberOfOnlineRegions()); + } + + /** + * To test that we successfully exclude a server from the unloading process We test for the number + * of regions on Excluded server and also test that regions are unloaded successfully + * @throws Exception + */ + @Test + public void testExclude() throws Exception { + MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); + FileWriter fos = new FileWriter("/tmp/exclude_file"); + HRegionServer excludeServer = cluster.getRegionServer(1); + String excludeHostname = excludeServer.getServerName().getHostname(); + int excludeServerPort = excludeServer.getServerName().getPort(); + int regionsExcludeServer = excludeServer.getNumberOfOnlineRegions(); + String excludeServerName = excludeHostname + ":" + Integer.toString(excludeServerPort); + fos.write(excludeServerName); + fos.close(); + HRegionServer regionServer = cluster.getRegionServer(0); + String rsName = regionServer.getServerName().getHostname(); + int port = regionServer.getServerName().getPort(); + String rs = rsName + ":" + Integer.toString(port); + RegionMoverBuilder rmBuilder = + new RegionMoverBuilder(rs).ack(true).excludeFile("/tmp/exclude_file"); + RegionMover rm = rmBuilder.build(); + rm.setConf(TEST_UTIL.getConfiguration()); + rm.unload(); + LOG.info("Unloading " + rs); + assertEquals(0, regionServer.getNumberOfOnlineRegions()); + assertEquals(regionsExcludeServer, cluster.getRegionServer(1).getNumberOfOnlineRegions()); + LOG.info("Before:" + regionsExcludeServer + " After:" + + cluster.getRegionServer(1).getNumberOfOnlineRegions()); + } + +}