HBASE-20165 Shell command to make a normal peer to be a serial replication peer
This commit is contained in:
parent
9342e0091d
commit
d824bace4f
|
@ -284,6 +284,15 @@ module Hbase
|
||||||
@admin.updateReplicationPeerConfig(id, rpc)
|
@admin.updateReplicationPeerConfig(id, rpc)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def set_peer_serial(id, peer_serial)
|
||||||
|
rpc = get_peer_config(id)
|
||||||
|
return if rpc.nil?
|
||||||
|
rpc_builder = org.apache.hadoop.hbase.replication.ReplicationPeerConfig
|
||||||
|
.newBuilder(rpc)
|
||||||
|
new_rpc = rpc_builder.setSerial(peer_serial).build
|
||||||
|
@admin.updateReplicationPeerConfig(id, new_rpc)
|
||||||
|
end
|
||||||
|
|
||||||
# Set exclude namespaces config for the specified peer
|
# Set exclude namespaces config for the specified peer
|
||||||
def set_peer_exclude_namespaces(id, exclude_namespaces)
|
def set_peer_exclude_namespaces(id, exclude_namespaces)
|
||||||
return if exclude_namespaces.nil?
|
return if exclude_namespaces.nil?
|
||||||
|
|
|
@ -379,6 +379,7 @@ Shell.load_command_group(
|
||||||
enable_peer
|
enable_peer
|
||||||
disable_peer
|
disable_peer
|
||||||
set_peer_replicate_all
|
set_peer_replicate_all
|
||||||
|
set_peer_serial
|
||||||
set_peer_namespaces
|
set_peer_namespaces
|
||||||
append_peer_namespaces
|
append_peer_namespaces
|
||||||
remove_peer_namespaces
|
remove_peer_namespaces
|
||||||
|
|
|
@ -39,7 +39,8 @@ EOF
|
||||||
peers = replication_admin.list_peers
|
peers = replication_admin.list_peers
|
||||||
|
|
||||||
formatter.header(%w[PEER_ID CLUSTER_KEY ENDPOINT_CLASSNAME
|
formatter.header(%w[PEER_ID CLUSTER_KEY ENDPOINT_CLASSNAME
|
||||||
STATE REPLICATE_ALL NAMESPACES TABLE_CFS BANDWIDTH])
|
STATE REPLICATE_ALL NAMESPACES TABLE_CFS BANDWIDTH
|
||||||
|
SERIAL])
|
||||||
|
|
||||||
peers.each do |peer|
|
peers.each do |peer|
|
||||||
id = peer.getPeerId
|
id = peer.getPeerId
|
||||||
|
@ -55,7 +56,7 @@ EOF
|
||||||
formatter.row([id, config.getClusterKey,
|
formatter.row([id, config.getClusterKey,
|
||||||
config.getReplicationEndpointImpl, state,
|
config.getReplicationEndpointImpl, state,
|
||||||
config.replicateAllUserTables, namespaces, tableCFs,
|
config.replicateAllUserTables, namespaces, tableCFs,
|
||||||
config.getBandwidth])
|
config.getBandwidth, config.isSerial])
|
||||||
end
|
end
|
||||||
|
|
||||||
formatter.footer
|
formatter.footer
|
||||||
|
|
|
@ -0,0 +1,49 @@
|
||||||
|
#
|
||||||
|
# Copyright The Apache Software Foundation
|
||||||
|
#
|
||||||
|
# Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
# or more contributor license agreements. See the NOTICE file
|
||||||
|
# distributed with this work for additional information
|
||||||
|
# regarding copyright ownership. The ASF licenses this file
|
||||||
|
# to you under the Apache License, Version 2.0 (the
|
||||||
|
# "License"); you may not use this file except in compliance
|
||||||
|
# with the License. You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
#
|
||||||
|
|
||||||
|
module Shell
|
||||||
|
module Commands
|
||||||
|
class SetPeerSerial < Command
|
||||||
|
def help
|
||||||
|
<<-EOF
|
||||||
|
Set the serial flag to true or false for the specified peer.
|
||||||
|
|
||||||
|
If serial flag is true, then all logs of user tables (REPLICATION_SCOPE != 0) will be
|
||||||
|
replicated to peer cluster serially, which means that each segment of log for replicated
|
||||||
|
table will be pushed to peer cluster in order of their log sequence id.
|
||||||
|
|
||||||
|
If serial flag is false, then the source cluster won't ensure that the logs of replicated
|
||||||
|
table will be pushed to peer cluster serially.
|
||||||
|
|
||||||
|
Examples:
|
||||||
|
|
||||||
|
# set serial flag to true
|
||||||
|
hbase> set_peer_serial '1', true
|
||||||
|
# set serial flag to false
|
||||||
|
hbase> set_peer_serial '1', false
|
||||||
|
EOF
|
||||||
|
end
|
||||||
|
|
||||||
|
def command(id, peer_serial)
|
||||||
|
replication_admin.set_peer_serial(id, peer_serial)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
|
@ -459,6 +459,29 @@ module Hbase
|
||||||
replication_admin.remove_peer(@peer_id)
|
replication_admin.remove_peer(@peer_id)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
define_test 'set_peer_serial' do
|
||||||
|
cluster_key = 'zk4,zk5,zk6:11000:/hbase-test'
|
||||||
|
|
||||||
|
args = { CLUSTER_KEY => cluster_key }
|
||||||
|
command(:add_peer, @peer_id, args)
|
||||||
|
|
||||||
|
assert_equal(1, command(:list_peers).length)
|
||||||
|
peer_config = command(:list_peers).get(0).getPeerConfig
|
||||||
|
assert_equal(false, peer_config.isSerial)
|
||||||
|
|
||||||
|
command(:set_peer_serial, @peer_id, true)
|
||||||
|
peer_config = command(:list_peers).get(0).getPeerConfig
|
||||||
|
assert_equal(true, peer_config.isSerial)
|
||||||
|
|
||||||
|
command(:set_peer_serial, @peer_id, false)
|
||||||
|
peer_config = command(:list_peers).get(0).getPeerConfig
|
||||||
|
assert_equal(false, peer_config.isSerial)
|
||||||
|
|
||||||
|
# cleanup for future tests
|
||||||
|
replication_admin.remove_peer(@peer_id)
|
||||||
|
assert_equal(0, command(:list_peers).length)
|
||||||
|
end
|
||||||
|
|
||||||
define_test "set_peer_bandwidth: works with peer bandwidth upper limit" do
|
define_test "set_peer_bandwidth: works with peer bandwidth upper limit" do
|
||||||
cluster_key = "localhost:2181:/hbase-test"
|
cluster_key = "localhost:2181:/hbase-test"
|
||||||
args = { CLUSTER_KEY => cluster_key }
|
args = { CLUSTER_KEY => cluster_key }
|
||||||
|
|
Loading…
Reference in New Issue