Revert "HBASE-13014 Java Tool For Region Moving"
Breaks the build. I applied wrong patch?
This reverts commit a9a56ad508
.
This commit is contained in:
parent
a9a56ad508
commit
87cc38f9ab
|
@ -1,997 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
|
|
||||||
* agreements. See the NOTICE file distributed with this work for additional information regarding
|
|
||||||
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance with the License. You may obtain a
|
|
||||||
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
|
|
||||||
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
|
|
||||||
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
|
|
||||||
* for the specific language governing permissions and limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hbase.util;
|
|
||||||
|
|
||||||
import java.io.BufferedReader;
|
|
||||||
import java.io.DataInputStream;
|
|
||||||
import java.io.DataOutputStream;
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.FileInputStream;
|
|
||||||
import java.io.FileOutputStream;
|
|
||||||
import java.io.FileReader;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InputStreamReader;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.concurrent.Callable;
|
|
||||||
import java.util.concurrent.CancellationException;
|
|
||||||
import java.util.concurrent.ExecutionException;
|
|
||||||
import java.util.concurrent.ExecutorService;
|
|
||||||
import java.util.concurrent.Executors;
|
|
||||||
import java.util.concurrent.Future;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.TimeoutException;
|
|
||||||
|
|
||||||
import org.apache.commons.cli.CommandLine;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.apache.hadoop.hbase.Abortable;
|
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
|
||||||
import org.apache.hadoop.hbase.HRegionInfo;
|
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
|
||||||
import org.apache.hadoop.hbase.TableName;
|
|
||||||
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
|
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|
||||||
import org.apache.hadoop.hbase.client.Get;
|
|
||||||
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
|
||||||
import org.apache.hadoop.hbase.client.HConnection;
|
|
||||||
import org.apache.hadoop.hbase.client.HConnectionManager;
|
|
||||||
import org.apache.hadoop.hbase.client.HTable;
|
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
|
||||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
|
||||||
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
|
|
||||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
|
||||||
import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker;
|
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Tool for loading/unloading regions to/from given regionserver This tool can be run from Command
|
|
||||||
* line directly as a utility. Supports Ack/No Ack mode for loading/unloading operations.Ack mode
|
|
||||||
* acknowledges if regions are online after movement while noAck mode is best effort mode that
|
|
||||||
* improves performance but will still move on if region is stuck/not moved. Motivation behind noAck
|
|
||||||
* mode being RS shutdown where even if a Region is stuck, upon shutdown master will move it
|
|
||||||
* anyways. This can also be used by constructiong an Object using the builder and then calling
|
|
||||||
* {@link #load()} or {@link #unload()} methods for the desired operations.
|
|
||||||
*/
|
|
||||||
@InterfaceAudience.Public
|
|
||||||
public class RegionMover extends AbstractHBaseTool {
|
|
||||||
public static final String MOVE_RETRIES_MAX_KEY = "hbase.move.retries.max";
|
|
||||||
public static final String MOVE_WAIT_MAX_KEY = "hbase.move.wait.max";
|
|
||||||
public static final String SERVERSTART_WAIT_MAX_KEY = "hbase.serverstart.wait.max";
|
|
||||||
public static final int DEFAULT_MOVE_RETRIES_MAX = 5;
|
|
||||||
public static final int DEFAULT_MOVE_WAIT_MAX = 60;
|
|
||||||
public static final int DEFAULT_SERVERSTART_WAIT_MAX = 180;
|
|
||||||
static final Log LOG = LogFactory.getLog(RegionMover.class);
|
|
||||||
private RegionMoverBuilder rmbuilder;
|
|
||||||
private boolean ack = true;
|
|
||||||
private int maxthreads = 1;
|
|
||||||
private int timeout;
|
|
||||||
private String loadUnload;
|
|
||||||
private String hostname;
|
|
||||||
private String filename;
|
|
||||||
private String excludeFile;
|
|
||||||
private int port;
|
|
||||||
|
|
||||||
private RegionMover(RegionMoverBuilder builder) {
|
|
||||||
this.hostname = builder.hostname;
|
|
||||||
this.filename = builder.filename;
|
|
||||||
this.excludeFile = builder.excludeFile;
|
|
||||||
this.maxthreads = builder.maxthreads;
|
|
||||||
this.ack = builder.ack;
|
|
||||||
this.port = builder.port;
|
|
||||||
this.timeout = builder.timeout;
|
|
||||||
}
|
|
||||||
|
|
||||||
private RegionMover() {
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Builder for Region mover.Use the {@link #build()} method to create {@link #RegionMover(String)}
|
|
||||||
* object Has {@link #filename(String)}, {@link #excludeFile(String)}, {@link #maxthreads(int)},
|
|
||||||
* {@link #ack(boolean)}, {@link #timeout(int)} methods to set the corresponding options
|
|
||||||
*/
|
|
||||||
public static class RegionMoverBuilder {
|
|
||||||
private boolean ack = true;
|
|
||||||
private int maxthreads = 1;
|
|
||||||
private int timeout = Integer.MAX_VALUE;
|
|
||||||
private String hostname;
|
|
||||||
private String filename;
|
|
||||||
private String excludeFile = null;
|
|
||||||
private String defaultDir = "/tmp";
|
|
||||||
private int port = HConstants.DEFAULT_REGIONSERVER_PORT;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Hostname to unload regions from or load regions to Valid format: <hostname> or
|
|
||||||
* <hostname:port>
|
|
||||||
* @param hostname
|
|
||||||
*/
|
|
||||||
public RegionMoverBuilder(String hostname) {
|
|
||||||
String[] splitHostname = hostname.split(":");
|
|
||||||
this.hostname = splitHostname[0];
|
|
||||||
if (splitHostname.length == 2) {
|
|
||||||
this.port = Integer.parseInt(splitHostname[1]);
|
|
||||||
}
|
|
||||||
setDefaultfilename(this.hostname);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void setDefaultfilename(String hostname) {
|
|
||||||
this.filename =
|
|
||||||
defaultDir + "/" + System.getProperty("user.name") + this.hostname + ":"
|
|
||||||
+ Integer.toString(this.port);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Path of file where regions will be written to during unloading/read from during loading
|
|
||||||
* @param filename
|
|
||||||
* @return RegionMoverBuilder object
|
|
||||||
*/
|
|
||||||
public RegionMoverBuilder filename(String filename) {
|
|
||||||
this.filename = filename;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Set the max number of threads that will be used to move regions
|
|
||||||
* @param threads
|
|
||||||
* @return RegionMoverBuilder object
|
|
||||||
*/
|
|
||||||
public RegionMoverBuilder maxthreads(int threads) {
|
|
||||||
this.maxthreads = threads;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Path of file containing hostnames to be excluded during region movement Exclude file should
|
|
||||||
* have <host:port> per line.Port is mandatory here as we can have many RS running on a single
|
|
||||||
* host
|
|
||||||
* @param excludefile
|
|
||||||
* @return RegionMoverBuilder object
|
|
||||||
*/
|
|
||||||
public RegionMoverBuilder excludeFile(String excludefile) {
|
|
||||||
this.excludeFile = excludefile;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Set ack/noAck mode.
|
|
||||||
* <p>
|
|
||||||
* In ack mode regions are acknowledged before and after moving and the move is retried
|
|
||||||
* hbase.move.retries.max times, if unsuccessful we quit with exit code 1.No Ack mode is a best
|
|
||||||
* effort mode,each region movement is tried once.This can be used during graceful shutdown as
|
|
||||||
* even if we have a stuck region,upon shutdown it'll be reassigned anyway.
|
|
||||||
* <p>
|
|
||||||
* @param ack
|
|
||||||
* @return RegionMoverBuilder object
|
|
||||||
*/
|
|
||||||
public RegionMoverBuilder ack(boolean ack) {
|
|
||||||
this.ack = ack;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Set the timeout for Load/Unload operation in seconds.This is a global timeout,threadpool for
|
|
||||||
* movers also have a separate time which is hbase.move.wait.max * number of regions to
|
|
||||||
* load/unload
|
|
||||||
* @param timeout in seconds
|
|
||||||
* @return RegionMoverBuilder object
|
|
||||||
*/
|
|
||||||
public RegionMoverBuilder timeout(int timeout) {
|
|
||||||
this.timeout = timeout;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This method builds the appropriate RegionMover object which can then be used to load/unload
|
|
||||||
* using load and unload methods
|
|
||||||
* @return RegionMover object
|
|
||||||
*/
|
|
||||||
public RegionMover build() {
|
|
||||||
return new RegionMover(this);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Loads the specified {@link #hostname} with regions listed in the {@link #filename} RegionMover
|
|
||||||
* Object has to be created using {@link #RegionMover(RegionMoverBuilder)}
|
|
||||||
* @return true if loading succeeded, false otherwise
|
|
||||||
* @throws ExecutionException
|
|
||||||
* @throws InterruptedException if the loader thread was interrupted
|
|
||||||
* @throws TimeoutException
|
|
||||||
*/
|
|
||||||
public boolean load() throws ExecutionException, InterruptedException, TimeoutException {
|
|
||||||
setConf();
|
|
||||||
ExecutorService loadPool = Executors.newFixedThreadPool(1);
|
|
||||||
Future<Boolean> loadTask = loadPool.submit(new Load(this));
|
|
||||||
loadPool.shutdown();
|
|
||||||
try {
|
|
||||||
if (!loadPool.awaitTermination((long) this.timeout, TimeUnit.SECONDS)) {
|
|
||||||
LOG.warn("Timed out before finishing the loading operation. Timeout:" + this.timeout
|
|
||||||
+ "sec");
|
|
||||||
loadPool.shutdownNow();
|
|
||||||
}
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
loadPool.shutdownNow();
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
return loadTask.get(5, TimeUnit.SECONDS);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
LOG.warn("Interrupted while loading Regions on " + this.hostname, e);
|
|
||||||
throw e;
|
|
||||||
} catch (ExecutionException e) {
|
|
||||||
LOG.error("Error while loading regions on RegionServer " + this.hostname, e);
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private class Load implements Callable<Boolean> {
|
|
||||||
|
|
||||||
private HBaseAdmin admin = null;
|
|
||||||
private RegionMover rm;
|
|
||||||
|
|
||||||
public Load(RegionMover rm) {
|
|
||||||
this.rm = rm;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Boolean call() throws IOException {
|
|
||||||
try {
|
|
||||||
admin = new HBaseAdmin(rm.conf);
|
|
||||||
List<HRegionInfo> regionsToMove = readRegionsFromFile(rm.filename);
|
|
||||||
if (regionsToMove.isEmpty()) {
|
|
||||||
LOG.info("No regions to load.Exiting");
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
loadRegions(admin, rm.hostname, rm.port, regionsToMove, rm.ack);
|
|
||||||
} catch (Exception e) {
|
|
||||||
LOG.error("Error while loading regions to " + rm.hostname, e);
|
|
||||||
return false;
|
|
||||||
} finally {
|
|
||||||
if (admin != null) {
|
|
||||||
admin.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Unload regions from given {@link #hostname} using ack/noAck mode and {@link #maxthreads}.In
|
|
||||||
* noAck mode we do not make sure that region is successfully online on the target region
|
|
||||||
* server,hence it is best effort.We do not unload regions to hostnames given in
|
|
||||||
* {@link #excludeFile}.
|
|
||||||
* @return true if unloading succeeded, false otherwise
|
|
||||||
* @throws InterruptedException if the unloader thread was interrupted
|
|
||||||
* @throws ExecutionException
|
|
||||||
* @throws TimeoutException
|
|
||||||
*/
|
|
||||||
public boolean unload() throws InterruptedException, ExecutionException, TimeoutException {
|
|
||||||
setConf();
|
|
||||||
deleteFile(this.filename);
|
|
||||||
ExecutorService unloadPool = Executors.newFixedThreadPool(1);
|
|
||||||
Future<Boolean> unloadTask = unloadPool.submit(new Unload(this));
|
|
||||||
unloadPool.shutdown();
|
|
||||||
try {
|
|
||||||
if (!unloadPool.awaitTermination((long) this.timeout, TimeUnit.SECONDS)) {
|
|
||||||
LOG.warn("Timed out before finishing the unloading operation. Timeout:" + this.timeout
|
|
||||||
+ "sec");
|
|
||||||
unloadPool.shutdownNow();
|
|
||||||
}
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
unloadPool.shutdownNow();
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
return unloadTask.get(5, TimeUnit.SECONDS);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
LOG.warn("Interrupted while unloading Regions from " + this.hostname, e);
|
|
||||||
throw e;
|
|
||||||
} catch (ExecutionException e) {
|
|
||||||
LOG.error("Error while unloading regions from RegionServer " + this.hostname, e);
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private class Unload implements Callable<Boolean> {
|
|
||||||
|
|
||||||
private HBaseAdmin admin = null;
|
|
||||||
List<HRegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<HRegionInfo>());
|
|
||||||
private RegionMover rm;
|
|
||||||
|
|
||||||
public Unload(RegionMover rm) {
|
|
||||||
this.rm = rm;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Boolean call() throws IOException {
|
|
||||||
try {
|
|
||||||
admin = new HBaseAdmin(rm.conf);
|
|
||||||
// Get Online RegionServers
|
|
||||||
ArrayList<String> regionServers = getServers(admin);
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("Online region servers:" + regionServers.toString());
|
|
||||||
}
|
|
||||||
// Remove the host Region server from target Region Servers list
|
|
||||||
String server = stripServer(regionServers, hostname, port);
|
|
||||||
// Remove RS present in the exclude file
|
|
||||||
stripExcludes(regionServers, rm.excludeFile);
|
|
||||||
unloadRegions(admin, server, regionServers, rm.ack, movedRegions);
|
|
||||||
} catch (Exception e) {
|
|
||||||
LOG.error("Error while unloading regions ", e);
|
|
||||||
return false;
|
|
||||||
} finally {
|
|
||||||
if (admin != null) {
|
|
||||||
admin.close();
|
|
||||||
}
|
|
||||||
if (movedRegions != null) {
|
|
||||||
writeFile(rm.filename, movedRegions);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates a new configuration if not already set and sets region mover specific overrides
|
|
||||||
*/
|
|
||||||
private void setConf() {
|
|
||||||
if (conf == null) {
|
|
||||||
conf = HBaseConfiguration.create();
|
|
||||||
conf.setInt("hbase.client.prefetch.limit", 1);
|
|
||||||
conf.setInt("hbase.client.pause", 500);
|
|
||||||
conf.setInt("hbase.client.retries.number", 100);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void loadRegions(HBaseAdmin admin, String hostname, int port,
|
|
||||||
List<HRegionInfo> regionsToMove, boolean ack) throws Exception {
|
|
||||||
String server = null;
|
|
||||||
List<HRegionInfo> movedRegions = Collections.synchronizedList(new ArrayList<HRegionInfo>());
|
|
||||||
int maxWaitInSeconds =
|
|
||||||
admin.getConfiguration().getInt(SERVERSTART_WAIT_MAX_KEY, DEFAULT_SERVERSTART_WAIT_MAX);
|
|
||||||
long maxWait = EnvironmentEdgeManager.currentTimeMillis() + maxWaitInSeconds * 1000;
|
|
||||||
while ((EnvironmentEdgeManager.currentTimeMillis() < maxWait) && (server == null)) {
|
|
||||||
try {
|
|
||||||
ArrayList<String> regionServers = getServers(admin);
|
|
||||||
// Remove the host Region server from target Region Servers list
|
|
||||||
server = stripServer(regionServers, hostname, port);
|
|
||||||
if (server != null) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
} catch (IOException e) {
|
|
||||||
LOG.warn("Could not get list of region servers", e);
|
|
||||||
} catch (Exception e) {
|
|
||||||
LOG.info("hostname=" + hostname + " is not up yet, waiting", e);
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
Thread.sleep(500);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
LOG.error("Interrupted while waiting for " + hostname + " to be up.Quitting now", e);
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (server == null) {
|
|
||||||
LOG.error("Host:" + hostname + " is not up.Giving up.");
|
|
||||||
throw new Exception("Host to load regions not online");
|
|
||||||
}
|
|
||||||
LOG.info("Moving" + regionsToMove.size() + " regions to " + server + " using "
|
|
||||||
+ this.maxthreads + " threads.Ack mode:" + this.ack);
|
|
||||||
ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads);
|
|
||||||
List<Future<Boolean>> taskList = new ArrayList<Future<Boolean>>();
|
|
||||||
int counter = 0;
|
|
||||||
while (counter < regionsToMove.size()) {
|
|
||||||
HRegionInfo region = regionsToMove.get(counter);
|
|
||||||
String currentServer = getServerNameForRegion(admin, region);
|
|
||||||
if (currentServer == null) {
|
|
||||||
LOG.warn("Could not get server for Region:" + region.getEncodedName() + " moving on");
|
|
||||||
counter++;
|
|
||||||
continue;
|
|
||||||
} else if (server.equals(currentServer)) {
|
|
||||||
LOG.info("Region " + region.getRegionNameAsString() + "already on target server=" + server);
|
|
||||||
counter++;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (ack) {
|
|
||||||
Future<Boolean> task =
|
|
||||||
moveRegionsPool.submit(new MoveWithAck(admin, region, currentServer, server,
|
|
||||||
movedRegions));
|
|
||||||
taskList.add(task);
|
|
||||||
} else {
|
|
||||||
Future<Boolean> task =
|
|
||||||
moveRegionsPool.submit(new MoveWithoutAck(admin, region, currentServer, server,
|
|
||||||
movedRegions));
|
|
||||||
taskList.add(task);
|
|
||||||
}
|
|
||||||
counter++;
|
|
||||||
}
|
|
||||||
moveRegionsPool.shutdown();
|
|
||||||
long timeoutInSeconds =
|
|
||||||
regionsToMove.size()
|
|
||||||
* admin.getConfiguration().getInt(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
|
|
||||||
try {
|
|
||||||
if (!moveRegionsPool.awaitTermination(timeoutInSeconds, TimeUnit.SECONDS)) {
|
|
||||||
moveRegionsPool.shutdownNow();
|
|
||||||
}
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
moveRegionsPool.shutdownNow();
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
}
|
|
||||||
for (Future<Boolean> future : taskList) {
|
|
||||||
try {
|
|
||||||
// if even after shutdownNow threads are stuck we wait for 5 secs max
|
|
||||||
if (!future.get(5, TimeUnit.SECONDS)) {
|
|
||||||
LOG.error("Was Not able to move region....Exiting Now");
|
|
||||||
throw new Exception("Could not move region Exception");
|
|
||||||
}
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
LOG.error("Interrupted while waiting for Thread to Complete " + e.getMessage(), e);
|
|
||||||
throw e;
|
|
||||||
} catch (ExecutionException e) {
|
|
||||||
LOG.error("Got Exception From Thread While moving region " + e.getMessage(), e);
|
|
||||||
throw e;
|
|
||||||
} catch (CancellationException e) {
|
|
||||||
LOG.error("Thread for moving region cancelled. Timeout for cancellation:"
|
|
||||||
+ timeoutInSeconds + "secs", e);
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void unloadRegions(HBaseAdmin admin, String server, ArrayList<String> regionServers,
|
|
||||||
boolean ack, List<HRegionInfo> movedRegions) throws Exception {
|
|
||||||
List<HRegionInfo> regionsToMove = new ArrayList<HRegionInfo>();
|
|
||||||
regionsToMove = getRegions(this.conf, server);
|
|
||||||
if (regionsToMove.size() == 0) {
|
|
||||||
LOG.info("No Regions to move....Quitting now");
|
|
||||||
return;
|
|
||||||
} else if (regionServers.size() == 0) {
|
|
||||||
LOG.warn("No Regions were moved - no servers available");
|
|
||||||
throw new Exception("No online region servers");
|
|
||||||
}
|
|
||||||
while (true) {
|
|
||||||
regionsToMove = getRegions(this.conf, server);
|
|
||||||
regionsToMove.removeAll(movedRegions);
|
|
||||||
if (regionsToMove.size() == 0) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
int counter = 0;
|
|
||||||
LOG.info("Moving " + regionsToMove.size() + " regions from " + this.hostname + " to "
|
|
||||||
+ regionServers.size() + " servers using " + this.maxthreads + " threads .Ack Mode:"
|
|
||||||
+ ack);
|
|
||||||
ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads);
|
|
||||||
List<Future<Boolean>> taskList = new ArrayList<Future<Boolean>>();
|
|
||||||
int serverIndex = 0;
|
|
||||||
while (counter < regionsToMove.size()) {
|
|
||||||
if (ack) {
|
|
||||||
Future<Boolean> task =
|
|
||||||
moveRegionsPool.submit(new MoveWithAck(admin, regionsToMove.get(counter), server,
|
|
||||||
regionServers.get(serverIndex), movedRegions));
|
|
||||||
taskList.add(task);
|
|
||||||
} else {
|
|
||||||
Future<Boolean> task =
|
|
||||||
moveRegionsPool.submit(new MoveWithoutAck(admin, regionsToMove.get(counter), server,
|
|
||||||
regionServers.get(serverIndex), movedRegions));
|
|
||||||
taskList.add(task);
|
|
||||||
}
|
|
||||||
counter++;
|
|
||||||
serverIndex = (serverIndex + 1) % regionServers.size();
|
|
||||||
}
|
|
||||||
moveRegionsPool.shutdown();
|
|
||||||
long timeoutInSeconds =
|
|
||||||
regionsToMove.size()
|
|
||||||
* admin.getConfiguration().getInt(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
|
|
||||||
try {
|
|
||||||
if (!moveRegionsPool.awaitTermination(timeoutInSeconds, TimeUnit.SECONDS)) {
|
|
||||||
moveRegionsPool.shutdownNow();
|
|
||||||
}
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
moveRegionsPool.shutdownNow();
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
}
|
|
||||||
for (Future<Boolean> future : taskList) {
|
|
||||||
try {
|
|
||||||
// if even after shutdownNow threads are stuck we wait for 5 secs max
|
|
||||||
if (!future.get(5, TimeUnit.SECONDS)) {
|
|
||||||
LOG.error("Was Not able to move region....Exiting Now");
|
|
||||||
throw new Exception("Could not move region Exception");
|
|
||||||
}
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
LOG.error("Interrupted while waiting for Thread to Complete " + e.getMessage(), e);
|
|
||||||
throw e;
|
|
||||||
} catch (ExecutionException e) {
|
|
||||||
LOG.error("Got Exception From Thread While moving region " + e.getMessage(), e);
|
|
||||||
throw e;
|
|
||||||
} catch (CancellationException e) {
|
|
||||||
LOG.error("Thread for moving region cancelled. Timeout for cancellation:"
|
|
||||||
+ timeoutInSeconds + "secs", e);
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Move Regions and make sure that they are up on the target server.If a region movement fails we
|
|
||||||
* exit as failure
|
|
||||||
*/
|
|
||||||
private class MoveWithAck implements Callable<Boolean> {
|
|
||||||
private HBaseAdmin admin;
|
|
||||||
private HRegionInfo region;
|
|
||||||
private String targetServer;
|
|
||||||
private List<HRegionInfo> movedRegions;
|
|
||||||
private String sourceServer;
|
|
||||||
|
|
||||||
public MoveWithAck(HBaseAdmin admin, HRegionInfo regionInfo, String sourceServer,
|
|
||||||
String targetServer, List<HRegionInfo> movedRegions) {
|
|
||||||
this.admin = admin;
|
|
||||||
this.region = regionInfo;
|
|
||||||
this.targetServer = targetServer;
|
|
||||||
this.movedRegions = movedRegions;
|
|
||||||
this.sourceServer = sourceServer;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Boolean call() throws IOException, InterruptedException {
|
|
||||||
boolean moved = false;
|
|
||||||
int count = 0;
|
|
||||||
int retries = admin.getConfiguration().getInt(MOVE_RETRIES_MAX_KEY, DEFAULT_MOVE_RETRIES_MAX);
|
|
||||||
int maxWaitInSeconds =
|
|
||||||
admin.getConfiguration().getInt(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
|
|
||||||
long startTime = EnvironmentEdgeManager.currentTimeMillis();
|
|
||||||
boolean sameServer = true;
|
|
||||||
// Assert we can scan the region in its current location
|
|
||||||
isSuccessfulScan(admin, region);
|
|
||||||
LOG.info("Moving region:" + region.getEncodedName() + "from " + sourceServer + " to "
|
|
||||||
+ targetServer);
|
|
||||||
while (count < retries && sameServer) {
|
|
||||||
if (count > 0) {
|
|
||||||
LOG.info("Retry " + Integer.toString(count) + " of maximum " + Integer.toString(retries));
|
|
||||||
}
|
|
||||||
count = count + 1;
|
|
||||||
admin.move(region.getEncodedNameAsBytes(), Bytes.toBytes(targetServer));
|
|
||||||
long maxWait = startTime + (maxWaitInSeconds * 1000);
|
|
||||||
while (EnvironmentEdgeManager.currentTimeMillis() < maxWait) {
|
|
||||||
sameServer = isSameServer(admin, region, sourceServer);
|
|
||||||
if (!sameServer) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
Thread.sleep(100);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (sameServer) {
|
|
||||||
LOG.error("Region: " + region.getRegionNameAsString() + " stuck on " + this.sourceServer
|
|
||||||
+ ",newServer=" + this.targetServer);
|
|
||||||
} else {
|
|
||||||
isSuccessfulScan(admin, region);
|
|
||||||
LOG.info("Moved Region "
|
|
||||||
+ region.getRegionNameAsString()
|
|
||||||
+ " cost:"
|
|
||||||
+ String.format("%.3f",
|
|
||||||
(float) (EnvironmentEdgeManager.currentTimeMillis() - startTime) / 1000));
|
|
||||||
moved = true;
|
|
||||||
movedRegions.add(region);
|
|
||||||
}
|
|
||||||
return moved;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Move Regions without Acknowledging.Usefule in case of RS shutdown as we might want to shut the
|
|
||||||
* RS down anyways and not abort on a stuck region. Improves movement performance
|
|
||||||
*/
|
|
||||||
private class MoveWithoutAck implements Callable<Boolean> {
|
|
||||||
private HBaseAdmin admin;
|
|
||||||
private HRegionInfo region;
|
|
||||||
private String targetServer;
|
|
||||||
private List<HRegionInfo> movedRegions;
|
|
||||||
private String sourceServer;
|
|
||||||
|
|
||||||
public MoveWithoutAck(HBaseAdmin admin, HRegionInfo regionInfo, String sourceServer,
|
|
||||||
String targetServer, List<HRegionInfo> movedRegions) {
|
|
||||||
this.admin = admin;
|
|
||||||
this.region = regionInfo;
|
|
||||||
this.targetServer = targetServer;
|
|
||||||
this.movedRegions = movedRegions;
|
|
||||||
this.sourceServer = sourceServer;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Boolean call() {
|
|
||||||
try {
|
|
||||||
LOG.info("Moving region:" + region.getEncodedName() + "from " + sourceServer + " to "
|
|
||||||
+ targetServer);
|
|
||||||
admin.move(region.getEncodedNameAsBytes(), Bytes.toBytes(targetServer));
|
|
||||||
LOG.info("Moved " + region.getEncodedName() + " from " + sourceServer + " to "
|
|
||||||
+ targetServer);
|
|
||||||
} catch (Exception e) {
|
|
||||||
LOG.error("Error Moving Region:" + region.getEncodedName(), e);
|
|
||||||
} finally {
|
|
||||||
// we add region to the moved regions list in No Ack Mode since this is best effort
|
|
||||||
movedRegions.add(region);
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private List<HRegionInfo> readRegionsFromFile(String filename) throws IOException {
|
|
||||||
List<HRegionInfo> regions = new ArrayList<HRegionInfo>();
|
|
||||||
File f = new File(filename);
|
|
||||||
if (!f.exists()) {
|
|
||||||
return regions;
|
|
||||||
}
|
|
||||||
FileInputStream fis = null;
|
|
||||||
DataInputStream dis = null;
|
|
||||||
try {
|
|
||||||
fis = new FileInputStream(f);
|
|
||||||
dis = new DataInputStream(fis);
|
|
||||||
int numRegions = dis.readInt();
|
|
||||||
int index = 0;
|
|
||||||
while (index < numRegions) {
|
|
||||||
regions.add(HRegionInfo.parseFromOrNull(Bytes.readByteArray(dis)));
|
|
||||||
index++;
|
|
||||||
}
|
|
||||||
} catch (IOException e) {
|
|
||||||
LOG.error("Error while reading regions from file:" + filename, e);
|
|
||||||
throw e;
|
|
||||||
} finally {
|
|
||||||
if (dis != null) {
|
|
||||||
dis.close();
|
|
||||||
}
|
|
||||||
if (fis != null) {
|
|
||||||
fis.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return regions;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get online regions of the passed server
|
|
||||||
* @param conf
|
|
||||||
* @param server
|
|
||||||
* @return List of Regions online on the server
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
private List<HRegionInfo> getRegions(Configuration conf, String server) throws IOException {
|
|
||||||
HConnection conn = HConnectionManager.createConnection(conf);
|
|
||||||
List<HRegionInfo> regions =
|
|
||||||
ProtobufUtil.getOnlineRegions(conn.getAdmin(ServerName.valueOf(server)));
|
|
||||||
conn.close();
|
|
||||||
return regions;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Write the number of regions moved in the first line followed by regions moved in subsequent
|
|
||||||
* lines
|
|
||||||
* @param filename
|
|
||||||
* @param movedRegions
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
private void writeFile(String filename, List<HRegionInfo> movedRegions) throws IOException {
|
|
||||||
FileOutputStream fos = null;
|
|
||||||
DataOutputStream dos = null;
|
|
||||||
try {
|
|
||||||
fos = new FileOutputStream(filename);
|
|
||||||
dos = new DataOutputStream(fos);
|
|
||||||
dos.writeInt(movedRegions.size());
|
|
||||||
for (HRegionInfo region : movedRegions) {
|
|
||||||
Bytes.writeByteArray(dos, region.toByteArray());
|
|
||||||
}
|
|
||||||
} catch (IOException e) {
|
|
||||||
LOG.error("ERROR: Was Not able to write regions moved to output file but moved "
|
|
||||||
+ movedRegions.size() + " regions", e);
|
|
||||||
throw e;
|
|
||||||
} finally {
|
|
||||||
if (dos != null) {
|
|
||||||
dos.close();
|
|
||||||
}
|
|
||||||
if (fos != null) {
|
|
||||||
fos.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Excludes the servername whose hostname and port portion matches the list given in exclude file
|
|
||||||
* @param regionServers
|
|
||||||
* @param excludeFile
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
private void stripExcludes(ArrayList<String> regionServers, String excludeFile)
|
|
||||||
throws IOException {
|
|
||||||
if (excludeFile != null) {
|
|
||||||
ArrayList<String> excludes = readExcludes(excludeFile);
|
|
||||||
Iterator<String> i = regionServers.iterator();
|
|
||||||
while (i.hasNext()) {
|
|
||||||
String rs = i.next();
|
|
||||||
String rsPort =
|
|
||||||
rs.split(ServerName.SERVERNAME_SEPARATOR)[0] + ":"
|
|
||||||
+ rs.split(ServerName.SERVERNAME_SEPARATOR)[1];
|
|
||||||
if (excludes.contains(rsPort)) {
|
|
||||||
i.remove();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
LOG.info("Valid Region server targets are:" + regionServers.toString());
|
|
||||||
LOG.info("Excluded Servers are" + excludes.toString());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Create an Arraylst of servers listed in exclude file
|
|
||||||
* @param excludeFile
|
|
||||||
* @return ArrayList of servers to be excluded in format <hostname:port>
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
private ArrayList<String> readExcludes(String excludeFile) throws IOException {
|
|
||||||
ArrayList<String> excludeServers = new ArrayList<String>();
|
|
||||||
if (excludeFile == null) {
|
|
||||||
return excludeServers;
|
|
||||||
} else {
|
|
||||||
File f = new File(excludeFile);
|
|
||||||
String line;
|
|
||||||
BufferedReader br = null;
|
|
||||||
try {
|
|
||||||
br = new BufferedReader(new FileReader(f));
|
|
||||||
while ((line = br.readLine()) != null) {
|
|
||||||
line.trim();
|
|
||||||
if (!line.equals("")) {
|
|
||||||
excludeServers.add(line);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (IOException e) {
|
|
||||||
LOG.warn("Exception while reading excludes file,continuing anyways", e);
|
|
||||||
} finally {
|
|
||||||
if (br != null) {
|
|
||||||
br.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return excludeServers;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Remove the servername whose hostname and port portion matches from the passed array of servers.
|
|
||||||
* Returns as side-effect the servername removed.
|
|
||||||
* @param regionServers
|
|
||||||
* @param hostname
|
|
||||||
* @param port
|
|
||||||
* @return server removed from list of Region Servers
|
|
||||||
* @throws Exception
|
|
||||||
*/
|
|
||||||
private String stripServer(ArrayList<String> regionServers, String hostname, int port)
|
|
||||||
throws Exception {
|
|
||||||
String server = null;
|
|
||||||
String portString = Integer.toString(port);
|
|
||||||
Iterator<String> i = regionServers.iterator();
|
|
||||||
int noOfRs = regionServers.size();
|
|
||||||
while (i.hasNext()) {
|
|
||||||
server = i.next();
|
|
||||||
String[] splitServer = server.split(ServerName.SERVERNAME_SEPARATOR);
|
|
||||||
if (splitServer[0].equals(hostname) && splitServer[1].equals(portString)) {
|
|
||||||
i.remove();
|
|
||||||
return server;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (regionServers.size() >= noOfRs) {
|
|
||||||
throw new Exception("Server " + hostname + ":" + Integer.toString(port)
|
|
||||||
+ " is not in list of online servers(Offline/Incorrect)");
|
|
||||||
}
|
|
||||||
return server;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get Arraylist of Servers in the cluster
|
|
||||||
* @param admin
|
|
||||||
* @return ArrayList of online region servers
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
private ArrayList<String> getServers(HBaseAdmin admin) throws IOException {
|
|
||||||
ArrayList<ServerName> serverInfo =
|
|
||||||
new ArrayList<ServerName>(admin.getClusterStatus().getServers());
|
|
||||||
ArrayList<String> regionServers = new ArrayList<String>();
|
|
||||||
for (ServerName server : serverInfo) {
|
|
||||||
regionServers.add(server.getServerName());
|
|
||||||
}
|
|
||||||
return regionServers;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void deleteFile(String filename) {
|
|
||||||
File f = new File(filename);
|
|
||||||
if (f.exists()) {
|
|
||||||
f.delete();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Tries to scan a row from passed region
|
|
||||||
* @param admin
|
|
||||||
* @param region
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
private void isSuccessfulScan(HBaseAdmin admin, HRegionInfo region) throws IOException {
|
|
||||||
HTable table = null;
|
|
||||||
ResultScanner scanner = null;
|
|
||||||
Scan scan = new Scan(region.getStartKey());
|
|
||||||
scan.setBatch(1);
|
|
||||||
scan.setCaching(1);
|
|
||||||
scan.setFilter(new FirstKeyOnlyFilter());
|
|
||||||
try {
|
|
||||||
table = new HTable(admin.getConfiguration(), region.getTable());
|
|
||||||
scanner = table.getScanner(scan);
|
|
||||||
Result result = scanner.next();
|
|
||||||
} catch (IOException e) {
|
|
||||||
LOG.error("Could not scan region:" + region.getEncodedName(), e);
|
|
||||||
throw e;
|
|
||||||
} finally {
|
|
||||||
if (scanner != null) {
|
|
||||||
scanner.close();
|
|
||||||
}
|
|
||||||
if (table != null) {
|
|
||||||
table.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns true if passed region is still on serverName when we look at hbase:meta.
|
|
||||||
* @param admin
|
|
||||||
* @param region
|
|
||||||
* @param serverName
|
|
||||||
* @return true if region is hosted on serverName otherwise false
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
private boolean isSameServer(HBaseAdmin admin, HRegionInfo region, String serverName)
|
|
||||||
throws IOException {
|
|
||||||
String serverForRegion = getServerNameForRegion(admin, region);
|
|
||||||
if (serverForRegion != null && serverForRegion.equals(serverName)) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get servername that is up in hbase:meta hosting the given region. this is hostname + port +
|
|
||||||
* startcode comma-delimited. Can return null
|
|
||||||
* @param admin
|
|
||||||
* @param region
|
|
||||||
* @return regionServer hosting the given region
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
private String getServerNameForRegion(HBaseAdmin admin, HRegionInfo region) throws IOException {
|
|
||||||
String server = null;
|
|
||||||
if (!admin.isTableEnabled(region.getTable())) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
if (region.isMetaRegion()) {
|
|
||||||
ZooKeeperWatcher zkw = new ZooKeeperWatcher(admin.getConfiguration(), "region_mover", null);
|
|
||||||
MetaRegionTracker tracker = new MetaRegionTracker(zkw, new RegionMoverAbortable());
|
|
||||||
int maxWaitInSeconds =
|
|
||||||
admin.getConfiguration().getInt(MOVE_WAIT_MAX_KEY, DEFAULT_MOVE_WAIT_MAX);
|
|
||||||
tracker.start();
|
|
||||||
try {
|
|
||||||
server = tracker.waitMetaRegionLocation(maxWaitInSeconds * 1000).toString() + ",";
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
LOG.error("Interrupted while waiting for location of Meta", e);
|
|
||||||
} finally {
|
|
||||||
tracker.stop();
|
|
||||||
if (zkw != null) {
|
|
||||||
zkw.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
HTable table = null;
|
|
||||||
Result result = null;
|
|
||||||
try {
|
|
||||||
table = new HTable(admin.getConfiguration(), TableName.META_TABLE_NAME);
|
|
||||||
Get get = new Get(region.getRegionName());
|
|
||||||
get.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
|
|
||||||
get.addColumn(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER);
|
|
||||||
result = table.get(get);
|
|
||||||
if (result != null) {
|
|
||||||
byte[] servername =
|
|
||||||
result.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
|
|
||||||
byte[] startcode =
|
|
||||||
result.getValue(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER);
|
|
||||||
if (servername != null) {
|
|
||||||
server =
|
|
||||||
Bytes.toString(servername).replaceFirst(":", ",") + "," + Bytes.toLong(startcode);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (IOException e) {
|
|
||||||
LOG.error("Could not get Server Name for region:" + region.getEncodedName(), e);
|
|
||||||
throw e;
|
|
||||||
} finally {
|
|
||||||
if (table != null) {
|
|
||||||
table.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return server;
|
|
||||||
}
|
|
||||||
|
|
||||||
private class RegionMoverAbortable implements Abortable {
|
|
||||||
boolean isAborted = false;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void abort(String why, Throwable e) {
|
|
||||||
isAborted = true;
|
|
||||||
LOG.error("ABORTING Region_mover:" + why, e);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isAborted() {
|
|
||||||
return isAborted;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void addOptions() {
|
|
||||||
this.addRequiredOptWithArg("r", "regionserverhost", "region server <hostname>|<hostname:port>");
|
|
||||||
this.addRequiredOptWithArg("l", "Expected: load/unload");
|
|
||||||
this.addOptWithArg("m", "maxthreads",
|
|
||||||
"Define the maximum number of threads to use to unload and reload the regions");
|
|
||||||
this.addOptWithArg("x", "excludefile",
|
|
||||||
"File with <hostname:port> per line to exclude as unload targets; default excludes only "
|
|
||||||
+ "target host; useful for rack decommisioning.");
|
|
||||||
this.addOptWithArg("f", "filename",
|
|
||||||
"File to save regions list into unloading, or read from loading; "
|
|
||||||
+ "default /tmp/<usernamehostname:port>");
|
|
||||||
this.addOptNoArg("n", "noAck",
|
|
||||||
"Enable Ack mode(default: true) which checks if region is online on target RegionServer -- "
|
|
||||||
+ "Upon disabling,in case a region is stuck, it'll move on anyways");
|
|
||||||
this.addOptWithArg("t", "timeout", "timeout in seconds after which the tool will exit "
|
|
||||||
+ "irrespective of whether it finished or not;default Integer.MAX_VALUE");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void processOptions(CommandLine cmd) {
|
|
||||||
String hostname = cmd.getOptionValue("r");
|
|
||||||
rmbuilder = new RegionMoverBuilder(hostname);
|
|
||||||
if (cmd.hasOption('m')) {
|
|
||||||
rmbuilder.maxthreads(Integer.parseInt(cmd.getOptionValue('m')));
|
|
||||||
}
|
|
||||||
if (cmd.hasOption('n')) {
|
|
||||||
rmbuilder.ack(false);
|
|
||||||
}
|
|
||||||
if (cmd.hasOption('f')) {
|
|
||||||
rmbuilder.filename(cmd.getOptionValue('f'));
|
|
||||||
}
|
|
||||||
if (cmd.hasOption('x')) {
|
|
||||||
rmbuilder.excludeFile(cmd.getOptionValue('x'));
|
|
||||||
}
|
|
||||||
if (cmd.hasOption('t')) {
|
|
||||||
rmbuilder.timeout(Integer.parseInt(cmd.getOptionValue('t')));
|
|
||||||
}
|
|
||||||
this.loadUnload = cmd.getOptionValue("l").toLowerCase();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected int doWork() throws Exception {
|
|
||||||
RegionMover rm = rmbuilder.build();
|
|
||||||
if (loadUnload.equalsIgnoreCase("load")) {
|
|
||||||
rm.load();
|
|
||||||
} else if (loadUnload.equalsIgnoreCase("unload")) {
|
|
||||||
rm.unload();
|
|
||||||
} else {
|
|
||||||
printUsage();
|
|
||||||
System.exit(1);
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void main(String[] args) {
|
|
||||||
new RegionMover().doStaticMain(args);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,176 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
|
|
||||||
* agreements. See the NOTICE file distributed with this work for additional information regarding
|
|
||||||
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance with the License. You may obtain a
|
|
||||||
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
|
|
||||||
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
|
|
||||||
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
|
|
||||||
* for the specific language governing permissions and limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hbase.util;
|
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
|
|
||||||
import java.io.FileWriter;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
|
||||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
|
||||||
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
|
||||||
import org.apache.hadoop.hbase.TableName;
|
|
||||||
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
|
||||||
import org.apache.hadoop.hbase.util.RegionMover.RegionMoverBuilder;
|
|
||||||
import org.junit.AfterClass;
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.BeforeClass;
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.junit.experimental.categories.Category;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Tests for Region Mover Load/Unload functionality with and without ack mode and also to test
|
|
||||||
* exclude functionality useful for rack decommissioning
|
|
||||||
*/
|
|
||||||
@Category(MediumTests.class)
|
|
||||||
public class TestRegionMover {
|
|
||||||
|
|
||||||
final Log LOG = LogFactory.getLog(getClass());
|
|
||||||
protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
|
||||||
|
|
||||||
@BeforeClass
|
|
||||||
public static void setUpBeforeClass() throws Exception {
|
|
||||||
TEST_UTIL.startMiniCluster(3);
|
|
||||||
}
|
|
||||||
|
|
||||||
@AfterClass
|
|
||||||
public static void tearDownAfterClass() throws Exception {
|
|
||||||
TEST_UTIL.shutdownMiniCluster();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Before
|
|
||||||
public void setUp() throws Exception {
|
|
||||||
// Create a pre-split table just to populate some regions
|
|
||||||
TableName tableName = TableName.valueOf("testRegionMover");
|
|
||||||
if (TEST_UTIL.getHBaseAdmin().tableExists(tableName)) {
|
|
||||||
TEST_UTIL.deleteTable(tableName);
|
|
||||||
}
|
|
||||||
HBaseAdmin admin = null;
|
|
||||||
HTableDescriptor tableDesc = new HTableDescriptor(tableName);
|
|
||||||
try {
|
|
||||||
admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
|
|
||||||
admin.setBalancerRunning(false, true);
|
|
||||||
String startKey = "a";
|
|
||||||
String endKey = "z";
|
|
||||||
admin.createTable(tableDesc, startKey.getBytes(), endKey.getBytes(), 9);
|
|
||||||
} finally {
|
|
||||||
if (admin != null) {
|
|
||||||
admin.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testLoadWithAck() throws Exception {
|
|
||||||
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
|
|
||||||
HRegionServer regionServer = cluster.getRegionServer(0);
|
|
||||||
String rsName = regionServer.getServerName().getHostname();
|
|
||||||
int port = regionServer.getServerName().getPort();
|
|
||||||
int noRegions = regionServer.getNumberOfOnlineRegions();
|
|
||||||
String rs = rsName + ":" + Integer.toString(port);
|
|
||||||
RegionMoverBuilder rmBuilder = new RegionMoverBuilder(rs).ack(true).maxthreads(8);
|
|
||||||
RegionMover rm = rmBuilder.build();
|
|
||||||
rm.setConf(TEST_UTIL.getConfiguration());
|
|
||||||
LOG.info("Unloading " + rs);
|
|
||||||
rm.unload();
|
|
||||||
assertEquals(0, regionServer.getNumberOfOnlineRegions());
|
|
||||||
LOG.info("Successfully Unloaded\nNow Loading");
|
|
||||||
rm.load();
|
|
||||||
assertEquals(noRegions, regionServer.getNumberOfOnlineRegions());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testLoadWithoutAck() throws Exception {
|
|
||||||
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
|
|
||||||
HRegionServer regionServer = cluster.getRegionServer(0);
|
|
||||||
String rsName = regionServer.getServerName().getHostname();
|
|
||||||
int port = regionServer.getServerName().getPort();
|
|
||||||
int noRegions = regionServer.getNumberOfOnlineRegions();
|
|
||||||
String rs = rsName + ":" + Integer.toString(port);
|
|
||||||
RegionMoverBuilder rmBuilder = new RegionMoverBuilder(rs).ack(false);
|
|
||||||
RegionMover rm = rmBuilder.build();
|
|
||||||
rm.setConf(TEST_UTIL.getConfiguration());
|
|
||||||
LOG.info("Unloading " + rs);
|
|
||||||
rm.unload();
|
|
||||||
assertEquals(0, regionServer.getNumberOfOnlineRegions());
|
|
||||||
LOG.info("Successfully Unloaded\nNow Loading");
|
|
||||||
rm.load();
|
|
||||||
Thread.sleep(100);
|
|
||||||
assertEquals(noRegions, regionServer.getNumberOfOnlineRegions());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testUnloadWithoutAck() throws Exception {
|
|
||||||
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
|
|
||||||
HRegionServer regionServer = cluster.getRegionServer(0);
|
|
||||||
String rsName = regionServer.getServerName().getHostname();
|
|
||||||
int port = regionServer.getServerName().getPort();
|
|
||||||
String rs = rsName + ":" + Integer.toString(port);
|
|
||||||
RegionMoverBuilder rmBuilder = new RegionMoverBuilder(rs).ack(false);
|
|
||||||
RegionMover rm = rmBuilder.build();
|
|
||||||
rm.setConf(TEST_UTIL.getConfiguration());
|
|
||||||
LOG.info("Unloading " + rs);
|
|
||||||
rm.unload();
|
|
||||||
assertEquals(0, regionServer.getNumberOfOnlineRegions());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testUnloadWithAck() throws Exception {
|
|
||||||
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
|
|
||||||
HRegionServer regionServer = cluster.getRegionServer(0);
|
|
||||||
String rsName = regionServer.getServerName().getHostname();
|
|
||||||
int port = regionServer.getServerName().getPort();
|
|
||||||
String rs = rsName + ":" + Integer.toString(port);
|
|
||||||
RegionMoverBuilder rmBuilder = new RegionMoverBuilder(rs).ack(true);
|
|
||||||
RegionMover rm = rmBuilder.build();
|
|
||||||
rm.setConf(TEST_UTIL.getConfiguration());
|
|
||||||
rm.unload();
|
|
||||||
LOG.info("Unloading " + rs);
|
|
||||||
assertEquals(0, regionServer.getNumberOfOnlineRegions());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* To test that we successfully exclude a server from the unloading process We test for the number
|
|
||||||
* of regions on Excluded server and also test that regions are unloaded successfully
|
|
||||||
* @throws Exception
|
|
||||||
*/
|
|
||||||
@Test
|
|
||||||
public void testExclude() throws Exception {
|
|
||||||
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
|
|
||||||
FileWriter fos = new FileWriter("/tmp/exclude_file");
|
|
||||||
HRegionServer excludeServer = cluster.getRegionServer(1);
|
|
||||||
String excludeHostname = excludeServer.getServerName().getHostname();
|
|
||||||
int excludeServerPort = excludeServer.getServerName().getPort();
|
|
||||||
int regionsExcludeServer = excludeServer.getNumberOfOnlineRegions();
|
|
||||||
String excludeServerName = excludeHostname + ":" + Integer.toString(excludeServerPort);
|
|
||||||
fos.write(excludeServerName);
|
|
||||||
fos.close();
|
|
||||||
HRegionServer regionServer = cluster.getRegionServer(0);
|
|
||||||
String rsName = regionServer.getServerName().getHostname();
|
|
||||||
int port = regionServer.getServerName().getPort();
|
|
||||||
String rs = rsName + ":" + Integer.toString(port);
|
|
||||||
RegionMoverBuilder rmBuilder =
|
|
||||||
new RegionMoverBuilder(rs).ack(true).excludeFile("/tmp/exclude_file");
|
|
||||||
RegionMover rm = rmBuilder.build();
|
|
||||||
rm.setConf(TEST_UTIL.getConfiguration());
|
|
||||||
rm.unload();
|
|
||||||
LOG.info("Unloading " + rs);
|
|
||||||
assertEquals(0, regionServer.getNumberOfOnlineRegions());
|
|
||||||
assertEquals(regionsExcludeServer, cluster.getRegionServer(1).getNumberOfOnlineRegions());
|
|
||||||
LOG.info("Before:" + regionsExcludeServer + " After:"
|
|
||||||
+ cluster.getRegionServer(1).getNumberOfOnlineRegions());
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
Loading…
Reference in New Issue