HBASE-17973 Expand list_regions to filter on data locality

This commit is contained in:
Josh Elser 2017-04-28 12:04:26 -04:00
parent 94c14ad0f6
commit 13b6fdf8ad
3 changed files with 113 additions and 13 deletions

View File

@ -81,6 +81,8 @@ module HBaseConstants
NAMESPACES = 'NAMESPACES' NAMESPACES = 'NAMESPACES'
CONFIG = 'CONFIG' CONFIG = 'CONFIG'
DATA = 'DATA' DATA = 'DATA'
# Keys of the options hash accepted by the list_regions shell command,
# e.g. {SERVER_NAME => 'server_name', LOCALITY_THRESHOLD => 0.8}.
SERVER_NAME = 'SERVER_NAME'
LOCALITY_THRESHOLD = 'LOCALITY_THRESHOLD'
# Load constants from hbase java API # Load constants from hbase java API
def self.promote_constants(constants) def self.promote_constants(constants)

View File

@ -23,37 +23,62 @@ module Shell
def help def help
return<<EOF return<<EOF
List all regions for a particular table as an array and also filter them by server name (optional) as prefix. List all regions for a particular table as an array and also filter them by server name (optional) as prefix
By default, it will return all the regions for the table and maximum locality (optional). By default, it will return all the regions for the table with any locality.
Examples: Examples:
hbase> list_regions 'table_name' hbase> list_regions 'table_name'
hbase> list_regions 'table_name', 'server_name' hbase> list_regions 'table_name', 'server_name'
hbase> list_regions 'table_name', {SERVER_NAME => 'server_name', LOCALITY_THRESHOLD => 0.8}
EOF EOF
return return
end end
def command(table_name, region_server_name = "") def command(table_name, options = nil)
if options.nil?
options = {}
elsif not options.is_a? Hash
# When options isn't a hash, assume it's the server name
# and create the hash internally
options = {SERVER_NAME => options}
end
admin_instance = admin.instance_variable_get("@admin") admin_instance = admin.instance_variable_get("@admin")
conn_instance = admin_instance.getConnection() conn_instance = admin_instance.getConnection()
cluster_status = admin_instance.getClusterStatus() cluster_status = admin_instance.getClusterStatus()
hregion_locator_instance = conn_instance.getRegionLocator(TableName.valueOf(table_name)) hregion_locator_instance = conn_instance.getRegionLocator(TableName.valueOf(table_name))
hregion_locator_list = hregion_locator_instance.getAllRegionLocations() hregion_locator_list = hregion_locator_instance.getAllRegionLocations().to_a
results = Array.new results = Array.new
begin begin
hregion_locator_list.each do |hregion| # Filter out region servers which we don't want, default to all RS
regions = hregion_locator_list.filter do |hregion|
server_name = options[SERVER_NAME] || '*'
accept_server_name? server_name, hregion.getServerName().toString()
end
# A locality threshold of "1.0" would be all regions (cannot have greater than 1 locality)
# Regions which have a `dataLocality` less-than-or-equal to this value are accepted
locality_threshold = 1.0
if options.has_key? LOCALITY_THRESHOLD
value = options[LOCALITY_THRESHOLD]
# Value validation. Must be a Float, and must be between [0, 1.0]
raise "#{LOCALITY_THRESHOLD} must be a float value" unless value.is_a? Float
raise "#{LOCALITY_THRESHOLD} must be between 0 and 1.0, inclusive" unless valid_locality_threshold? value
locality_threshold = value
end
regions.each do |hregion|
hregion_info = hregion.getRegionInfo() hregion_info = hregion.getRegionInfo()
server_name = hregion.getServerName() server_name = hregion.getServerName()
if hregion.getServerName().toString.start_with? region_server_name region_load_map = cluster_status.getLoad(server_name).getRegionsLoad()
startKey = Bytes.toString(hregion.getRegionInfo().getStartKey()) region_load = region_load_map.get(hregion_info.getRegionName())
endKey = Bytes.toString(hregion.getRegionInfo().getEndKey()) # Ignore regions which exceed our locality threshold
region_load_map = cluster_status.getLoad(server_name).getRegionsLoad() if accept_region_for_locality? region_load.getDataLocality(), locality_threshold
region_load = region_load_map.get(hregion_info.getRegionName()) startKey = Bytes.toString(hregion_info.getStartKey())
endKey = Bytes.toString(hregion_info.getEndKey())
region_store_file_size = region_load.getStorefileSizeMB() region_store_file_size = region_load.getStorefileSizeMB()
region_requests = region_load.getRequestsCount() region_requests = region_load.getRequestsCount()
results << { "server" => hregion.getServerName().toString(), "name" => hregion_info.getRegionNameAsString(), "startkey" => startKey, "endkey" => endKey, "size" => region_store_file_size, "requests" => region_requests } results << { "server" => hregion.getServerName().toString(), "name" => hregion_info.getRegionNameAsString(), "startkey" => startKey, "endkey" => endKey,
"size" => region_store_file_size, "requests" => region_requests, "locality" => region_load.getDataLocality() }
end end
end end
ensure ensure
@ -62,15 +87,27 @@ EOF
@end_time = Time.now @end_time = Time.now
printf("%-60s | %-60s | %-15s | %-15s | %-20s | %-20s", "SERVER_NAME", "REGION_NAME", "START_KEY", "END_KEY", "SIZE", "REQ"); printf("%-60s | %-60s | %-15s | %-15s | %-20s | %-20s | %-20s", "SERVER_NAME", "REGION_NAME", "START_KEY", "END_KEY", "SIZE", "REQ", "LOCALITY");
printf("\n") printf("\n")
for result in results for result in results
printf("%-60s | %-60s | %-15s | %-15s | %-20s | %-20s", result["server"], result["name"], result["startkey"], result["endkey"], result["size"], result["requests"]); printf("%-60s | %-60s | %-15s | %-15s | %-20s | %-20s | %-20s", result["server"], result["name"], result["startkey"], result["endkey"], result["size"], result["requests"], result['locality']);
printf("\n") printf("\n")
end end
printf("%d rows", results.size) printf("%d rows", results.size)
end end
# Reports whether a locality threshold lies in the closed range [0, 1.0].
#
# Performs no type check of its own; +value+ only has to respond to the
# comparison operators used below.
#
# @param value the candidate threshold
# @return [Boolean] true when 0 <= value <= 1.0, false otherwise
def valid_locality_threshold?(value)
  value >= 0 && value <= 1.0
