HBASE-4368 Expose processlist in shell (per regionserver and perhaps by cluster) (Talat Uyarer)

This commit is contained in:
stack 2016-05-23 15:47:35 -07:00
parent 51dfe44174
commit 627b48b799
10 changed files with 372 additions and 1 deletions

View File

@ -97,6 +97,7 @@ end
# Include classes definition # Include classes definition
require 'hbase/hbase' require 'hbase/hbase'
require 'hbase/admin' require 'hbase/admin'
require 'hbase/taskmonitor'
require 'hbase/table' require 'hbase/table'
require 'hbase/quotas' require 'hbase/quotas'
require 'hbase/replication_admin' require 'hbase/replication_admin'

View File

@ -952,6 +952,47 @@ module Hbase
@admin.listTableSnapshots(tableNameRegex, snapshotNameRegex).to_a @admin.listTableSnapshots(tableNameRegex, snapshotNameRegex).to_a
end end
#----------------------------------------------------------------------------------------------
# Returns a list of regionservers
def getRegionServers()
return @admin.getClusterStatus.getServers.map { |serverName| serverName }
end
#----------------------------------------------------------------------------------------------
# Returns a list of servernames
def getServerNames(servers)
regionservers = getRegionServers()
servernames = []
if servers.length == 0
# if no servers were specified as arguments, get a list of all servers
servernames = regionservers
else
# Strings replace with ServerName objects in servers array
i = 0
while (i < servers.length)
server = servers[i]
if ServerName.isFullServerName(server)
servernames.push(ServerName.valueOf(server))
else
name_list = server.split(",")
j = 0
while (j < regionservers.length)
sn = regionservers[j]
if name_list[0] == sn.hostname and (name_list[1] == nil ? true : (name_list[1] == sn.port.to_s) )
servernames.push(sn)
end
j += 1
end
end
i += 1
end
end
return servernames
end
# Apply config specific to a table/column to its descriptor # Apply config specific to a table/column to its descriptor
def set_descriptor_config(descriptor, config) def set_descriptor_config(descriptor, config)
raise(ArgumentError, "#{CONFIGURATION} must be a Hash type") unless config.kind_of?(Hash) raise(ArgumentError, "#{CONFIGURATION} must be a Hash type") unless config.kind_of?(Hash)

View File

@ -21,6 +21,7 @@ include Java
require 'hbase/admin' require 'hbase/admin'
require 'hbase/table' require 'hbase/table'
require 'hbase/taskmonitor'
require 'hbase/quotas' require 'hbase/quotas'
require 'hbase/security' require 'hbase/security'
require 'hbase/visibility_labels' require 'hbase/visibility_labels'
@ -47,6 +48,10 @@ module Hbase
::Hbase::Admin.new(@connection.getAdmin, formatter) ::Hbase::Admin.new(@connection.getAdmin, formatter)
end end
def taskmonitor(formatter)
::Hbase::TaskMonitor.new(configuration, formatter)
end
# Create new one each time # Create new one each time
def table(table, shell) def table(table, shell)
::Hbase::Table.new(@connection.getTable(table), shell) ::Hbase::Table.new(@connection.getTable(table), shell)

View File

