HBASE-14749 Make changes to region_mover.rb to use RegionMover Java tool (Abhishek Singh Chouhan)
This commit is contained in:
parent
cf8d3bd641
commit
91945d7f49
|
@ -21,13 +21,17 @@
|
|||
# Move regions off a server then stop it. Optionally restart and reload.
|
||||
# Turn off the balancer before running this script.
|
||||
function usage {
|
||||
echo "Usage: graceful_stop.sh [--config <conf-dir>] [-d] [-e] [--restart [--reload]] [--thrift] [--rest] <hostname>"
|
||||
echo "Usage: graceful_stop.sh [--config <conf-dir>] [-e] [--restart [--reload]] [--thrift] \
|
||||
[--rest] <hostname>"
|
||||
echo " thrift If we should stop/start thrift before/after the hbase stop/start"
|
||||
echo " rest If we should stop/start rest before/after the hbase stop/start"
|
||||
echo " restart If we should restart after graceful stop"
|
||||
echo " reload Move offloaded regions back on to the restarted server"
|
||||
echo " d|debug Print helpful debug information"
|
||||
echo " n|noack Enable noAck mode in RegionMover. This is a best effort mode for \
|
||||
moving regions"
|
||||
echo " maxthreads xx Limit the number of threads used by the region mover. Default value is 1."
|
||||
echo " movetimeout xx Timeout for moving regions. If regions are not moved by the timeout value,\
|
||||
exit with error. Default value is INT_MAX."
|
||||
echo " hostname Hostname of server we are to stop"
|
||||
echo " e|failfast Set -e so exit immediately if any command exits with non-zero status"
|
||||
exit 1
|
||||
|
@ -44,9 +48,10 @@ bin=`cd "$bin">/dev/null; pwd`
|
|||
# Get arguments
|
||||
restart=
|
||||
reload=
|
||||
debug=
|
||||
noack=
|
||||
thrift=
|
||||
rest=
|
||||
movetimeout=2147483647
|
||||
maxthreads=1
|
||||
failfast=
|
||||
while [ $# -gt 0 ]
|
||||
|
@ -57,8 +62,9 @@ do
|
|||
--restart) restart=true; shift;;
|
||||
--reload) reload=true; shift;;
|
||||
--failfast | -e) failfast=true; shift;;
|
||||
--debug | -d) debug="--debug"; shift;;
|
||||
--noack | -n) noack="--noack"; shift;;
|
||||
--maxthreads) shift; maxthreads=$1; shift;;
|
||||
--movetimeout) shift; movetimeout=$1; shift;;
|
||||
--) shift; break;;
|
||||
-*) usage ;;
|
||||
*) break;; # terminate while loop
|
||||
|
@ -96,7 +102,9 @@ HBASE_BALANCER_STATE=`echo 'balance_switch false' | "$bin"/hbase --config ${HBAS
|
|||
log "Previous balancer state was $HBASE_BALANCER_STATE"
|
||||
|
||||
log "Unloading $hostname region(s)"
|
||||
HBASE_NOEXEC=true "$bin"/hbase --config ${HBASE_CONF_DIR} org.jruby.Main "$bin"/region_mover.rb --file=$filename $debug --maxthreads=$maxthreads unload $hostname
|
||||
HBASE_NOEXEC=true "$bin"/hbase --config ${HBASE_CONF_DIR} org.apache.hadoop.hbase.util.RegionMover \
|
||||
--filename $filename --maxthreads $maxthreads $noack --operation "unload" --timeout $movetimeout \
|
||||
--regionserverhost $hostname
|
||||
log "Unloaded $hostname region(s)"
|
||||
|
||||
# Stop the server(s). Have to put hostname into its own little file for hbase-daemons.sh
|
||||
|
@ -150,7 +158,9 @@ if [ "$restart" != "" ]; then
|
|||
fi
|
||||
if [ "$reload" != "" ]; then
|
||||
log "Reloading $hostname region(s)"
|
||||
HBASE_NOEXEC=true "$bin"/hbase --config ${HBASE_CONF_DIR} org.jruby.Main "$bin"/region_mover.rb --file=$filename $debug --maxthreads=$maxthreads load $hostname
|
||||
HBASE_NOEXEC=true "$bin"/hbase --config ${HBASE_CONF_DIR} \
|
||||
org.apache.hadoop.hbase.util.RegionMover --filename $filename --maxthreads $maxthreads $noack \
|
||||
--operation "load" --timeout $movetimeout --regionserverhost $hostname
|
||||
log "Reloaded $hostname region(s)"
|
||||
fi
|
||||
fi
|
||||
|
|
|
@ -20,502 +20,5 @@
|
|||
# not move a new region until successful confirm of region loading in new
|
||||
# location. Presumes balancer is disabled when we run (not harmful if its
|
||||
# on but this script and balancer will end up fighting each other).
|
||||
require 'optparse'
|
||||
require File.join(File.dirname(__FILE__), 'thread-pool')
|
||||
include Java
|
||||
import org.apache.hadoop.hbase.HConstants
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration
|
||||
import org.apache.hadoop.hbase.client.HBaseAdmin
|
||||
import org.apache.hadoop.hbase.TableName
|
||||
import org.apache.hadoop.hbase.client.Get
|
||||
import org.apache.hadoop.hbase.client.Scan
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory
|
||||
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
|
||||
import org.apache.hadoop.hbase.filter.InclusiveStopFilter;
|
||||
import org.apache.hadoop.hbase.filter.FilterList;
|
||||
import org.apache.hadoop.hbase.util.Bytes
|
||||
import org.apache.hadoop.hbase.util.Writables
|
||||
import org.apache.hadoop.conf.Configuration
|
||||
import org.apache.commons.logging.Log
|
||||
import org.apache.commons.logging.LogFactory
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
|
||||
import org.apache.hadoop.hbase.ServerName
|
||||
import org.apache.hadoop.hbase.HRegionInfo
|
||||
|
||||
# Name of this script
|
||||
NAME = "region_mover"
|
||||
# Get configuration instance
|
||||
def getConfiguration()
|
||||
config = HBaseConfiguration.create()
|
||||
# No prefetching on hbase:meta This is for versions pre 0.99. Newer versions do not prefetch.
|
||||
config.setInt("hbase.client.prefetch.limit", 1)
|
||||
# Make a config that retries at short intervals many times
|
||||
config.setInt("hbase.client.pause", 500)
|
||||
config.setInt("hbase.client.retries.number", 100)
|
||||
return config
|
||||
end
|
||||
|
||||
$connection=ConnectionFactory.createConnection(getConfiguration())
|
||||
|
||||
# Returns true if passed region is still on 'original' when we look at hbase:meta.
|
||||
def isSameServer(admin, r, original)
|
||||
server = getServerNameForRegion(admin, r)
|
||||
return false unless server and original
|
||||
return server == original
|
||||
end
|
||||
|
||||
class RubyAbortable
|
||||
include org.apache.hadoop.hbase.Abortable
|
||||
def abort(why, e)
|
||||
puts "ABORTED! why=" + why + ", e=" + e.to_s
|
||||
end
|
||||
end
|
||||
|
||||
# Get servername that is up in hbase:meta; this is hostname + port + startcode comma-delimited.
|
||||
# Can return nil
|
||||
def getServerNameForRegion(admin, r)
|
||||
return nil unless admin.isTableEnabled(r.getTable())
|
||||
if r.isMetaRegion()
|
||||
# Hack
|
||||
zkw = org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher.new(admin.getConfiguration(), "region_mover", nil)
|
||||
mtl = org.apache.hadoop.hbase.zookeeper.MetaTableLocator.new()
|
||||
begin
|
||||
while not mtl.isLocationAvailable(zkw)
|
||||
sleep 0.1
|
||||
end
|
||||
# Make a fake servername by appending ','
|
||||
metaServer = mtl.getMetaRegionLocation(zkw).toString() + ","
|
||||
return metaServer
|
||||
ensure
|
||||
zkw.close()
|
||||
end
|
||||
end
|
||||
table = $connection.getTable(TableName.valueOf('hbase:meta'))
|
||||
begin
|
||||
g = Get.new(r.getRegionName())
|
||||
g.addColumn(HConstants::CATALOG_FAMILY, HConstants::SERVER_QUALIFIER)
|
||||
g.addColumn(HConstants::CATALOG_FAMILY, HConstants::STARTCODE_QUALIFIER)
|
||||
result = table.get(g)
|
||||
return nil unless result
|
||||
server = result.getValue(HConstants::CATALOG_FAMILY, HConstants::SERVER_QUALIFIER)
|
||||
startcode = result.getValue(HConstants::CATALOG_FAMILY, HConstants::STARTCODE_QUALIFIER)
|
||||
return nil unless server
|
||||
return java.lang.String.new(Bytes.toString(server)).replaceFirst(":", ",") + "," + Bytes.toLong(startcode).to_s
|
||||
ensure
|
||||
table.close()
|
||||
end
|
||||
end
|
||||
|
||||
# Trys to scan a row from passed region
|
||||
# Throws exception if can't
|
||||
def isSuccessfulScan(admin, r)
|
||||
scan = Scan.new(r.getStartKey(), r.getStartKey())
|
||||
scan.setBatch(1)
|
||||
scan.setCaching(1)
|
||||
scan.setFilter(FilterList.new(FirstKeyOnlyFilter.new(),InclusiveStopFilter.new(r.getStartKey())))
|
||||
begin
|
||||
table = $connection.getTable(r.getTable())
|
||||
scanner = table.getScanner(scan)
|
||||
begin
|
||||
results = scanner.next()
|
||||
# We might scan into next region, this might be an empty table.
|
||||
# But if no exception, presume scanning is working.
|
||||
rescue java.lang.NullPointerException => e
|
||||
$LOG.warn("Unable to scan region=" + r.getRegionNameAsString() +
|
||||
" start key is empty. " + e.message)
|
||||
ensure
|
||||
scanner.close()
|
||||
end
|
||||
rescue org.apache.hadoop.hbase.TableNotFoundException,
|
||||
org.apache.hadoop.hbase.TableNotEnabledException => e
|
||||
$LOG.warn("Region " + r.getEncodedName() + " belongs to recently " +
|
||||
"deleted/disabled table. Skipping... " + e.message)
|
||||
return
|
||||
ensure
|
||||
table.close() unless table.nil?
|
||||
end
|
||||
end
|
||||
|
||||
# Check region has moved successful and is indeed hosted on another server
|
||||
# Wait until that is the case.
|
||||
def move(admin, r, newServer, original)
|
||||
# Now move it. Do it in a loop so can retry if fail. Have seen issue where
|
||||
# we tried move region but failed and retry put it back on old location;
|
||||
# retry in this case.
|
||||
|
||||
retries = admin.getConfiguration.getInt("hbase.move.retries.max", 5)
|
||||
count = 0
|
||||
same = true
|
||||
start = Time.now
|
||||
while count < retries and same
|
||||
if count > 0
|
||||
$LOG.info("Retry " + count.to_s + " of maximum " + retries.to_s)
|
||||
end
|
||||
count = count + 1
|
||||
begin
|
||||
admin.move(Bytes.toBytes(r.getEncodedName()), Bytes.toBytes(newServer))
|
||||
rescue java.lang.reflect.UndeclaredThrowableException,
|
||||
org.apache.hadoop.hbase.UnknownRegionException => e
|
||||
$LOG.info("Exception moving " + r.getEncodedName() +
|
||||
"; split/moved? Continuing: " + e)
|
||||
return
|
||||
end
|
||||
# Wait till its up on new server before moving on
|
||||
maxWaitInSeconds = admin.getConfiguration.getInt("hbase.move.wait.max", 60)
|
||||
maxWait = Time.now + maxWaitInSeconds
|
||||
while Time.now < maxWait
|
||||
same = isSameServer(admin, r, original)
|
||||
break unless same
|
||||
sleep 0.1
|
||||
end
|
||||
end
|
||||
raise RuntimeError, "Region stuck on #{original}, newserver=#{newServer}" if same
|
||||
# Assert can Scan from new location.
|
||||
isSuccessfulScan(admin, r)
|
||||
$LOG.info("Moved region " + r.getRegionNameAsString() + " cost: " +
|
||||
java.lang.String.format("%.3f", (Time.now - start)))
|
||||
end
|
||||
|
||||
# Return the hostname:port out of a servername (all up to second ',')
|
||||
def getHostPortFromServerName(serverName)
|
||||
return serverName.split(',')[0..1]
|
||||
end
|
||||
|
||||
# Return array of servernames where servername is hostname+port+startcode
|
||||
# comma-delimited
|
||||
def getServers(admin)
|
||||
serverInfos = admin.getClusterStatus().getServers()
|
||||
servers = []
|
||||
for server in serverInfos
|
||||
servers << server.getServerName()
|
||||
end
|
||||
return servers
|
||||
end
|
||||
|
||||
# Get master hostname
|
||||
def getMaster(admin)
|
||||
return admin.getClusterStatus().getMaster().getHostname(),
|
||||
admin.getClusterStatus().getMaster().getPort()
|
||||
end
|
||||
|
||||
# Remove the servername whose hostname portion matches from the passed
|
||||
# array of servers. Returns as side-effect the servername removed.
|
||||
def stripServer(servers, hostname, port)
|
||||
upperCaseHostname = hostname.upcase;
|
||||
count = servers.length
|
||||
servername = nil
|
||||
for server in servers
|
||||
hostFromServerName, portFromServerName = getHostPortFromServerName(server)
|
||||
hostFromServerName = hostFromServerName.upcase
|
||||
if hostFromServerName == upperCaseHostname and portFromServerName == port
|
||||
servername = servers.delete(server)
|
||||
end
|
||||
end
|
||||
# Check server to exclude is actually present
|
||||
raise RuntimeError, "Server %s:%d not online" % [hostname, port] unless servers.length < count
|
||||
return servername
|
||||
end
|
||||
|
||||
# Removes master from servers list
|
||||
def stripMaster(servers, masterHostname, masterPort)
|
||||
for server in servers
|
||||
hostFromServerName, portFromServerName = getHostPortFromServerName(server)
|
||||
if hostFromServerName == masterHostname and portFromServerName == masterPort.to_s
|
||||
servers.delete(server)
|
||||
end
|
||||
end
|
||||
return servers
|
||||
end
|
||||
|
||||
|
||||
# Returns a new serverlist that excludes the servername whose hostname portion
|
||||
# matches from the passed array of servers.
|
||||
def stripExcludes(servers, excludefile)
|
||||
excludes = readExcludes(excludefile)
|
||||
updatedservers = []
|
||||
servers.each_with_index do |val,indx|
|
||||
if !excludes.to_a.include? val.split(",")[0].to_s
|
||||
updatedservers << val
|
||||
end
|
||||
end
|
||||
servers = updatedservers
|
||||
# return updated servers list
|
||||
return servers
|
||||
end
|
||||
|
||||
|
||||
# Return servername that matches passed hostname and port
|
||||
def getServerName(servers, hostname, port)
|
||||
servername = nil
|
||||
upperCaseHostname = hostname.upcase;
|
||||
for server in servers
|
||||
hostFromServerName, portFromServerName = getHostPortFromServerName(server)
|
||||
hostFromServerName = hostFromServerName.upcase
|
||||
if hostFromServerName == upperCaseHostname and portFromServerName == port
|
||||
servername = server
|
||||
break
|
||||
end
|
||||
end
|
||||
raise ArgumentError, "Server %s:%d not online" % [hostname, port] unless servername
|
||||
return servername
|
||||
end
|
||||
|
||||
# Create a logger and disable the DEBUG-level annoying client logging
|
||||
def configureLogging(options)
|
||||
apacheLogger = LogFactory.getLog(NAME)
|
||||
# Configure log4j to not spew so much
|
||||
unless (options[:debug])
|
||||
logger = org.apache.log4j.Logger.getLogger("org.apache.hadoop.hbase.client")
|
||||
logger.setLevel(org.apache.log4j.Level::INFO)
|
||||
end
|
||||
return apacheLogger
|
||||
end
|
||||
|
||||
# Now get list of regions on targetServer
|
||||
def getRegions(config, servername)
|
||||
connection = ConnectionFactory::createConnection(config);
|
||||
return ProtobufUtil::getOnlineRegions(connection.getAdmin(ServerName.valueOf(servername)));
|
||||
end
|
||||
|
||||
def deleteFile(filename)
|
||||
f = java.io.File.new(filename)
|
||||
f.delete() if f.exists()
|
||||
end
|
||||
|
||||
# Write HRegionInfo to file
|
||||
# Need to serialize in case non-printable characters.
|
||||
# Format is count of regionnames followed by serialized regionnames.
|
||||
def writeFile(filename, regions)
|
||||
fos = java.io.FileOutputStream.new(filename)
|
||||
dos = java.io.DataOutputStream.new(fos)
|
||||
# Write out a count of region names
|
||||
dos.writeInt(regions.size())
|
||||
# Write actual region names.
|
||||
for r in regions
|
||||
Bytes.writeByteArray(dos, r.toByteArray())
|
||||
end
|
||||
dos.close()
|
||||
end
|
||||
|
||||
# See writeFile above.
|
||||
# Returns array of HRegionInfos
|
||||
def readFile(filename)
|
||||
f = java.io.File.new(filename)
|
||||
return java.util.ArrayList.new() unless f.exists()
|
||||
fis = java.io.FileInputStream.new(f)
|
||||
dis = java.io.DataInputStream.new(fis)
|
||||
# Read count of regions
|
||||
count = dis.readInt()
|
||||
regions = java.util.ArrayList.new(count)
|
||||
index = 0
|
||||
while index < count
|
||||
regions.add(HRegionInfo.parseFromOrNull(Bytes.readByteArray(dis)))
|
||||
index = index + 1
|
||||
end
|
||||
dis.close()
|
||||
return regions
|
||||
end
|
||||
|
||||
# Move regions off the passed hostname:port
|
||||
def unloadRegions(options, hostname, port)
|
||||
# Clean up any old files.
|
||||
filename = getFilename(options, hostname, port)
|
||||
deleteFile(filename)
|
||||
# Get configuration
|
||||
config = getConfiguration()
|
||||
# Get an admin instance
|
||||
admin = HBaseAdmin.new(config)
|
||||
servers = getServers(admin)
|
||||
master, masterPort = getMaster(admin)
|
||||
# Remove the server we are unloading from from list of servers.
|
||||
# Side-effect is the servername that matches this hostname
|
||||
servername = stripServer(servers, hostname, port)
|
||||
# Remove the servers in our exclude list from list of servers.
|
||||
servers = stripExcludes(servers, options[:excludesFile])
|
||||
servers = stripMaster(servers, master, masterPort)
|
||||
puts "Valid region move targets: ", servers
|
||||
if servers.length == 0
|
||||
puts "No regions were moved - there was no server available"
|
||||
exit 4
|
||||
end
|
||||
movedRegions = java.util.Collections.synchronizedList(java.util.ArrayList.new())
|
||||
while true
|
||||
rs = getRegions(config, servername)
|
||||
# Remove those already tried to move
|
||||
rs.removeAll(movedRegions)
|
||||
break if rs.length == 0
|
||||
$LOG.info("Moving " + rs.length.to_s + " region(s) from " + servername +
|
||||
" on " + servers.length.to_s + " servers using " + options[:maxthreads].to_s + " threads.")
|
||||
counter = 0
|
||||
pool = ThreadPool.new(options[:maxthreads])
|
||||
server_index = 0
|
||||
while counter < rs.length do
|
||||
pool.launch(rs,counter,server_index) do |_rs,_counter,_server_index|
|
||||
$LOG.info("Moving region " + _rs[_counter].getEncodedName() + " (" + (_counter + 1).to_s +
|
||||
" of " + _rs.length.to_s + ") to server=" + servers[_server_index] + " for " + servername)
|
||||
# Assert we can scan region in its current location
|
||||
isSuccessfulScan(admin, _rs[_counter])
|
||||
# Now move it.
|
||||
move(admin, _rs[_counter], servers[_server_index], servername)
|
||||
movedRegions.add(_rs[_counter])
|
||||
end
|
||||
counter += 1
|
||||
server_index = (server_index + 1) % servers.length
|
||||
end
|
||||
$LOG.info("Waiting for the pool to complete")
|
||||
pool.stop
|
||||
$LOG.info("Pool completed")
|
||||
end
|
||||
if movedRegions.size() > 0
|
||||
# Write out file of regions moved
|
||||
writeFile(filename, movedRegions)
|
||||
$LOG.info("Wrote list of moved regions to " + filename)
|
||||
end
|
||||
end
|
||||
|
||||
# Move regions to the passed hostname
|
||||
def loadRegions(options, hostname, port)
|
||||
# Get configuration
|
||||
config = getConfiguration()
|
||||
# Get an admin instance
|
||||
admin = HBaseAdmin.new(config)
|
||||
filename = getFilename(options, hostname, port)
|
||||
regions = readFile(filename)
|
||||
return if regions.isEmpty()
|
||||
servername = nil
|
||||
# Wait till server is up
|
||||
maxWaitInSeconds = admin.getConfiguration.getInt("hbase.serverstart.wait.max", 180)
|
||||
maxWait = Time.now + maxWaitInSeconds
|
||||
while Time.now < maxWait
|
||||
servers = getServers(admin)
|
||||
begin
|
||||
servername = getServerName(servers, hostname, port)
|
||||
rescue ArgumentError => e
|
||||
$LOG.info("hostname=" + hostname.to_s + ":" + port.to_s + " is not up yet, waiting");
|
||||
end
|
||||
break if servername
|
||||
sleep 0.5
|
||||
end
|
||||
$LOG.info("Moving " + regions.size().to_s + " regions to " + servername)
|
||||
# sleep 20s to make sure the rs finished initialization.
|
||||
sleep 20
|
||||
counter = 0
|
||||
pool = ThreadPool.new(options[:maxthreads])
|
||||
while counter < regions.length do
|
||||
r = regions[counter]
|
||||
exists = false
|
||||
begin
|
||||
isSuccessfulScan(admin, r)
|
||||
exists = true
|
||||
rescue org.apache.hadoop.hbase.NotServingRegionException => e
|
||||
$LOG.info("Failed scan of " + e.message)
|
||||
end
|
||||
next unless exists
|
||||
currentServer = getServerNameForRegion(admin, r)
|
||||
if currentServer and currentServer == servername
|
||||
$LOG.info("Region " + r.getRegionNameAsString() + " (" + counter.to_s +
|
||||
" of " + regions.length.to_s + ") already on target server=" + servername)
|
||||
counter = counter + 1
|
||||
next
|
||||
end
|
||||
pool.launch(r,currentServer,counter) do |_r,_currentServer,_counter|
|
||||
$LOG.info("Moving region " + _r.getRegionNameAsString() + " (" + (_counter + 1).to_s +
|
||||
" of " + regions.length.to_s + ") from " + _currentServer.to_s + " to server=" +
|
||||
servername);
|
||||
move(admin, _r, servername, _currentServer)
|
||||
end
|
||||
counter = counter + 1
|
||||
end
|
||||
pool.stop
|
||||
end
|
||||
|
||||
# Returns an array of hosts to exclude as region move targets
|
||||
def readExcludes(filename)
|
||||
if filename == nil
|
||||
return java.util.ArrayList.new()
|
||||
end
|
||||
if ! File.exist?(filename)
|
||||
puts "Error: Unable to read host exclude file: ", filename
|
||||
raise RuntimeError
|
||||
end
|
||||
|
||||
f = File.new(filename, "r")
|
||||
# Read excluded hosts list
|
||||
excludes = java.util.ArrayList.new()
|
||||
while (line = f.gets)
|
||||
line.strip! # do an inplace drop of pre and post whitespaces
|
||||
excludes.add(line) unless line.empty? # exclude empty lines
|
||||
end
|
||||
puts "Excluding hosts as region move targets: ", excludes
|
||||
f.close
|
||||
|
||||
return excludes
|
||||
end
|
||||
|
||||
def getFilename(options, targetServer, port)
|
||||
filename = options[:file]
|
||||
if not filename
|
||||
filename = "/tmp/" + ENV['USER'] + targetServer + ":" + port
|
||||
end
|
||||
return filename
|
||||
end
|
||||
|
||||
|
||||
# Do command-line parsing
|
||||
options = {}
|
||||
optparse = OptionParser.new do |opts|
|
||||
opts.banner = "Usage: #{NAME}.rb [options] load|unload [<hostname>|<hostname:port>]"
|
||||
opts.separator 'Load or unload regions by moving one at a time'
|
||||
options[:file] = nil
|
||||
options[:maxthreads] = 1
|
||||
opts.on('-f', '--filename=FILE', 'File to save regions list into unloading, or read from loading; default /tmp/<hostname:port>') do |file|
|
||||
options[:file] = file
|
||||
end
|
||||
opts.on('-h', '--help', 'Display usage information') do
|
||||
puts opts
|
||||
exit
|
||||
end
|
||||
options[:debug] = false
|
||||
opts.on('-d', '--debug', 'Display extra debug logging') do
|
||||
options[:debug] = true
|
||||
end
|
||||
opts.on('-x', '--excludefile=FILE', 'File with hosts-per-line to exclude as unload targets; default excludes only target host; useful for rack decommisioning.') do |file|
|
||||
options[:excludesFile] = file
|
||||
end
|
||||
opts.on('-m', '--maxthreads=XX', 'Define the maximum number of threads to use to unload and reload the regions') do |number|
|
||||
options[:maxthreads] = number.to_i
|
||||
end
|
||||
end
|
||||
optparse.parse!
|
||||
|
||||
# Check ARGVs
|
||||
if ARGV.length < 2
|
||||
puts optparse
|
||||
exit 1
|
||||
end
|
||||
hostname, port = ARGV[1].split(":")
|
||||
if not hostname
|
||||
opts optparse
|
||||
exit 2
|
||||
end
|
||||
|
||||
# Get configuration
|
||||
config = getConfiguration()
|
||||
if not port
|
||||
port = config.getInt(HConstants::REGIONSERVER_PORT, HConstants::DEFAULT_REGIONSERVER_PORT)
|
||||
end
|
||||
port = port.to_s
|
||||
|
||||
# Create a logger and save it to ruby global
|
||||
$LOG = configureLogging(options)
|
||||
case ARGV[0]
|
||||
when 'load'
|
||||
loadRegions(options, hostname, port)
|
||||
when 'unload'
|
||||
unloadRegions(options, hostname, port)
|
||||
else
|
||||
puts optparse
|
||||
exit 3
|
||||
end
|
||||
|
||||
$connection.close()
|
||||
$BIN=File.dirname(__FILE__)
|
||||
exec "#{$BIN}/hbase org.apache.hadoop.hbase.util.RegionMover #{ARGV.join(' ')}"
|
||||
|
|
|
@ -32,7 +32,8 @@
|
|||
#
|
||||
# Modelled after $HADOOP_HOME/bin/slaves.sh.
|
||||
|
||||
usage_str="Usage: `basename $0` [--config <hbase-confdir>] [--rs-only] [--master-only] [--graceful] [--maxthreads xx]"
|
||||
usage_str="Usage: `basename $0` [--config <hbase-confdir>] [--rs-only] [--master-only]\
|
||||
[--graceful [--maxthreads xx] [--noack] [--movetimeout]]"
|
||||
|
||||
function usage() {
|
||||
echo "${usage_str}"
|
||||
|
@ -54,6 +55,8 @@ RR_RS=1
|
|||
RR_MASTER=1
|
||||
RR_GRACEFUL=0
|
||||
RR_MAXTHREADS=1
|
||||
RR_NOACK=
|
||||
RR_MOVE_TIMEOUT=2147483647
|
||||
|
||||
while [ $# -gt 0 ]; do
|
||||
case "$1" in
|
||||
|
@ -80,6 +83,15 @@ while [ $# -gt 0 ]; do
|
|||
RR_MAXTHREADS=$1
|
||||
shift
|
||||
;;
|
||||
--noack)
|
||||
RR_NOACK="--noack"
|
||||
shift
|
||||
;;
|
||||
--movetimeout)
|
||||
shift
|
||||
RR_MOVE_TIMEOUT=$1
|
||||
shift
|
||||
;;
|
||||
--help|-h)
|
||||
usage
|
||||
exit 0
|
||||
|
@ -186,7 +198,8 @@ else
|
|||
continue
|
||||
else
|
||||
echo "Gracefully restarting: $hostname"
|
||||
"$bin"/graceful_stop.sh --config "${HBASE_CONF_DIR}" --restart --reload --debug --maxthreads "${RR_MAXTHREADS}" "$hostname"
|
||||
"$bin"/graceful_stop.sh --config ${HBASE_CONF_DIR} --restart --reload --maxthreads \
|
||||
${RR_MAXTHREADS} ${RR_NOACK} --movetimeout ${RR_MOVE_TIMEOUT} $hostname
|
||||
sleep 1
|
||||
fi
|
||||
done
|
||||
|
|
|
@ -1,50 +0,0 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
# File passed to org.jruby.Main by bin/hbase. Pollutes jirb with hbase imports
|
||||
# and hbase commands and then loads jirb. Outputs a banner that tells user
|
||||
# where to find help, shell version, and loads up a custom hirb.
|
||||
|
||||
require 'thread'
|
||||
|
||||
class ThreadPool
|
||||
def initialize(poolsize)
|
||||
@queue = Queue.new
|
||||
@poolsize = poolsize
|
||||
@pool = Array.new(@poolsize) do |i|
|
||||
Thread.new do
|
||||
Thread.current[:id] = i
|
||||
catch(:close) do
|
||||
loop do
|
||||
job, args = @queue.pop
|
||||
job.call(*args)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def launch(*args, &block)
|
||||
@queue << [block, args]
|
||||
end
|
||||
|
||||
def stop
|
||||
@poolsize.times do
|
||||
launch { throw :close }
|
||||
end
|
||||
@pool.map(&:join)
|
||||
end
|
||||
end
|
|
@ -379,7 +379,7 @@ public class RegionMover extends AbstractHBaseTool {
|
|||
} catch (IOException e) {
|
||||
LOG.warn("Could not get list of region servers", e);
|
||||
} catch (Exception e) {
|
||||
LOG.info("hostname=" + hostname + " is not up yet, waiting", e);
|
||||
LOG.info("hostname=" + hostname + " is not up yet, waiting");
|
||||
}
|
||||
try {
|
||||
Thread.sleep(500);
|
||||
|
@ -392,7 +392,7 @@ public class RegionMover extends AbstractHBaseTool {
|
|||
LOG.error("Host:" + hostname + " is not up.Giving up.");
|
||||
throw new Exception("Host to load regions not online");
|
||||
}
|
||||
LOG.info("Moving" + regionsToMove.size() + " regions to " + server + " using "
|
||||
LOG.info("Moving " + regionsToMove.size() + " regions to " + server + " using "
|
||||
+ this.maxthreads + " threads.Ack mode:" + this.ack);
|
||||
ExecutorService moveRegionsPool = Executors.newFixedThreadPool(this.maxthreads);
|
||||
List<Future<Boolean>> taskList = new ArrayList<Future<Boolean>>();
|
||||
|
@ -559,7 +559,7 @@ public class RegionMover extends AbstractHBaseTool {
|
|||
boolean sameServer = true;
|
||||
// Assert we can scan the region in its current location
|
||||
isSuccessfulScan(admin, region);
|
||||
LOG.info("Moving region:" + region.getEncodedName() + "from " + sourceServer + " to "
|
||||
LOG.info("Moving region:" + region.getEncodedName() + " from " + sourceServer + " to "
|
||||
+ targetServer);
|
||||
while (count < retries && sameServer) {
|
||||
if (count > 0) {
|
||||
|
@ -616,7 +616,7 @@ public class RegionMover extends AbstractHBaseTool {
|
|||
@Override
|
||||
public Boolean call() {
|
||||
try {
|
||||
LOG.info("Moving region:" + region.getEncodedName() + "from " + sourceServer + " to "
|
||||
LOG.info("Moving region:" + region.getEncodedName() + " from " + sourceServer + " to "
|
||||
+ targetServer);
|
||||
admin.move(region.getEncodedNameAsBytes(), Bytes.toBytes(targetServer));
|
||||
LOG.info("Moved " + region.getEncodedName() + " from " + sourceServer + " to "
|
||||
|
@ -733,16 +733,21 @@ public class RegionMover extends AbstractHBaseTool {
|
|||
LOG.info("Excluded Servers are" + excludes.toString());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Exclude master from list of RSs to move regions to
|
||||
* @param regionServers
|
||||
* @param admin
|
||||
* @throws Exception
|
||||
* @throws IOException
|
||||
*/
|
||||
private void stripMaster(ArrayList<String> regionServers, Admin admin) throws Exception {
|
||||
stripServer(regionServers, admin.getClusterStatus().getMaster().getHostname(),
|
||||
admin.getClusterStatus().getMaster().getPort());
|
||||
private void stripMaster(ArrayList<String> regionServers, Admin admin) throws IOException {
|
||||
String masterHostname = admin.getClusterStatus().getMaster().getHostname();
|
||||
int masterPort = admin.getClusterStatus().getMaster().getPort();
|
||||
try {
|
||||
stripServer(regionServers, masterHostname, masterPort);
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Could not remove master from list of RS", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -931,7 +936,7 @@ public class RegionMover extends AbstractHBaseTool {
|
|||
@Override
|
||||
protected void addOptions() {
|
||||
this.addRequiredOptWithArg("r", "regionserverhost", "region server <hostname>|<hostname:port>");
|
||||
this.addRequiredOptWithArg("l", "Expected: load/unload");
|
||||
this.addRequiredOptWithArg("o", "operation", "Expected: load/unload");
|
||||
this.addOptWithArg("m", "maxthreads",
|
||||
"Define the maximum number of threads to use to unload and reload the regions");
|
||||
this.addOptWithArg("x", "excludefile",
|
||||
|
@ -940,9 +945,11 @@ public class RegionMover extends AbstractHBaseTool {
|
|||
this.addOptWithArg("f", "filename",
|
||||
"File to save regions list into unloading, or read from loading; "
|
||||
+ "default /tmp/<usernamehostname:port>");
|
||||
this.addOptNoArg("n", "noAck",
|
||||
"Enable Ack mode(default: true) which checks if region is online on target RegionServer -- "
|
||||
+ "Upon disabling,in case a region is stuck, it'll move on anyways");
|
||||
this.addOptNoArg("n", "noack",
|
||||
"Turn on No-Ack mode(default: false) which won't check if region is online on target "
|
||||
+ "RegionServer, hence best effort. This is more performant in unloading and loading "
|
||||
+ "but might lead to region being unavailable for some time till master reassigns it "
|
||||
+ "in case the move failed");
|
||||
this.addOptWithArg("t", "timeout", "timeout in seconds after which the tool will exit "
|
||||
+ "irrespective of whether it finished or not;default Integer.MAX_VALUE");
|
||||
}
|
||||
|
@ -966,21 +973,22 @@ public class RegionMover extends AbstractHBaseTool {
|
|||
if (cmd.hasOption('t')) {
|
||||
rmbuilder.timeout(Integer.parseInt(cmd.getOptionValue('t')));
|
||||
}
|
||||
this.loadUnload = cmd.getOptionValue("l").toLowerCase();
|
||||
this.loadUnload = cmd.getOptionValue("o").toLowerCase();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int doWork() throws Exception {
|
||||
boolean success;
|
||||
RegionMover rm = rmbuilder.build();
|
||||
if (loadUnload.equalsIgnoreCase("load")) {
|
||||
rm.load();
|
||||
success = rm.load();
|
||||
} else if (loadUnload.equalsIgnoreCase("unload")) {
|
||||
rm.unload();
|
||||
success = rm.unload();
|
||||
} else {
|
||||
printUsage();
|
||||
System.exit(1);
|
||||
success = false;
|
||||
}
|
||||
return 0;
|
||||
return (success ? 0 : 1);
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
|
Loading…
Reference in New Issue