HBASE-8803 region_mover.rb should move multiple regions at a time
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1572327 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ee48ff48a1
commit
e09d637d7a
|
@ -24,13 +24,14 @@
|
||||||
# Turn off the balancer before running this script.
|
# Turn off the balancer before running this script.
|
||||||
function usage {
|
function usage {
|
||||||
echo "Usage: graceful_stop.sh [--config <conf-dir>] [-d] [-e] [--restart [--reload]] [--thrift] [--rest] <hostname>"
|
echo "Usage: graceful_stop.sh [--config <conf-dir>] [-d] [-e] [--restart [--reload]] [--thrift] [--rest] <hostname>"
|
||||||
echo " thrift If we should stop/start thrift before/after the hbase stop/start"
|
echo " thrift If we should stop/start thrift before/after the hbase stop/start"
|
||||||
echo " rest If we should stop/start rest before/after the hbase stop/start"
|
echo " rest If we should stop/start rest before/after the hbase stop/start"
|
||||||
echo " restart If we should restart after graceful stop"
|
echo " restart If we should restart after graceful stop"
|
||||||
echo " reload Move offloaded regions back on to the restarted server"
|
echo " reload Move offloaded regions back on to the restarted server"
|
||||||
echo " d|debug Print helpful debug information"
|
echo " d|debug Print helpful debug information"
|
||||||
echo " hostname Hostname of server we are to stop"
|
echo " maxthreads xx Limit the number of threads used by the region mover. Default value is 1."
|
||||||
echo " e|failfast Set -e so exit immediately if any command exits with non-zero status"
|
echo " hostname Hostname of server we are to stop"
|
||||||
|
echo " e|failfast Set -e so exit immediately if any command exits with non-zero status"
|
||||||
exit 1
|
exit 1
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -48,6 +49,7 @@ reload=
|
||||||
debug=
|
debug=
|
||||||
thrift=
|
thrift=
|
||||||
rest=
|
rest=
|
||||||
|
maxthreads=1
|
||||||
failfast=
|
failfast=
|
||||||
while [ $# -gt 0 ]
|
while [ $# -gt 0 ]
|
||||||
do
|
do
|
||||||
|
@ -58,6 +60,7 @@ do
|
||||||
--reload) reload=true; shift;;
|
--reload) reload=true; shift;;
|
||||||
--failfast | -e) failfast=true; shift;;
|
--failfast | -e) failfast=true; shift;;
|
||||||
--debug | -d) debug="--debug"; shift;;
|
--debug | -d) debug="--debug"; shift;;
|
||||||
|
--maxthreads) shift; maxthreads=$1; shift;;
|
||||||
--) shift; break;;
|
--) shift; break;;
|
||||||
-*) usage ;;
|
-*) usage ;;
|
||||||
*) break;; # terminate while loop
|
*) break;; # terminate while loop
|
||||||
|
@ -88,7 +91,7 @@ HBASE_BALANCER_STATE=`echo 'balance_switch false' | "$bin"/hbase --config ${HBAS
|
||||||
log "Previous balancer state was $HBASE_BALANCER_STATE"
|
log "Previous balancer state was $HBASE_BALANCER_STATE"
|
||||||
|
|
||||||
log "Unloading $hostname region(s)"
|
log "Unloading $hostname region(s)"
|
||||||
HBASE_NOEXEC=true "$bin"/hbase --config ${HBASE_CONF_DIR} org.jruby.Main "$bin"/region_mover.rb --file=$filename $debug unload $hostname
|
HBASE_NOEXEC=true "$bin"/hbase --config ${HBASE_CONF_DIR} org.jruby.Main "$bin"/region_mover.rb --file=$filename $debug --maxthreads=$maxthreads unload $hostname
|
||||||
log "Unloaded $hostname region(s)"
|
log "Unloaded $hostname region(s)"
|
||||||
|
|
||||||
# Stop the server(s). Have to put hostname into its own little file for hbase-daemons.sh
|
# Stop the server(s). Have to put hostname into its own little file for hbase-daemons.sh
|
||||||
|
@ -119,7 +122,7 @@ if [ "$restart" != "" ]; then
|
||||||
fi
|
fi
|
||||||
if [ "$reload" != "" ]; then
|
if [ "$reload" != "" ]; then
|
||||||
log "Reloading $hostname region(s)"
|
log "Reloading $hostname region(s)"
|
||||||
HBASE_NOEXEC=true "$bin"/hbase --config ${HBASE_CONF_DIR} org.jruby.Main "$bin"/region_mover.rb --file=$filename $debug load $hostname
|
HBASE_NOEXEC=true "$bin"/hbase --config ${HBASE_CONF_DIR} org.jruby.Main "$bin"/region_mover.rb --file=$filename $debug --maxthreads=$maxthreads load $hostname
|
||||||
log "Reloaded $hostname region(s)"
|
log "Reloaded $hostname region(s)"
|
||||||
fi
|
fi
|
||||||
fi
|
fi
|
||||||
|
|
|
@ -23,6 +23,7 @@
|
||||||
# Does not work for case of multiple regionservers all running on the
|
# Does not work for case of multiple regionservers all running on the
|
||||||
# one node.
|
# one node.
|
||||||
require 'optparse'
|
require 'optparse'
|
||||||
|
require File.join(File.dirname(__FILE__), 'thread-pool')
|
||||||
include Java
|
include Java
|
||||||
import org.apache.hadoop.hbase.HConstants
|
import org.apache.hadoop.hbase.HConstants
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration
|
import org.apache.hadoop.hbase.HBaseConfiguration
|
||||||
|
@ -161,6 +162,7 @@ def move(admin, r, newServer, original)
|
||||||
# Now move it. Do it in a loop so can retry if fail. Have seen issue where
|
# Now move it. Do it in a loop so can retry if fail. Have seen issue where
|
||||||
# we tried move region but failed and retry put it back on old location;
|
# we tried move region but failed and retry put it back on old location;
|
||||||
# retry in this case.
|
# retry in this case.
|
||||||
|
|
||||||
retries = admin.getConfiguration.getInt("hbase.move.retries.max", 5)
|
retries = admin.getConfiguration.getInt("hbase.move.retries.max", 5)
|
||||||
count = 0
|
count = 0
|
||||||
same = true
|
same = true
|
||||||
|
@ -346,20 +348,26 @@ def unloadRegions(options, hostname)
|
||||||
break if rs.length == 0
|
break if rs.length == 0
|
||||||
count = 0
|
count = 0
|
||||||
$LOG.info("Moving " + rs.length.to_s + " region(s) from " + servername +
|
$LOG.info("Moving " + rs.length.to_s + " region(s) from " + servername +
|
||||||
" during this cycle");
|
" on " + servers.length.to_s + " servers using " + options[:maxthreads].to_s + " threads.")
|
||||||
for r in rs
|
counter = 0
|
||||||
# Get a random server to move the region to.
|
pool = ThreadPool.new(options[:maxthreads])
|
||||||
server = servers[rand(servers.length)]
|
server_index = 0
|
||||||
$LOG.info("Moving region " + r.getRegionNameAsString() + " (" +
|
while counter < rs.length do
|
||||||
count.to_s + " of " + rs.length.to_s + ") from server=" +
|
pool.launch(rs,counter,server_index) do |_rs,_counter,_server_index|
|
||||||
servername + " to server=" + server);
|
$LOG.info("Moving region " + _rs[_counter].getEncodedName() + " (" + _counter.to_s +
|
||||||
count = count + 1
|
" of " + _rs.length.to_s + ") to server=" + servers[_server_index] + " for " + servername)
|
||||||
# Assert we can scan region in its current location
|
# Assert we can scan region in its current location
|
||||||
isSuccessfulScan(admin, r)
|
isSuccessfulScan(admin, _rs[_counter])
|
||||||
# Now move it.
|
# Now move it.
|
||||||
move(admin, r, server, servername)
|
move(admin, _rs[_counter], servers[_server_index], servername)
|
||||||
movedRegions.add(r)
|
movedRegions.add(_rs[_counter])
|
||||||
|
end
|
||||||
|
counter += 1
|
||||||
|
server_index = (server_index + 1) % servers.length
|
||||||
end
|
end
|
||||||
|
$LOG.info("Waiting for the pool to complete")
|
||||||
|
pool.stop
|
||||||
|
$LOG.info("Pool completed")
|
||||||
end
|
end
|
||||||
if movedRegions.size() > 0
|
if movedRegions.size() > 0
|
||||||
# Write out file of regions moved
|
# Write out file of regions moved
|
||||||
|
@ -395,7 +403,10 @@ def loadRegions(options, hostname)
|
||||||
count = 0
|
count = 0
|
||||||
# sleep 20s to make sure the rs finished initialization.
|
# sleep 20s to make sure the rs finished initialization.
|
||||||
sleep 20
|
sleep 20
|
||||||
for r in regions
|
counter = 0
|
||||||
|
pool = ThreadPool.new(options[:maxthreads])
|
||||||
|
while counter < regions.length do
|
||||||
|
r = regions[counter]
|
||||||
exists = false
|
exists = false
|
||||||
begin
|
begin
|
||||||
isSuccessfulScan(admin, r)
|
isSuccessfulScan(admin, r)
|
||||||
|
@ -403,19 +414,22 @@ def loadRegions(options, hostname)
|
||||||
rescue org.apache.hadoop.hbase.NotServingRegionException => e
|
rescue org.apache.hadoop.hbase.NotServingRegionException => e
|
||||||
$LOG.info("Failed scan of " + e.message)
|
$LOG.info("Failed scan of " + e.message)
|
||||||
end
|
end
|
||||||
count = count + 1
|
|
||||||
next unless exists
|
next unless exists
|
||||||
currentServer = getServerNameForRegion(admin, r)
|
currentServer = getServerNameForRegion(admin, r)
|
||||||
if currentServer and currentServer == servername
|
if currentServer and currentServer == servername
|
||||||
$LOG.info("Region " + r.getRegionNameAsString() + " (" + count.to_s +
|
$LOG.info("Region " + r.getRegionNameAsString() + " (" + count.to_s +
|
||||||
" of " + regions.length.to_s + ") already on target server=" + servername)
|
" of " + regions.length.to_s + ") already on target server=" + servername)
|
||||||
next
|
next
|
||||||
end
|
end
|
||||||
$LOG.info("Moving region " + r.getRegionNameAsString() + " (" +
|
pool.launch(r,currentServer,count) do |_r,_currentServer,_count|
|
||||||
count.to_s + " of " + regions.length.to_s + ") from server=" +
|
$LOG.info("Moving region " + _r.getRegionNameAsString() + " (" + _count.to_s +
|
||||||
currentServer + " to server=" + servername);
|
" of " + regions.length.to_s + ") from " + _currentServer + " to server=" +
|
||||||
move(admin, r, servername, currentServer)
|
servername);
|
||||||
|
move(admin, _r, servername, _currentServer)
|
||||||
|
end
|
||||||
|
counter = counter + 1
|
||||||
end
|
end
|
||||||
|
pool.stop
|
||||||
end
|
end
|
||||||
|
|
||||||
# Returns an array of hosts to exclude as region move targets
|
# Returns an array of hosts to exclude as region move targets
|
||||||
|
@ -456,6 +470,7 @@ optparse = OptionParser.new do |opts|
|
||||||
opts.banner = "Usage: #{NAME}.rb [options] load|unload <hostname>"
|
opts.banner = "Usage: #{NAME}.rb [options] load|unload <hostname>"
|
||||||
opts.separator 'Load or unload regions by moving one at a time'
|
opts.separator 'Load or unload regions by moving one at a time'
|
||||||
options[:file] = nil
|
options[:file] = nil
|
||||||
|
options[:maxthreads] = 1
|
||||||
opts.on('-f', '--filename=FILE', 'File to save regions list into unloading, or read from loading; default /tmp/<hostname>') do |file|
|
opts.on('-f', '--filename=FILE', 'File to save regions list into unloading, or read from loading; default /tmp/<hostname>') do |file|
|
||||||
options[:file] = file
|
options[:file] = file
|
||||||
end
|
end
|
||||||
|
@ -470,6 +485,9 @@ optparse = OptionParser.new do |opts|
|
||||||
opts.on('-x', '--excludefile=FILE', 'File with hosts-per-line to exclude as unload targets; default excludes only target host; useful for rack decommisioning.') do |file|
|
opts.on('-x', '--excludefile=FILE', 'File with hosts-per-line to exclude as unload targets; default excludes only target host; useful for rack decommisioning.') do |file|
|
||||||
options[:excludesFile] = file
|
options[:excludesFile] = file
|
||||||
end
|
end
|
||||||
|
opts.on('-m', '--maxthreads=XX', 'Define the maximum number of threads to use to unload and reload the regions') do |number|
|
||||||
|
options[:maxthreads] = number.to_i
|
||||||
|
end
|
||||||
end
|
end
|
||||||
optparse.parse!
|
optparse.parse!
|
||||||
|
|
||||||
|
|
|
@ -34,7 +34,7 @@
|
||||||
#
|
#
|
||||||
# Modelled after $HADOOP_HOME/bin/slaves.sh.
|
# Modelled after $HADOOP_HOME/bin/slaves.sh.
|
||||||
|
|
||||||
usage="Usage: $0 [--config <hbase-confdir>] [--rs-only] [--master-only] [--graceful]"
|
usage="Usage: $0 [--config <hbase-confdir>] [--rs-only] [--master-only] [--graceful] [--maxthreads xx]"
|
||||||
|
|
||||||
bin=`dirname "$0"`
|
bin=`dirname "$0"`
|
||||||
bin=`cd "$bin">/dev/null; pwd`
|
bin=`cd "$bin">/dev/null; pwd`
|
||||||
|
@ -57,23 +57,32 @@ function usage() {
|
||||||
RR_RS=1
|
RR_RS=1
|
||||||
RR_MASTER=1
|
RR_MASTER=1
|
||||||
RR_GRACEFUL=0
|
RR_GRACEFUL=0
|
||||||
|
RR_MAXTHREADS=1
|
||||||
|
|
||||||
for x in "$@" ; do
|
while [ $# -gt 0 ]; do
|
||||||
case "$x" in
|
case "$1" in
|
||||||
--rs-only|-r)
|
--rs-only|-r)
|
||||||
RR_RS=1
|
RR_RS=1
|
||||||
RR_MASTER=0
|
RR_MASTER=0
|
||||||
RR_GRACEFUL=0
|
RR_GRACEFUL=0
|
||||||
|
shift
|
||||||
;;
|
;;
|
||||||
--master-only)
|
--master-only)
|
||||||
RR_RS=0
|
RR_RS=0
|
||||||
RR_MASTER=1
|
RR_MASTER=1
|
||||||
RR_GRACEFUL=0
|
RR_GRACEFUL=0
|
||||||
|
shift
|
||||||
;;
|
;;
|
||||||
--graceful)
|
--graceful)
|
||||||
RR_RS=0
|
RR_RS=0
|
||||||
RR_MASTER=0
|
RR_MASTER=0
|
||||||
RR_GRACEFUL=1
|
RR_GRACEFUL=1
|
||||||
|
shift
|
||||||
|
;;
|
||||||
|
--maxthreads)
|
||||||
|
shift
|
||||||
|
RR_MAXTHREADS=$1
|
||||||
|
shift
|
||||||
;;
|
;;
|
||||||
*)
|
*)
|
||||||
echo Bad argument: $x
|
echo Bad argument: $x
|
||||||
|
@ -85,7 +94,7 @@ done
|
||||||
|
|
||||||
# quick function to get a value from the HBase config file
|
# quick function to get a value from the HBase config file
|
||||||
# HBASE-6504 - only take the first line of the output in case verbose gc is on
|
# HBASE-6504 - only take the first line of the output in case verbose gc is on
|
||||||
distMode=`$bin/hbase org.apache.hadoop.hbase.util.HBaseConfTool hbase.cluster.distributed | head -n 1`
|
distMode=`HBASE_CONF_DIR=${HBASE_CONF_DIR} $bin/hbase org.apache.hadoop.hbase.util.HBaseConfTool hbase.cluster.distributed | head -n 1`
|
||||||
if [ "$distMode" == 'false' ]; then
|
if [ "$distMode" == 'false' ]; then
|
||||||
if [ $RR_RS -ne 1 ] || [ $RR_MASTER -ne 1 ]; then
|
if [ $RR_RS -ne 1 ] || [ $RR_MASTER -ne 1 ]; then
|
||||||
echo Cant do selective rolling restart if not running distributed
|
echo Cant do selective rolling restart if not running distributed
|
||||||
|
@ -158,7 +167,7 @@ else
|
||||||
rs_parts=(${rs//,/ })
|
rs_parts=(${rs//,/ })
|
||||||
hostname=${rs_parts[0]}
|
hostname=${rs_parts[0]}
|
||||||
echo "Gracefully restarting: $hostname"
|
echo "Gracefully restarting: $hostname"
|
||||||
"$bin"/graceful_stop.sh --config "${HBASE_CONF_DIR}" --restart --reload --debug "$hostname"
|
"$bin"/graceful_stop.sh --config "${HBASE_CONF_DIR}" --restart --reload --debug --maxthreads "${RR_MAXTHREADS}" "$hostname"
|
||||||
sleep 1
|
sleep 1
|
||||||
done
|
done
|
||||||
fi
|
fi
|
||||||
|
|
|
@ -0,0 +1,50 @@
|
||||||
|
# Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
# or more contributor license agreements. See the NOTICE file
|
||||||
|
# distributed with this work for additional information
|
||||||
|
# regarding copyright ownership. The ASF licenses this file
|
||||||
|
# to you under the Apache License, Version 2.0 (the
|
||||||
|
# "License"); you may not use this file except in compliance
|
||||||
|
# with the License. You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
#
|
||||||
|
# File passed to org.jruby.Main by bin/hbase. Pollutes jirb with hbase imports
|
||||||
|
# and hbase commands and then loads jirb. Outputs a banner that tells user
|
||||||
|
# where to find help, shell version, and loads up a custom hirb.
|
||||||
|
|
||||||
|
require 'thread'
|
||||||
|
|
||||||
|
class ThreadPool
|
||||||
|
def initialize(poolsize)
|
||||||
|
@queue = Queue.new
|
||||||
|
@poolsize = poolsize
|
||||||
|
@pool = Array.new(@poolsize) do |i|
|
||||||
|
Thread.new do
|
||||||
|
Thread.current[:id] = i
|
||||||
|
catch(:close) do
|
||||||
|
loop do
|
||||||
|
job, args = @queue.pop
|
||||||
|
job.call(*args)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def launch(*args, &block)
|
||||||
|
@queue << [block, args]
|
||||||
|
end
|
||||||
|
|
||||||
|
def stop
|
||||||
|
@poolsize.times do
|
||||||
|
launch { throw :close }
|
||||||
|
end
|
||||||
|
@pool.map(&:join)
|
||||||
|
end
|
||||||
|
end
|
Loading…
Reference in New Issue