@ -0,0 +1,205 @@
#
# Copyright 2010 The Apache Software Foundation
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
include Java
# Add the $HBASE_HOME/lib directory to the ruby load_path to load jackson
if File.exists?(File.join(File.dirname(__FILE__), "..", "lib"))
$LOAD_PATH.unshift File.join(File.dirname(__FILE__), "..", "lib")
end
module Hbase
class TaskMonitor
include HBaseConstants
#---------------------------------------------------------------------------------------------
# Represents information reported by a server on a single MonitoredTask
class Task
def initialize(taskMap,host)
taskMap.each_pair do |k,v|
case k
when "statustimems"
@statustime = Time.at(v/1000)
when "status"
@status = v
when "starttimems"
@starttime = Time.at(v/1000)
when "description"
@description = v
when "state"
@state = v
end
end
@host = host
end
def statustime
# waiting IPC handlers often have statustime = -1, in this case return starttime
if @statustime > Time.at(-1)
return @statustime
end
return @starttime
end
attr_reader :host
attr_reader :status
attr_reader :starttime
attr_reader :description
attr_reader :state
end
def initialize(configuration, formatter)
@conf = configuration
@formatter = formatter
@conn = org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(@conf)
@admin = @conn.getAdmin()
end
#---------------------------------------------------------------------------------------------------
# Returns a filtered list of tasks on the given host
def tasksOnHost(filter,host)
java_import 'java.net.URL'
java_import 'org.codehaus.jackson.map.ObjectMapper'
infoport = @admin.getClusterStatus().getLoad(host).getInfoServerPort().to_s
# Note: This condition use constants from hbase-server
#if (!@admin.getConfiguration().getBoolean(org.apache.hadoop.hbase.http.ServerConfigurationKeys::HBASE_SSL_ENABLED_KEY,
# org.apache.hadoop.hbase.http.ServerConfigurationKeys::HBASE_SSL_ENABLED_DEFAULT))
# schema = "http://"
#else
# schema = "https://"
#end
schema = "http://"
url = schema + host.hostname + ":" + infoport + "/rs-status?format=json&filter=" + filter
json = URL.new(url)
mapper = ObjectMapper.new
# read and parse JSON
tasksArrayList = mapper.readValue(json,java.lang.Object.java_class)
# convert to an array of TaskMonitor::Task instances
tasks = Array.new
tasksArrayList.each do |t|
tasks.unshift Task.new(t,host)
end
return tasks
end
#---------------------------------------------------------------------------------------------------
# Prints a table of filtered tasks on requested hosts
def tasks(filter,hosts)
# put all tasks on all requested hosts in the same list
tasks = []
hosts.each do |host|
tasks.concat(tasksOnHost(filter,host))
end
puts("%d tasks as of: %s" % [tasks.size,Time.now.strftime("%Y-%m-%d %H:%M:%S")])
if tasks.size() == 0
puts("No " + filter + " tasks currently running.")
else
# determine table width
longestStatusWidth = 0
longestDescriptionWidth = 0
tasks.each do |t|
longestStatusWidth = [longestStatusWidth,t.status.length].max
longestDescriptionWidth = [longestDescriptionWidth,t.description.length].max
end
# set the maximum character width of each column, without padding
hostWidth = 15
startTimeWidth = 19
stateWidth = 8
descriptionWidth = [32,longestDescriptionWidth].min
statusWidth = [36,longestStatusWidth + 27].min
rowSeparator = "+" + "-" * (hostWidth + 2) +
"+" + "-" * (startTimeWidth + 2) +
"+" + "-" * (stateWidth + 2) +
"+" + "-" * (descriptionWidth + 2) +
"+" + "-" * (statusWidth + 2) + "+"
# print table header
cells = [setCellWidth("Host",hostWidth),
setCellWidth("Start Time",startTimeWidth),
setCellWidth("State",stateWidth),
setCellWidth("Description",descriptionWidth),
setCellWidth("Status",statusWidth)]
line = "| %s | %s | %s | %s | %s |" % cells
puts(rowSeparator)
puts(line)
# print table content
tasks.each do |t|
cells = [setCellWidth(t.host.hostname,hostWidth),
setCellWidth(t.starttime.strftime("%Y-%m-%d %H:%M:%S"),startTimeWidth),
setCellWidth(t.state,stateWidth),
setCellWidth(t.description,descriptionWidth),
setCellWidth("%s (since %d seconds ago)" %
[t.status,Time.now - t.statustime], statusWidth)]
line = "| %s | %s | %s | %s | %s |" % cells
puts(rowSeparator)
puts(line)
end
puts(rowSeparator)
end
end
#---------------------------------------------------------------------------------------------------
#
# Helper methods
#
# right-pad with spaces or truncate with ellipses to match passed width
def setCellWidth(cellContent,width)
numCharsTooShort = width-cellContent.length
if numCharsTooShort < 0
# cellContent is too long, so truncate
return cellContent[0,[width-3,0].max] + "." * [3,width].min
else
# cellContent is requested width or too short, so right-pad with zero or more spaces
return cellContent + " " * numCharsTooShort
end
end
end
end

