HBASE-25694 Improve the shell's 'status replication' command output (#4272)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
xicm 2022-04-28 23:10:46 +08:00 committed by GitHub
parent cdf81ea5cc
commit ec78c2677b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 38 additions and 26 deletions

View File

@ -961,24 +961,26 @@ module Hbase
servers: cluster_metrics.getLiveServerMetrics.size)) servers: cluster_metrics.getLiveServerMetrics.size))
cluster_metrics.getLiveServerMetrics.keySet.each do |server_name| cluster_metrics.getLiveServerMetrics.keySet.each do |server_name|
sl = cluster_metrics.getLiveServerMetrics.get(server_name) sl = cluster_metrics.getLiveServerMetrics.get(server_name)
r_sink_string = ' SINK:' r_sink_string = ' SINK:'
r_source_string = ' SOURCE:' r_source_string = ' SOURCE:'
r_load_sink = sl.getReplicationLoadSink r_load_sink = sl.getReplicationLoadSink
next if r_load_sink.nil? next if r_load_sink.nil?
if r_load_sink.getTimestampsOfLastAppliedOp() == r_load_sink.getTimestampStarted() if r_load_sink.getTimestampsOfLastAppliedOp() == r_load_sink.getTimestampStarted()
# If we have applied no operations since we've started replication, # If we have applied no operations since we've started replication,
# assume that we're not acting as a sink and don't print the normal information # assume that we're not acting as a sink and don't print the normal information
r_sink_string << " TimeStampStarted=" + r_load_sink.getTimestampStarted().to_s r_sink_string << "\n TimeStampStarted=" + r_load_sink.getTimestampStarted().to_s
r_sink_string << ", Waiting for OPs... " r_sink_string << ",\n Waiting for OPs... "
else else
r_sink_string << " TimeStampStarted=" + r_load_sink.getTimestampStarted().to_s r_sink_string << "\n TimeStampStarted=" + r_load_sink.getTimestampStarted().to_s
r_sink_string << ", AgeOfLastAppliedOp=" + r_load_sink.getAgeOfLastAppliedOp().to_s r_sink_string << ",\n AgeOfLastAppliedOp=" + r_load_sink.getAgeOfLastAppliedOp().to_s
r_sink_string << ", TimeStampsOfLastAppliedOp=" + r_sink_string << ",\n TimeStampsOfLastAppliedOp=" +
(java.util.Date.new(r_load_sink.getTimestampsOfLastAppliedOp())).toString() r_load_sink.getTimestampsOfLastAppliedOp().to_s
end end
r_load_source_map = sl.getReplicationLoadSourceMap r_load_source_map = sl.getReplicationLoadSourceMap
build_source_string(r_load_source_map, r_source_string) build_source_string(r_load_source_map, r_source_string)
puts(format(' %<host>s:', host: server_name.getHostname)) puts(format(' %<host>s:', host: server_name.getHostname))
if type.casecmp('SOURCE').zero? if type.casecmp('SOURCE').zero?
puts(format('%<source>s', source: r_source_string)) puts(format('%<source>s', source: r_source_string))
@ -1048,19 +1050,20 @@ module Hbase
def build_source_string(r_load_source_map, r_source_string) def build_source_string(r_load_source_map, r_source_string)
r_load_source_map.each do |peer, sources| r_load_source_map.each do |peer, sources|
r_source_string << ' PeerID=' + peer r_source_string << "\n PeerID=" + peer
sources.each do |source_load| sources.each do |source_load|
build_queue_title(source_load, r_source_string) build_queue_title(source_load, r_source_string)
build_running_source_stats(source_load, r_source_string) build_running_source_stats(source_load, r_source_string)
r_source_string << "\n"
end end
end end
end end
def build_queue_title(source_load, r_source_string) def build_queue_title(source_load, r_source_string)
r_source_string << if source_load.isRecovered r_source_string << if source_load.isRecovered
"\n Recovered Queue: " ",\n Queue(Recovered)="
else else
"\n Normal Queue: " ",\n Queue(Normal)="
end end
r_source_string << source_load.getQueueId r_source_string << source_load.getQueueId
end end
@ -1069,45 +1072,43 @@ module Hbase
if source_load.isRunning if source_load.isRunning
build_shipped_stats(source_load, r_source_string) build_shipped_stats(source_load, r_source_string)
build_load_general_stats(source_load, r_source_string) build_load_general_stats(source_load, r_source_string)
r_source_string << ', Replication Lag=' + r_source_string << ",\n ReplicationLag=" +
source_load.getReplicationLag.to_s source_load.getReplicationLag.to_s
else else
r_source_string << "\n " r_source_string << ",\n IsRunning=false, "
r_source_string << 'No Reader/Shipper threads runnning yet.' r_source_string << 'No Reader/Shipper threads runnning yet.'
end end
end end
def build_shipped_stats(source_load, r_source_string) def build_shipped_stats(source_load, r_source_string)
r_source_string << if source_load.getTimestampOfLastShippedOp.zero? r_source_string << if source_load.getTimestampOfLastShippedOp.zero?
"\n " \ ",\n TimeStampOfLastShippedOp=0, " \
'No Ops shipped since last restart' 'No Ops shipped since last restart'
else else
"\n AgeOfLastShippedOp=" + ",\n AgeOfLastShippedOp=" +
source_load.getAgeOfLastShippedOp.to_s + source_load.getAgeOfLastShippedOp.to_s +
', TimeStampOfLastShippedOp=' + ",\n TimeStampOfLastShippedOp=" +
java.util.Date.new(source_load source_load.getTimestampOfLastShippedOp.to_s
.getTimestampOfLastShippedOp).toString
end end
end end
def build_load_general_stats(source_load, r_source_string) def build_load_general_stats(source_load, r_source_string)
r_source_string << ', SizeOfLogQueue=' + r_source_string << ",\n SizeOfLogQueue=" +
source_load.getSizeOfLogQueue.to_s source_load.getSizeOfLogQueue.to_s
r_source_string << ', EditsReadFromLogQueue=' + r_source_string << ",\n EditsReadFromLogQueue=" +
source_load.getEditsRead.to_s source_load.getEditsRead.to_s
r_source_string << ', OpsShippedToTarget=' + r_source_string << ",\n OpsShippedToTarget=" +
source_load.getOPsShipped.to_s source_load.getOPsShipped.to_s
build_edits_for_source(source_load, r_source_string) build_edits_for_source(source_load, r_source_string)
end end
def build_edits_for_source(source_load, r_source_string) def build_edits_for_source(source_load, r_source_string)
if source_load.hasEditsSinceRestart if source_load.hasEditsSinceRestart
r_source_string << ', TimeStampOfNextToReplicate=' + r_source_string << ",\n TimeStampOfNextToReplicate=" +
java.util.Date.new(source_load source_load.getTimeStampOfNextToReplicate.to_s
.getTimeStampOfNextToReplicate).toString
else else
r_source_string << ', No edits for this source' r_source_string << ",\n HasEditsSinceRestart=false, "
r_source_string << ' since it started' r_source_string << 'No edits for this source since it started'
end end
end end

View File

@ -30,14 +30,25 @@ module Hbase
include HBaseConstants include HBaseConstants
def setup def setup
@peer_id1 = '1'
@peer_id2 = '2'
@dummy_endpoint = 'org.apache.hadoop.hbase.replication.DummyReplicationEndpoint'
setup_hbase setup_hbase
# Create test table if it does not exist # Create test table if it does not exist
@test_name = 'hbase_shell_admin2_test_table' @test_name = 'hbase_shell_admin2_test_table'
drop_test_table(@test_name) drop_test_table(@test_name)
create_test_table(@test_name) create_test_table(@test_name)
cluster_key = "zk1,zk2,zk3:2182:/hbase-prod"
args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint}
command(:add_peer, @peer_id1, args)
command(:add_peer, @peer_id2, args)
end end
def teardown def teardown
command(:remove_peer, @peer_id1)
command(:remove_peer, @peer_id2)
shutdown shutdown
end end