HBASE-4368 Expose processlist in shell (per regionserver and perhaps by cluster) (Talat Uyarer)
This commit is contained in:
parent
1c30ae68ec
commit
2515b0974d
|
@ -98,6 +98,7 @@ end
|
|||
# Include classes definition
|
||||
require 'hbase/hbase'
|
||||
require 'hbase/admin'
|
||||
require 'hbase/taskmonitor'
|
||||
require 'hbase/table'
|
||||
require 'hbase/quotas'
|
||||
require 'hbase/replication_admin'
|
||||
|
|
|
@ -1007,6 +1007,47 @@ module Hbase
|
|||
@admin.listTableSnapshots(tableNameRegex, snapshotNameRegex).to_a
|
||||
end
|
||||
|
||||
#----------------------------------------------------------------------------------------------
|
||||
# Returns a list of regionservers
|
||||
def getRegionServers()
|
||||
return @admin.getClusterStatus.getServers.map { |serverName| serverName }
|
||||
end
|
||||
|
||||
#----------------------------------------------------------------------------------------------
|
||||
# Returns a list of servernames
|
||||
def getServerNames(servers)
|
||||
regionservers = getRegionServers()
|
||||
servernames = []
|
||||
|
||||
if servers.length == 0
|
||||
# if no servers were specified as arguments, get a list of all servers
|
||||
servernames = regionservers
|
||||
else
|
||||
# Strings replace with ServerName objects in servers array
|
||||
i = 0
|
||||
while (i < servers.length)
|
||||
server = servers[i]
|
||||
|
||||
if ServerName.isFullServerName(server)
|
||||
servernames.push(ServerName.valueOf(server))
|
||||
else
|
||||
name_list = server.split(",")
|
||||
j = 0
|
||||
while (j < regionservers.length)
|
||||
sn = regionservers[j]
|
||||
if name_list[0] == sn.hostname and (name_list[1] == nil ? true : (name_list[1] == sn.port.to_s) )
|
||||
servernames.push(sn)
|
||||
end
|
||||
j += 1
|
||||
end
|
||||
end
|
||||
i += 1
|
||||
end
|
||||
end
|
||||
|
||||
return servernames
|
||||
end
|
||||
|
||||
# Apply config specific to a table/column to its descriptor
|
||||
def set_descriptor_config(descriptor, config)
|
||||
raise(ArgumentError, "#{CONFIGURATION} must be a Hash type") unless config.kind_of?(Hash)
|
||||
|
|
|
@ -21,6 +21,7 @@ include Java
|
|||
|
||||
require 'hbase/admin'
|
||||
require 'hbase/table'
|
||||
require 'hbase/taskmonitor'
|
||||
require 'hbase/quotas'
|
||||
require 'hbase/security'
|
||||
require 'hbase/visibility_labels'
|
||||
|
@ -51,6 +52,10 @@ module Hbase
|
|||
::Hbase::RSGroupAdmin.new(@connection, formatter)
|
||||
end
|
||||
|
||||
def taskmonitor(formatter)
|
||||
::Hbase::TaskMonitor.new(configuration, formatter)
|
||||
end
|
||||
|
||||
# Create new one each time
|
||||
def table(table, shell)
|
||||
::Hbase::Table.new(@connection.getTable(table), shell)
|
||||
|
@ -71,7 +76,7 @@ module Hbase
|
|||
def quotas_admin(formatter)
|
||||
::Hbase::QuotasAdmin.new(@connection.getAdmin, formatter)
|
||||
end
|
||||
|
||||
|
||||
def shutdown
|
||||
@connection.close
|
||||
end
|
||||
|
|
|
@ -0,0 +1,205 @@
|
|||
#
|
||||
# Copyright 2010 The Apache Software Foundation
|
||||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
include Java
|
||||
|
||||
# Add the $HBASE_HOME/lib directory to the ruby load_path to load jackson
|
||||
if File.exists?(File.join(File.dirname(__FILE__), "..", "lib"))
|
||||
$LOAD_PATH.unshift File.join(File.dirname(__FILE__), "..", "lib")
|
||||
end
|
||||
|
||||
module Hbase
|
||||
class TaskMonitor
|
||||
include HBaseConstants
|
||||
|
||||
#---------------------------------------------------------------------------------------------
|
||||
# Represents information reported by a server on a single MonitoredTask
|
||||
class Task
|
||||
|
||||
def initialize(taskMap,host)
|
||||
|
||||
taskMap.each_pair do |k,v|
|
||||
case k
|
||||
when "statustimems"
|
||||
@statustime = Time.at(v/1000)
|
||||
when "status"
|
||||
@status = v
|
||||
when "starttimems"
|
||||
@starttime = Time.at(v/1000)
|
||||
when "description"
|
||||
@description = v
|
||||
when "state"
|
||||
@state = v
|
||||
end
|
||||
end
|
||||
|
||||
@host = host
|
||||
|
||||
end
|
||||
|
||||
def statustime
|
||||
# waiting IPC handlers often have statustime = -1, in this case return starttime
|
||||
if @statustime > Time.at(-1)
|
||||
return @statustime
|
||||
end
|
||||
return @starttime
|
||||
end
|
||||
|
||||
attr_reader :host
|
||||
attr_reader :status
|
||||
attr_reader :starttime
|
||||
attr_reader :description
|
||||
attr_reader :state
|
||||
|
||||
end
|
||||
|
||||
|
||||
def initialize(configuration, formatter)
|
||||
@conf = configuration
|
||||
@formatter = formatter
|
||||
@conn = org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(@conf)
|
||||
@admin = @conn.getAdmin()
|
||||
end
|
||||
|
||||
#---------------------------------------------------------------------------------------------------
|
||||
# Returns a filtered list of tasks on the given host
|
||||
def tasksOnHost(filter,host)
|
||||
|
||||
java_import 'java.net.URL'
|
||||
java_import 'org.codehaus.jackson.map.ObjectMapper'
|
||||
|
||||
infoport = @admin.getClusterStatus().getLoad(host).getInfoServerPort().to_s
|
||||
|
||||
# Note: This condition use constants from hbase-server
|
||||
#if (!@admin.getConfiguration().getBoolean(org.apache.hadoop.hbase.http.ServerConfigurationKeys::HBASE_SSL_ENABLED_KEY,
|
||||
# org.apache.hadoop.hbase.http.ServerConfigurationKeys::HBASE_SSL_ENABLED_DEFAULT))
|
||||
# schema = "http://"
|
||||
#else
|
||||
# schema = "https://"
|
||||
#end
|
||||
schema = "http://"
|
||||
url = schema + host.hostname + ":" + infoport + "/rs-status?format=json&filter=" + filter
|
||||
|
||||
json = URL.new(url)
|
||||
mapper = ObjectMapper.new
|
||||
|
||||
# read and parse JSON
|
||||
tasksArrayList = mapper.readValue(json,java.lang.Object.java_class)
|
||||
|
||||
# convert to an array of TaskMonitor::Task instances
|
||||
tasks = Array.new
|
||||
tasksArrayList.each do |t|
|
||||
tasks.unshift Task.new(t,host)
|
||||
end
|
||||
|
||||
return tasks
|
||||
|
||||
end
|
||||
|
||||
#---------------------------------------------------------------------------------------------------
|
||||
# Prints a table of filtered tasks on requested hosts
|
||||
def tasks(filter,hosts)
|
||||
|
||||
# put all tasks on all requested hosts in the same list
|
||||
tasks = []
|
||||
hosts.each do |host|
|
||||
tasks.concat(tasksOnHost(filter,host))
|
||||
end
|
||||
|
||||
puts("%d tasks as of: %s" % [tasks.size,Time.now.strftime("%Y-%m-%d %H:%M:%S")])
|
||||
|
||||
if tasks.size() == 0
|
||||
puts("No " + filter + " tasks currently running.")
|
||||
else
|
||||
|
||||
# determine table width
|
||||
longestStatusWidth = 0
|
||||
longestDescriptionWidth = 0
|
||||
tasks.each do |t|
|
||||
longestStatusWidth = [longestStatusWidth,t.status.length].max
|
||||
longestDescriptionWidth = [longestDescriptionWidth,t.description.length].max
|
||||
end
|
||||
|
||||
# set the maximum character width of each column, without padding
|
||||
hostWidth = 15
|
||||
startTimeWidth = 19
|
||||
stateWidth = 8
|
||||
descriptionWidth = [32,longestDescriptionWidth].min
|
||||
statusWidth = [36,longestStatusWidth + 27].min
|
||||
|
||||
rowSeparator = "+" + "-" * (hostWidth + 2) +
|
||||
"+" + "-" * (startTimeWidth + 2) +
|
||||
"+" + "-" * (stateWidth + 2) +
|
||||
"+" + "-" * (descriptionWidth + 2) +
|
||||
"+" + "-" * (statusWidth + 2) + "+"
|
||||
|
||||
# print table header
|
||||
cells = [setCellWidth("Host",hostWidth),
|
||||
setCellWidth("Start Time",startTimeWidth),
|
||||
setCellWidth("State",stateWidth),
|
||||
setCellWidth("Description",descriptionWidth),
|
||||
setCellWidth("Status",statusWidth)]
|
||||
|
||||
line = "| %s | %s | %s | %s | %s |" % cells
|
||||
|
||||
puts(rowSeparator)
|
||||
puts(line)
|
||||
|
||||
# print table content
|
||||
tasks.each do |t|
|
||||
|
||||
cells = [setCellWidth(t.host.hostname,hostWidth),
|
||||
setCellWidth(t.starttime.strftime("%Y-%m-%d %H:%M:%S"),startTimeWidth),
|
||||
setCellWidth(t.state,stateWidth),
|
||||
setCellWidth(t.description,descriptionWidth),
|
||||
setCellWidth("%s (since %d seconds ago)" %
|
||||
[t.status,Time.now - t.statustime], statusWidth)]
|
||||
|
||||
line = "| %s | %s | %s | %s | %s |" % cells
|
||||
|
||||
puts(rowSeparator)
|
||||
puts(line)
|
||||
|
||||
end
|
||||
puts(rowSeparator)
|
||||
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
#---------------------------------------------------------------------------------------------------
|
||||
#
|
||||
# Helper methods
|
||||
#
|
||||
|
||||
# right-pad with spaces or truncate with ellipses to match passed width
|
||||
def setCellWidth(cellContent,width)
|
||||
numCharsTooShort = width-cellContent.length
|
||||
if numCharsTooShort < 0
|
||||
# cellContent is too long, so truncate
|
||||
return cellContent[0,[width-3,0].max] + "." * [3,width].min
|
||||
else
|
||||
# cellContent is requested width or too short, so right-pad with zero or more spaces
|
||||
return cellContent + " " * numCharsTooShort
|
||||
end
|
||||
end
|
||||
|
||||
end
|
||||
end
|
|
@ -87,6 +87,10 @@ module Shell
|
|||
@hbase_admin ||= hbase.admin(formatter)
|
||||
end
|
||||
|
||||
def hbase_taskmonitor
|
||||
@hbase_taskmonitor ||= hbase.taskmonitor(formatter)
|
||||
end
|
||||
|
||||
def hbase_table(name)
|
||||
hbase.table(name, self)
|
||||
end
|
||||
|
@ -247,6 +251,7 @@ Shell.load_command_group(
|
|||
version
|
||||
table_help
|
||||
whoami
|
||||
processlist
|
||||
]
|
||||
)
|
||||
|
||||
|
|
|
@ -54,6 +54,10 @@ module Shell
|
|||
@shell.hbase_admin
|
||||
end
|
||||
|
||||
def taskmonitor
|
||||
@shell.hbase_taskmonitor
|
||||
end
|
||||
|
||||
def table(name)
|
||||
@shell.hbase_table(name)
|
||||
end
|
||||
|
|
|
@ -0,0 +1,65 @@
|
|||
#
|
||||
# Copyright 2010 The Apache Software Foundation
|
||||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
module Shell
|
||||
module Commands
|
||||
class Processlist < Command
|
||||
def help
|
||||
return <<-EOF
|
||||
Show regionserver task list.
|
||||
|
||||
hbase> processlist
|
||||
hbase> processlist 'all'
|
||||
hbase> processlist 'general'
|
||||
hbase> processlist 'handler'
|
||||
hbase> processlist 'rpc'
|
||||
hbase> processlist 'operation'
|
||||
hbase> processlist 'all','host187.example.com'
|
||||
hbase> processlist 'all','host187.example.com,16020'
|
||||
hbase> processlist 'all','host187.example.com,16020,1289493121758'
|
||||
|
||||
EOF
|
||||
end
|
||||
|
||||
def command(*args)
|
||||
|
||||
if ['all','general','handler','rpc','operation'].include? args[0]
|
||||
# if the first argument is a valid filter specifier, use it as such
|
||||
filter = args[0]
|
||||
hosts = args[1,args.length]
|
||||
else
|
||||
# otherwise, treat all arguments as host addresses by default
|
||||
filter = 'general'
|
||||
hosts = args
|
||||
end
|
||||
|
||||
hosts = admin.getServerNames(hosts)
|
||||
|
||||
if hosts == nil
|
||||
puts "No regionservers available."
|
||||
else
|
||||
taskmonitor.tasks(filter,hosts)
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
end
|
||||
end
|
||||
end
|
|
@ -43,8 +43,11 @@ public abstract class AbstractTestShell {
|
|||
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
|
||||
TEST_UTIL.getConfiguration().setBoolean(CoprocessorHost.ABORT_ON_ERROR_KEY, false);
|
||||
TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
|
||||
|
||||
//NOTE: Below Settings are disabled for taskmonitor_test
|
||||
TEST_UTIL.getConfiguration().setInt(HConstants.MASTER_INFO_PORT, -1);
|
||||
TEST_UTIL.getConfiguration().setInt(HConstants.REGIONSERVER_INFO_PORT, -1);
|
||||
TEST_UTIL.getConfiguration().setInt(HConstants.REGIONSERVER_INFO_PORT, 0);
|
||||
TEST_UTIL.getConfiguration().setBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO, true);
|
||||
// Security setup configuration
|
||||
SecureTestUtil.enableSecurity(TEST_UTIL.getConfiguration());
|
||||
VisibilityTestUtil.enableVisiblityLabels(TEST_UTIL.getConfiguration());
|
||||
|
|
|
@ -0,0 +1,38 @@
|
|||
#
|
||||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
require 'hbase'
|
||||
|
||||
module Hbase
|
||||
class TaskMonitorTest < Test::Unit::TestCase
|
||||
include TestHelpers
|
||||
def setup
|
||||
setup_hbase
|
||||
end
|
||||
|
||||
define_test "tasksOnHost should return tasks list" do
|
||||
filter = 'all'
|
||||
hosts = admin.getRegionServers()
|
||||
hosts.each do |host|
|
||||
tasks = taskmonitor.tasksOnHost(filter,host)
|
||||
assert(tasks.length > 0)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -60,6 +60,10 @@ module Hbase
|
|||
@shell.hbase_admin
|
||||
end
|
||||
|
||||
def taskmonitor
|
||||
@shell.hbase_taskmonitor
|
||||
end
|
||||
|
||||
def security_admin
|
||||
@shell.hbase_security_admin
|
||||
end
|
||||
|
|
Loading…
Reference in New Issue