View File

@ -87,6 +87,10 @@ module Shell
@hbase_admin ||= hbase.admin(formatter) @hbase_admin ||= hbase.admin(formatter)
end end
def hbase_taskmonitor
@hbase_taskmonitor ||= hbase.taskmonitor(formatter)
end
def hbase_table(name) def hbase_table(name)
hbase.table(name, self) hbase.table(name, self)
end end
@ -243,6 +247,7 @@ Shell.load_command_group(
version version
table_help table_help
whoami whoami
processlist
] ]
) )

View File

@ -54,6 +54,10 @@ module Shell
@shell.hbase_admin @shell.hbase_admin
end end
def taskmonitor
@shell.hbase_taskmonitor
end
def table(name) def table(name)
@shell.hbase_table(name) @shell.hbase_table(name)
end end

View File

@ -0,0 +1,65 @@
#
# Copyright 2010 The Apache Software Foundation
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
module Shell
module Commands
class Processlist < Command
def help
return <<-EOF
Show regionserver task list.
hbase> processlist
hbase> processlist 'all'
hbase> processlist 'general'
hbase> processlist 'handler'
hbase> processlist 'rpc'
hbase> processlist 'operation'
hbase> processlist 'all','host187.example.com'
hbase> processlist 'all','host187.example.com,16020'
hbase> processlist 'all','host187.example.com,16020,1289493121758'
EOF
end
def command(*args)
if ['all','general','handler','rpc','operation'].include? args[0]
# if the first argument is a valid filter specifier, use it as such
filter = args[0]
hosts = args[1,args.length]
else
# otherwise, treat all arguments as host addresses by default
filter = 'general'
hosts = args
end
hosts = admin.getServerNames(hosts)
if hosts == nil
puts "No regionservers available."
else
taskmonitor.tasks(filter,hosts)
end
end
end
end
end

View File

@ -43,8 +43,11 @@ public abstract class AbstractTestShell {
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6); TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
TEST_UTIL.getConfiguration().setBoolean(CoprocessorHost.ABORT_ON_ERROR_KEY, false); TEST_UTIL.getConfiguration().setBoolean(CoprocessorHost.ABORT_ON_ERROR_KEY, false);
TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3); TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
//NOTE: Below Settings are disabled for taskmonitor_test
TEST_UTIL.getConfiguration().setInt(HConstants.MASTER_INFO_PORT, -1); TEST_UTIL.getConfiguration().setInt(HConstants.MASTER_INFO_PORT, -1);
TEST_UTIL.getConfiguration().setInt(HConstants.REGIONSERVER_INFO_PORT, -1); TEST_UTIL.getConfiguration().setInt(HConstants.REGIONSERVER_INFO_PORT, 0);
TEST_UTIL.getConfiguration().setBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO, true);
// Security setup configuration // Security setup configuration
SecureTestUtil.enableSecurity(TEST_UTIL.getConfiguration()); SecureTestUtil.enableSecurity(TEST_UTIL.getConfiguration());
VisibilityTestUtil.enableVisiblityLabels(TEST_UTIL.getConfiguration()); VisibilityTestUtil.enableVisiblityLabels(TEST_UTIL.getConfiguration());

View File

@ -0,0 +1,38 @@
#
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
require 'hbase'
module Hbase
class TaskMonitorTest < Test::Unit::TestCase
include TestHelpers
def setup
setup_hbase
end
define_test "tasksOnHost should return tasks list" do
filter = 'all'
hosts = admin.getRegionServers()
hosts.each do |host|
tasks = taskmonitor.tasksOnHost(filter,host)
assert(tasks.length > 0)
end
end
end
end

View File

@ -60,6 +60,10 @@ module Hbase
@shell.hbase_admin @shell.hbase_admin
end end
def taskmonitor
@shell.hbase_taskmonitor
end
def security_admin def security_admin
@shell.hbase_security_admin @shell.hbase_security_admin
end end