#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Moves regions. Will confirm region access in the current location and will
# not move a new region until it has confirmed the region loaded in the new
# location. Presumes the balancer is disabled when we run (not harmful if it's
# on, but this script and the balancer will end up fighting each other).
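#
# A minimal example invocation (illustrative only: the install path, hostname,
# and port below are assumptions, not part of this script). The script is
# normally run through the hbase wrapper so that the JRuby runtime and the
# HBase classpath are already set up:
#
#   $ ./bin/hbase org.jruby.Main bin/region_mover.rb unload rs1.example.com:16020
#   $ ./bin/hbase org.jruby.Main bin/region_mover.rb load rs1.example.com:16020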
require 'optparse'
require File.join(File.dirname(__FILE__), 'thread-pool')
include Java
import org.apache.hadoop.hbase.HConstants
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.HConnectionManager
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter
import org.apache.hadoop.hbase.filter.InclusiveStopFilter
import org.apache.hadoop.hbase.filter.FilterList
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.util.Writables
import org.apache.hadoop.conf.Configuration
import org.apache.commons.logging.Log
import org.apache.commons.logging.LogFactory
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.ServerName
import org.apache.hadoop.hbase.HRegionInfo
import org.apache.hadoop.hbase.rsgroup.RSGroupAdmin
import org.apache.hadoop.hbase.rsgroup.RSGroupAdminClient
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.net.Address

# Name of this script
NAME = "region_mover"

# Returns true if passed region is still on 'original' when we look at hbase:meta.
def isSameServer(admin, r, original)
  server = getServerNameForRegion(admin, r)
  return false unless server and original
  return server == original
end

class RubyAbortable
  include org.apache.hadoop.hbase.Abortable
  def abort(why, e)
    puts "ABORTED! why=" + why + ", e=" + e.to_s
  end
end

# Get servername that is up in hbase:meta; this is hostname + port + startcode comma-delimited.
# Can return nil
def getServerNameForRegion(admin, r)
  return nil unless admin.isTableEnabled(r.getTableName)
  if r.isMetaRegion()
    # Hack: read the meta location from zookeeper
    zkw = org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher.new(admin.getConfiguration(), "region_mover", nil)
    mtl = org.apache.hadoop.hbase.zookeeper.MetaTableLocator.new()
    begin
      while not mtl.isLocationAvailable(zkw)
        sleep 0.1
      end
      metaServer = (mtl.getMetaRegionLocation(zkw).toString()).to_s
      return metaServer
    ensure
      zkw.close()
    end
  end
  table = HTable.new(admin.getConfiguration(), HConstants::META_TABLE_NAME)
  begin
    g = Get.new(r.getRegionName())
    g.addColumn(HConstants::CATALOG_FAMILY, HConstants::SERVER_QUALIFIER)
    g.addColumn(HConstants::CATALOG_FAMILY, HConstants::STARTCODE_QUALIFIER)
    result = table.get(g)
    return nil unless result
    server = result.getValue(HConstants::CATALOG_FAMILY, HConstants::SERVER_QUALIFIER)
    startcode = result.getValue(HConstants::CATALOG_FAMILY, HConstants::STARTCODE_QUALIFIER)
    return nil unless server
    return ServerName.getServerName(Bytes.toString(server), Bytes.toLong(startcode))
  ensure
    table.close()
  end
end

# Tries to scan a row from the passed region.
# Throws an exception if it can't.
def isSuccessfulScan(admin, r)
  scan = Scan.new(r.getStartKey(), r.getStartKey())
  scan.setBatch(1)
  scan.setCaching(1)
  scan.setFilter(FilterList.new(FirstKeyOnlyFilter.new(), InclusiveStopFilter.new(r.getStartKey())))
  begin
    table = HTable.new(admin.getConfiguration(), r.getTableName())
    scanner = table.getScanner(scan)
    begin
      results = scanner.next()
      # We might scan into next region, this might be an empty table.
      # But if no exception, presume scanning is working.
    ensure
      scanner.close()
    end
  rescue org.apache.hadoop.hbase.TableNotFoundException,
         org.apache.hadoop.hbase.TableNotEnabledException => e
    $LOG.warn("Region " + r.getEncodedName() + " belongs to recently " +
      "deleted/disabled table. Skipping... " + e.message)
    return
  ensure
    table.close() unless table.nil?
  end
end

# Check that the region has moved successfully and is indeed hosted on another server.
# Wait until that is the case.
def move(admin, r, newServer, original)
  # Now move it. Do it in a loop so we can retry on failure. We have seen a case where
  # the move failed and a retry put the region back on the old location; retry in this case.
  retries = admin.getConfiguration.getInt("hbase.move.retries.max", 5)
  count = 0
  same = true
  start = Time.now
  while count < retries and same
    if count > 0
      $LOG.info("Retry " + count.to_s + " of maximum " + retries.to_s)
    end
    count = count + 1
    begin
      admin.move(Bytes.toBytes(r.getEncodedName()), Bytes.toBytes(newServer))
    rescue java.lang.reflect.UndeclaredThrowableException,
           org.apache.hadoop.hbase.UnknownRegionException => e
      $LOG.info("Exception moving " + r.getEncodedName() + "; split/moved? Continuing: " + e.to_s)
      return
    end
    # Wait till it's up on the new server before moving on
    maxWaitInSeconds = admin.getConfiguration.getInt("hbase.move.wait.max", 60)
    maxWait = Time.now + maxWaitInSeconds
    while Time.now < maxWait
      same = isSameServer(admin, r, original)
      break unless same
      sleep 0.1
    end
  end
  raise RuntimeError, "Region " + r.getRegionNameAsString() + " stuck on #{original}, newserver=#{newServer}" if same
  # Assert we can scan from the new location.
  isSuccessfulScan(admin, r)
  $LOG.info("Moved region " + r.getRegionNameAsString() + " cost: " +
    java.lang.String.format("%.3f", (Time.now - start)))
end

# Return the hostname:port out of a servername (all up to second ',')
def getHostPortFromServerName(serverName)
  return serverName.split(',')[0..1]
end

# Return array of servernames where servername is hostname+port+startcode
# comma-delimited
def getServers(admin)
  serverInfos = admin.getClusterStatus().getServerInfo()
  servers = []
  for server in serverInfos
    servers << server.getServerName()
  end
  return servers
end

# Remove the servername whose hostname portion matches from the passed
# array of servers. Returns as side-effect the servername removed.
def stripServer(servers, hostname, port)
  upperCaseHostname = hostname.upcase
  count = servers.length
  servername = nil
  for server in servers
    hostFromServerName, portFromServerName = getHostPortFromServerName(server)
    hostFromServerName = hostFromServerName.upcase
    if hostFromServerName == upperCaseHostname and portFromServerName == port
      servername = servers.delete(server)
    end
  end
  # Check server to exclude is actually present
  raise RuntimeError, "Server %s:%d not online" % [hostname, port] unless servers.length < count
  return servername
end

# Returns a new serverlist that excludes the servername whose hostname portion
# matches from the passed array of servers.
def stripExcludes(servers, excludefile)
  excludes = readExcludes(excludefile)
  servers = servers.find_all { |server| !excludes.contains(getHostPortFromServerName(server).join(":")) }
  # return updated servers list
  return servers
end

# Return servername that matches passed hostname and port
def getServerName(servers, hostname, port)
  servername = nil
  upperCaseHostname = hostname.upcase
  for server in servers
    hostFromServerName, portFromServerName = getHostPortFromServerName(server)
    hostFromServerName = hostFromServerName.upcase
    if hostFromServerName == upperCaseHostname and portFromServerName == port
      servername = server
      break
    end
  end
  raise ArgumentError, "Server %s:%d not online" % [hostname, port] unless servername
  return servername
end

# Create a logger and disable the DEBUG-level annoying client logging
def configureLogging(options)
  apacheLogger = LogFactory.getLog(NAME)
  # Configure log4j to not spew so much
  unless (options[:debug])
    logger = org.apache.log4j.Logger.getLogger("org.apache.hadoop.hbase.client")
    logger.setLevel(org.apache.log4j.Level::INFO)
  end
  return apacheLogger
end

# Get configuration instance
def getConfiguration()
  config = HBaseConfiguration.create()
  # No prefetching on hbase:meta. This is for versions pre 0.99. Newer versions do not prefetch.
  config.setInt("hbase.client.prefetch.limit", 1)
  # Make a config that retries at short intervals many times
  config.setInt("hbase.client.pause", 500)
  config.setInt("hbase.client.retries.number", 100)
  return config
end

# Now get list of regions on targetServer
def getRegions(config, servername)
  connection = HConnectionManager::getConnection(config)
  return ProtobufUtil::getOnlineRegions(connection.getAdmin(ServerName.valueOf(servername)))
end

def deleteFile(filename)
  f = java.io.File.new(filename)
  f.delete() if f.exists()
end

# Write HRegionInfo to file
# Need to serialize in case non-printable characters.
# Format is count of regionnames followed by serialized regionnames.
def writeFile(filename, regions)
  fos = java.io.FileOutputStream.new(filename)
  dos = java.io.DataOutputStream.new(fos)
  # Write out a count of region names
  dos.writeInt(regions.size())
  # Write actual region names.
  for r in regions
    Bytes.writeByteArray(dos, r.toByteArray())
  end
  dos.close()
end

# See writeFile above.
# Returns array of HRegionInfos
def readFile(filename)
  f = java.io.File.new(filename)
  return java.util.ArrayList.new() unless f.exists()
  fis = java.io.FileInputStream.new(f)
  dis = java.io.DataInputStream.new(fis)
  # Read count of regions
  count = dis.readInt()
  regions = java.util.ArrayList.new(count)
  index = 0
  while index < count
    regions.add(HRegionInfo.parseFromOrNull(Bytes.readByteArray(dis)))
    index = index + 1
  end
  dis.close()
  return regions
end

# Move regions off the passed hostname:port
def unloadRegions(options, hostname, port)
  # Clean up any old files.
  filename = getFilename(options, hostname, port)
  deleteFile(filename)
  # Get configuration
  config = getConfiguration()
  # Get an admin instance
  admin = HBaseAdmin.new(config)
  servers = getServers(admin)
  # If rsgroup is enabled, restrict move targets to servers in the same rsgroup as the given server
  if isEnableRSGroup(admin)
    $LOG.info("RegionServer group is enabled.")
    begin
      conn = ConnectionFactory.createConnection(config)
      rsgroupAdmin = RSGroupAdminClient.new(conn)
      servers = getSameRSGroupServers(servers, rsgroupAdmin, hostname, port)
    ensure
      conn.close()
    end
  end
  # Remove the server we are unloading from the list of servers;
  # the return value is the servername that matches this hostname and port.
  servername = stripServer(servers, hostname, port)

  # Remove the servers in our exclude list from the list of servers.
  servers = stripExcludes(servers, options[:excludesFile])
  puts "Valid region move targets: ", servers
  if servers.length == 0
    puts "No regions were moved - there was no server available"
    exit 4
  end
  movedRegions = java.util.Collections.synchronizedList(java.util.ArrayList.new())
  while true
    rs = getRegions(config, servername)
    # Remove those we already tried to move
    rs.removeAll(movedRegions)
    break if rs.length == 0
    $LOG.info("Moving " + rs.length.to_s + " region(s) from " + servername +
      " on " + servers.length.to_s + " servers using " + options[:maxthreads].to_s + " threads.")
    counter = 0
    pool = ThreadPool.new(options[:maxthreads])
    server_index = 0
    while counter < rs.length do
      pool.launch(rs, counter, server_index) do |_rs, _counter, _server_index|
        $LOG.info("Moving region " + _rs[_counter].getEncodedName() + " (" + (_counter + 1).to_s +
          " of " + _rs.length.to_s + ") to server=" + servers[_server_index] + " for " + servername)
        # Assert we can scan the region in its current location
        isSuccessfulScan(admin, _rs[_counter])
        # Now move it.
        move(admin, _rs[_counter], servers[_server_index], servername)
        movedRegions.add(_rs[_counter])
      end
      counter += 1
      server_index = (server_index + 1) % servers.length
    end
    $LOG.info("Waiting for the pool to complete")
    pool.stop
    $LOG.info("Pool completed")
  end
  if movedRegions.size() > 0
    # Write out file of regions moved
    writeFile(filename, movedRegions)
    $LOG.info("Wrote list of moved regions to " + filename)
  end
end

# Move regions to the passed hostname
def loadRegions(options, hostname, port)
  # Get configuration
  config = getConfiguration()
  # Get an admin instance
  admin = HBaseAdmin.new(config)
  filename = getFilename(options, hostname, port)
  regions = readFile(filename)
  return if regions.isEmpty()
  servername = nil
  # Wait till server is up
  maxWaitInSeconds = admin.getConfiguration.getInt("hbase.serverstart.wait.max", 180)
  maxWait = Time.now + maxWaitInSeconds
  while Time.now < maxWait
    servers = getServers(admin)
    begin
      servername = getServerName(servers, hostname, port)
    rescue ArgumentError => e
      $LOG.info("hostname=" + hostname.to_s + ":" + port.to_s + " is not up yet, waiting")
    end
    break if servername
    sleep 0.5
  end
  # Fail with a clear message if the target server never came up within the wait window.
  raise RuntimeError, "Server %s:%s not online after %d seconds" % [hostname, port, maxWaitInSeconds] unless servername
  $LOG.info("Moving " + regions.size().to_s + " regions to " + servername)
  # Sleep 20s to make sure the regionserver has finished initialization.
  sleep 20
  counter = 0
  pool = ThreadPool.new(options[:maxthreads])
  while counter < regions.length do
    r = regions[counter]
    exists = false
    begin
      isSuccessfulScan(admin, r)
      exists = true
    rescue org.apache.hadoop.hbase.NotServingRegionException => e
      $LOG.info("Failed scan of " + e.message)
    end
    unless exists
      # Skip regions that can no longer be scanned; advance the counter so we do not loop forever.
      counter = counter + 1
      next
    end
    currentServer = getServerNameForRegion(admin, r)
    if currentServer and servername and currentServer == servername.to_s
      $LOG.info("Region " + r.getRegionNameAsString() + " (" + (counter + 1).to_s +
        " of " + regions.length.to_s + ") already on target server=" + servername)
      counter = counter + 1
      next
    end
    pool.launch(r, currentServer, counter) do |_r, _currentServer, _counter|
      $LOG.info("Moving region " + _r.getRegionNameAsString() + " (" + (_counter + 1).to_s +
        " of " + regions.length.to_s + ") from " + _currentServer.to_s + " to server=" + servername)
      move(admin, _r, servername, _currentServer)
    end
    counter = counter + 1
  end
  pool.stop
end

# Returns an array of hosts to exclude as region move targets.
# Each line of the exclude file should be a hostname:port, matching the format
# produced by getHostPortFromServerName(...).join(":").
def readExcludes(filename)
  if filename == nil
    return java.util.ArrayList.new()
  end
  if ! File.exist?(filename)
    puts "Error: Unable to read host exclude file: ", filename
    raise RuntimeError
  end
  f = File.new(filename, "r")
  # Read excluded hosts list
  excludes = java.util.ArrayList.new()
  while (line = f.gets)
    line.strip! # do an inplace drop of pre and post whitespace
    excludes.add(line) unless line.empty? # exclude empty lines
  end
  puts "Excluding hosts as region move targets: ", excludes
  f.close
  return excludes
end

# Return the filename used to persist the list of regions moved off the given server
def getFilename(options, targetServer, port)
  filename = options[:file]
  if not filename
    filename = File.join(Dir.tmpdir(), ENV['USER'] + targetServer + "_" + port.to_s)
  end
  return filename
end

# Get servers in the same regionserver group as the given server
def getSameRSGroupServers(servers, rsgroupAdmin, hostname, port)
  results = []
  rsgroup = rsgroupAdmin.getRSGroupOfServer(Address.fromParts(hostname, java.lang.Integer.parseInt(port)))
  # rsgroup is 'default' or another group; it can't be nil
  $LOG.info("Getting servers list from group: " + rsgroup.getName())
  rsservers = rsgroup.getServers()
  servers.each do |server|
    servername = ServerName.parseServerName(server)
    tmp = Address.fromParts(servername.getHostname(), servername.getPort())
    if rsservers.contains(tmp)
      results << servername.getServerName()
    end
  end
  return results
end

# Determine whether rsgroup has been enabled
def isEnableRSGroup(admin)
  coprocessors = java.util.Arrays.asList(admin.getMasterCoprocessors())
  return coprocessors.contains("RSGroupAdminEndpoint")
end

# Do command-line parsing
options = {}
optparse = OptionParser.new do |opts|
  opts.banner = "Usage: #{NAME}.rb [options] load|unload <hostname>[:<port>]"
  opts.separator 'Load or unload regions by moving one at a time'
  options[:file] = nil
  options[:maxthreads] = 1
  opts.on('-f', '--filename=FILE',
      'File to save regions list into unloading, or read from loading; default /USER_TMP_DIR/') do |file|
    options[:file] = file
  end
  opts.on('-h', '--help', 'Display usage information') do
    puts opts
    exit
  end
  options[:debug] = false
  opts.on('-d', '--debug', 'Display extra debug logging') do
    options[:debug] = true
  end
  opts.on('-x', '--excludefile=FILE',
      'File with hosts-per-line to exclude as unload targets; default excludes only target host; useful for rack decommissioning.') do |file|
    options[:excludesFile] = file
  end
  opts.on('-m', '--maxthreads=XX',
      'Define the maximum number of threads to use to unload and reload the regions') do |number|
    options[:maxthreads] = number.to_i
  end
end
optparse.parse!

# Check ARGVs
if ARGV.length < 2
  puts optparse
  exit 1
end
hostname, port = ARGV[1].split(":")
if not hostname
  puts optparse
  exit 2
end
# Get configuration
config = getConfiguration()
if not port
  port = config.getInt(HConstants::REGIONSERVER_PORT, HConstants::DEFAULT_REGIONSERVER_PORT)
end
port = port.to_s

# Create a logger and save it to ruby global
$LOG = configureLogging(options)
case ARGV[0]
  when 'load'
    loadRegions(options, hostname, port)
  when 'unload'
    unloadRegions(options, hostname, port)
  else
    puts optparse
    exit 3
end