diff --git a/bin/graceful_stop.sh b/bin/graceful_stop.sh index 37b5685085d..ed9af23130c 100755 --- a/bin/graceful_stop.sh +++ b/bin/graceful_stop.sh @@ -24,13 +24,14 @@ # Turn off the balancer before running this script. function usage { echo "Usage: graceful_stop.sh [--config ] [-d] [-e] [--restart [--reload]] [--thrift] [--rest] " - echo " thrift If we should stop/start thrift before/after the hbase stop/start" - echo " rest If we should stop/start rest before/after the hbase stop/start" - echo " restart If we should restart after graceful stop" - echo " reload Move offloaded regions back on to the restarted server" - echo " d|debug Print helpful debug information" - echo " hostname Hostname of server we are to stop" - echo " e|failfast Set -e so exit immediately if any command exits with non-zero status" + echo " thrift If we should stop/start thrift before/after the hbase stop/start" + echo " rest If we should stop/start rest before/after the hbase stop/start" + echo " restart If we should restart after graceful stop" + echo " reload Move offloaded regions back on to the restarted server" + echo " d|debug Print helpful debug information" + echo " maxthreads xx Limit the number of threads used by the region mover. Default value is 1." + echo " hostname Hostname of server we are to stop" + echo " e|failfast Set -e so exit immediately if any command exits with non-zero status" exit 1 } @@ -48,6 +49,7 @@ reload= debug= thrift= rest= +maxthreads=1 failfast= while [ $# -gt 0 ] do @@ -58,6 +60,7 @@ do --reload) reload=true; shift;; --failfast | -e) failfast=true; shift;; --debug | -d) debug="--debug"; shift;; + --maxthreads) shift; maxthreads=$1; shift;; --) shift; break;; -*) usage ;; *) break;; # terminate while loop @@ -88,7 +91,7 @@ HBASE_BALANCER_STATE=`echo 'balance_switch false' | "$bin"/hbase --config ${HBAS log "Previous balancer state was $HBASE_BALANCER_STATE" log "Unloading $hostname region(s)" -HBASE_NOEXEC=true "$bin"/hbase --config ${HBASE_CONF_DIR} org.jruby.Main "$bin"/region_mover.rb --file=$filename $debug unload $hostname +HBASE_NOEXEC=true "$bin"/hbase --config ${HBASE_CONF_DIR} org.jruby.Main "$bin"/region_mover.rb --file=$filename $debug --maxthreads=$maxthreads unload $hostname log "Unloaded $hostname region(s)" # Stop the server(s). Have to put hostname into its own little file for hbase-daemons.sh @@ -119,7 +122,7 @@ if [ "$restart" != "" ]; then fi if [ "$reload" != "" ]; then log "Reloading $hostname region(s)" - HBASE_NOEXEC=true "$bin"/hbase --config ${HBASE_CONF_DIR} org.jruby.Main "$bin"/region_mover.rb --file=$filename $debug load $hostname + HBASE_NOEXEC=true "$bin"/hbase --config ${HBASE_CONF_DIR} org.jruby.Main "$bin"/region_mover.rb --file=$filename $debug --maxthreads=$maxthreads load $hostname log "Reloaded $hostname region(s)" fi fi diff --git a/bin/region_mover.rb b/bin/region_mover.rb index 222d9a4b686..b4f17bcf368 100644 --- a/bin/region_mover.rb +++ b/bin/region_mover.rb @@ -23,6 +23,7 @@ # Does not work for case of multiple regionservers all running on the # one node. require 'optparse' +require File.join(File.dirname(__FILE__), 'thread-pool') include Java import org.apache.hadoop.hbase.HConstants import org.apache.hadoop.hbase.HBaseConfiguration @@ -161,6 +162,7 @@ def move(admin, r, newServer, original) # Now move it. Do it in a loop so can retry if fail. Have seen issue where # we tried move region but failed and retry put it back on old location; # retry in this case. + retries = admin.getConfiguration.getInt("hbase.move.retries.max", 5) count = 0 same = true @@ -346,20 +348,26 @@ def unloadRegions(options, hostname) break if rs.length == 0 count = 0 $LOG.info("Moving " + rs.length.to_s + " region(s) from " + servername + - " during this cycle"); - for r in rs - # Get a random server to move the region to. - server = servers[rand(servers.length)] - $LOG.info("Moving region " + r.getRegionNameAsString() + " (" + - count.to_s + " of " + rs.length.to_s + ") from server=" + - servername + " to server=" + server); - count = count + 1 - # Assert we can scan region in its current location - isSuccessfulScan(admin, r) - # Now move it. - move(admin, r, server, servername) - movedRegions.add(r) + " on " + servers.length.to_s + " servers using " + options[:maxthreads].to_s + " threads.") + counter = 0 + pool = ThreadPool.new(options[:maxthreads]) + server_index = 0 + while counter < rs.length do + pool.launch(rs,counter,server_index) do |_rs,_counter,_server_index| + $LOG.info("Moving region " + _rs[_counter].getEncodedName() + " (" + _counter.to_s + + " of " + _rs.length.to_s + ") to server=" + servers[_server_index] + " for " + servername) + # Assert we can scan region in its current location + isSuccessfulScan(admin, _rs[_counter]) + # Now move it. + move(admin, _rs[_counter], servers[_server_index], servername) + movedRegions.add(_rs[_counter]) + end + counter += 1 + server_index = (server_index + 1) % servers.length end + $LOG.info("Waiting for the pool to complete") + pool.stop + $LOG.info("Pool completed") end if movedRegions.size() > 0 # Write out file of regions moved @@ -395,7 +403,10 @@ def loadRegions(options, hostname) count = 0 # sleep 20s to make sure the rs finished initialization. sleep 20 - for r in regions + counter = 0 + pool = ThreadPool.new(options[:maxthreads]) + while counter < regions.length do + r = regions[counter] exists = false begin isSuccessfulScan(admin, r) @@ -403,19 +414,22 @@ def loadRegions(options, hostname) rescue org.apache.hadoop.hbase.NotServingRegionException => e $LOG.info("Failed scan of " + e.message) end - count = count + 1 next unless exists currentServer = getServerNameForRegion(admin, r) if currentServer and currentServer == servername $LOG.info("Region " + r.getRegionNameAsString() + " (" + count.to_s + - " of " + regions.length.to_s + ") already on target server=" + servername) + " of " + regions.length.to_s + ") already on target server=" + servername) next end - $LOG.info("Moving region " + r.getRegionNameAsString() + " (" + - count.to_s + " of " + regions.length.to_s + ") from server=" + - currentServer + " to server=" + servername); - move(admin, r, servername, currentServer) + pool.launch(r,currentServer,count) do |_r,_currentServer,_count| + $LOG.info("Moving region " + _r.getRegionNameAsString() + " (" + _count.to_s + + " of " + regions.length.to_s + ") from " + _currentServer + " to server=" + + servername); + move(admin, _r, servername, _currentServer) + end + counter = counter + 1 end + pool.stop end # Returns an array of hosts to exclude as region move targets @@ -456,6 +470,7 @@ optparse = OptionParser.new do |opts| opts.banner = "Usage: #{NAME}.rb [options] load|unload " opts.separator 'Load or unload regions by moving one at a time' options[:file] = nil + options[:maxthreads] = 1 opts.on('-f', '--filename=FILE', 'File to save regions list into unloading, or read from loading; default /tmp/') do |file| options[:file] = file end @@ -470,6 +485,9 @@ optparse = OptionParser.new do |opts| opts.on('-x', '--excludefile=FILE', 'File with hosts-per-line to exclude as unload targets; default excludes only target host; useful for rack decommisioning.') do |file| options[:excludesFile] = file end + opts.on('-m', '--maxthreads=XX', 'Define the maximum number of threads to use to unload and reload the regions') do |number| + options[:maxthreads] = number.to_i + end end optparse.parse! diff --git a/bin/rolling-restart.sh b/bin/rolling-restart.sh index 97843e8ed4b..1f17d7ed352 100755 --- a/bin/rolling-restart.sh +++ b/bin/rolling-restart.sh @@ -34,7 +34,7 @@ # # Modelled after $HADOOP_HOME/bin/slaves.sh. -usage="Usage: $0 [--config ] [--rs-only] [--master-only] [--graceful]" +usage="Usage: $0 [--config ] [--rs-only] [--master-only] [--graceful] [--maxthreads xx]" bin=`dirname "$0"` bin=`cd "$bin">/dev/null; pwd` @@ -57,23 +57,32 @@ function usage() { RR_RS=1 RR_MASTER=1 RR_GRACEFUL=0 +RR_MAXTHREADS=1 -for x in "$@" ; do - case "$x" in +while [ $# -gt 0 ]; do + case "$1" in --rs-only|-r) RR_RS=1 RR_MASTER=0 RR_GRACEFUL=0 + shift ;; --master-only) RR_RS=0 RR_MASTER=1 RR_GRACEFUL=0 + shift ;; --graceful) RR_RS=0 RR_MASTER=0 RR_GRACEFUL=1 + shift + ;; + --maxthreads) + shift + RR_MAXTHREADS=$1 + shift ;; *) echo Bad argument: $x @@ -85,7 +94,7 @@ done # quick function to get a value from the HBase config file # HBASE-6504 - only take the first line of the output in case verbose gc is on -distMode=`$bin/hbase org.apache.hadoop.hbase.util.HBaseConfTool hbase.cluster.distributed | head -n 1` +distMode=`HBASE_CONF_DIR=${HBASE_CONF_DIR} $bin/hbase org.apache.hadoop.hbase.util.HBaseConfTool hbase.cluster.distributed | head -n 1` if [ "$distMode" == 'false' ]; then if [ $RR_RS -ne 1 ] || [ $RR_MASTER -ne 1 ]; then echo Cant do selective rolling restart if not running distributed @@ -158,7 +167,7 @@ else rs_parts=(${rs//,/ }) hostname=${rs_parts[0]} echo "Gracefully restarting: $hostname" - "$bin"/graceful_stop.sh --config "${HBASE_CONF_DIR}" --restart --reload --debug "$hostname" + "$bin"/graceful_stop.sh --config "${HBASE_CONF_DIR}" --restart --reload --debug --maxthreads "${RR_MAXTHREADS}" "$hostname" sleep 1 done fi diff --git a/bin/thread-pool.rb b/bin/thread-pool.rb new file mode 100644 index 00000000000..68140124c8c --- /dev/null +++ b/bin/thread-pool.rb @@ -0,0 +1,50 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# File passed to org.jruby.Main by bin/hbase. Pollutes jirb with hbase imports +# and hbase commands and then loads jirb. Outputs a banner that tells user +# where to find help, shell version, and loads up a custom hirb. + +require 'thread' + +class ThreadPool + def initialize(poolsize) + @queue = Queue.new + @poolsize = poolsize + @pool = Array.new(@poolsize) do |i| + Thread.new do + Thread.current[:id] = i + catch(:close) do + loop do + job, args = @queue.pop + job.call(*args) + end + end + end + end + end + + def launch(*args, &block) + @queue << [block, args] + end + + def stop + @poolsize.times do + launch { throw :close } + end + @pool.map(&:join) + end +end