end
# Decides whether a region server name passes the server-name filter.
#
# @param desired_server_name [String] '*' to accept every server, otherwise
#   a prefix the actual name has to begin with
# @param actual_server_name [String] the server name being checked
# @return [Boolean] true for the '*' wildcard or when +actual_server_name+
#   starts with +desired_server_name+
def accept_server_name?(desired_server_name, actual_server_name)
  # The wildcard short-circuits; the actual name is not inspected at all.
  return true if desired_server_name.eql?('*')

  actual_server_name.start_with?(desired_server_name)
end
# Decides whether a region passes the locality filter.
#
# @param actual_locality the locality value of the region
# @param locality_threshold the inclusive upper bound to compare against
# @return [Boolean] true when +actual_locality+ does not exceed
#   +locality_threshold+ (an equal value is accepted)
def accept_region_for_locality?(actual_locality, locality_threshold)
  locality_threshold >= actual_locality
end
end end
end end
end end

View File

@ -0,0 +1,61 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
require 'shell'
require 'hbase_constants'
include HBaseConstants
module Hbase
  # Unit tests for the helper predicates of the list_regions shell command.
  # The command object is constructed with nil and only its predicate methods
  # are called, each with literal arguments.
  class NoClusterListRegionsTest < Test::Unit::TestCase
    include TestHelpers

    define_test 'valid_locality_values' do
      command = ::Shell::Commands::ListRegions.new(nil)
      # Validation that a float is received is done elsewhere

      # Both ends of the range are inclusive.
      [0.999, 0.001, 1.0, 0.0].each do |threshold|
        assert(command.valid_locality_threshold?(threshold),
               "expected #{threshold} to be a valid locality threshold")
      end

      # Values above the upper bound and below the lower bound are rejected.
      [2.0, 100.0, -0.001, -1.0].each do |threshold|
        assert_equal(false, command.valid_locality_threshold?(threshold),
                     "expected #{threshold} to be an invalid locality threshold")
      end
    end

    define_test 'acceptable_server_names' do
      command = ::Shell::Commands::ListRegions.new(nil)
      actual = 'host1.domain.com'

      # The desired name is matched as a prefix of the actual name.
      assert command.accept_server_name?('host.domain.com', 'host.domain.com')
      assert command.accept_server_name?('host.domain', 'host.domain.com')
      assert command.accept_server_name?('host.dom', 'host.domain.com')
      assert command.accept_server_name?('host1', actual)

      # '*' accepts every server name.
      assert command.accept_server_name?('*', actual)
      assert command.accept_server_name?('*', 'host.domain.com')

      ['host2', 'host2.domain', 'host1.niamod'].each do |desired|
        assert_equal(false, command.accept_server_name?(desired, actual),
                     "expected '#{desired}' not to match '#{actual}'")
      end
    end

    define_test 'valid_region_localities' do
      command = ::Shell::Commands::ListRegions.new(nil)

      # Each pair is [actual_locality, locality_threshold].
      accepted = [[0.5, 0.8], [0.8, 0.8], [0.0, 1.0], [1.0, 1.0]]
      rejected = [[0.01, 0.001], [1.0, 0.8], [1.0, 0.999], [0.5, 0.3]]

      accepted.each do |locality, threshold|
        assert(command.accept_region_for_locality?(locality, threshold),
               "expected locality #{locality} to be accepted for threshold #{threshold}")
      end

      rejected.each do |locality, threshold|
        assert_equal(false, command.accept_region_for_locality?(locality, threshold),
                     "expected locality #{locality} to be rejected for threshold #{threshold}")
      end
    end
  end
end