HBASE-12867 Add ability to specify custom replication endpoint to add_peer
Conflicts: hbase-shell/src/main/ruby/hbase.rb
This commit is contained in:
parent
f5b40200db
commit
4797b025fb
|
@ -66,6 +66,11 @@ module HBaseConstants
|
||||||
AUTHORIZATIONS = "AUTHORIZATIONS"
|
AUTHORIZATIONS = "AUTHORIZATIONS"
|
||||||
SKIP_FLUSH = 'SKIP_FLUSH'
|
SKIP_FLUSH = 'SKIP_FLUSH'
|
||||||
CONSISTENCY = "CONSISTENCY"
|
CONSISTENCY = "CONSISTENCY"
|
||||||
|
ENDPOINT_CLASSNAME = 'ENDPOINT_CLASSNAME'
|
||||||
|
CLUSTER_KEY = 'CLUSTER_KEY'
|
||||||
|
TABLE_CFS = 'TABLE_CFS'
|
||||||
|
CONFIG = 'CONFIG'
|
||||||
|
DATA = 'DATA'
|
||||||
|
|
||||||
# Load constants from hbase java API
|
# Load constants from hbase java API
|
||||||
def self.promote_constants(constants)
|
def self.promote_constants(constants)
|
||||||
|
|
|
@ -19,21 +19,80 @@
|
||||||
|
|
||||||
include Java
|
include Java
|
||||||
|
|
||||||
# Wrapper for org.apache.hadoop.hbase.client.HBaseAdmin
|
java_import org.apache.hadoop.hbase.client.replication.ReplicationAdmin
|
||||||
|
java_import org.apache.hadoop.hbase.replication.ReplicationPeerConfig
|
||||||
|
java_import org.apache.hadoop.hbase.util.Bytes
|
||||||
|
java_import org.apache.hadoop.hbase.zookeeper.ZKUtil
|
||||||
|
|
||||||
|
# Wrapper for org.apache.hadoop.hbase.client.replication.ReplicationAdmin
|
||||||
|
|
||||||
module Hbase
|
module Hbase
|
||||||
class RepAdmin
|
class RepAdmin
|
||||||
include HBaseConstants
|
include HBaseConstants
|
||||||
|
|
||||||
def initialize(configuration, formatter)
|
def initialize(configuration, formatter)
|
||||||
@replication_admin = org.apache.hadoop.hbase.client.replication.ReplicationAdmin.new(configuration)
|
@replication_admin = ReplicationAdmin.new(configuration)
|
||||||
|
@configuration = configuration
|
||||||
@formatter = formatter
|
@formatter = formatter
|
||||||
end
|
end
|
||||||
|
|
||||||
#----------------------------------------------------------------------------------------------
|
#----------------------------------------------------------------------------------------------
|
||||||
# Add a new peer cluster to replicate to
|
# Add a new peer cluster to replicate to
|
||||||
def add_peer(id, cluster_key, peer_tableCFs = nil)
|
def add_peer(id, args = {}, peer_tableCFs = nil)
|
||||||
@replication_admin.addPeer(id, cluster_key, peer_tableCFs)
|
# make add_peer backwards compatible to take in string for clusterKey and peer_tableCFs
|
||||||
|
if args.is_a?(String)
|
||||||
|
cluster_key = args
|
||||||
|
@replication_admin.addPeer(id, cluster_key, peer_tableCFs)
|
||||||
|
elsif args.is_a?(Hash)
|
||||||
|
unless peer_tableCFs.nil?
|
||||||
|
raise(ArgumentError, "peer_tableCFs should be specified as TABLE_CFS in args")
|
||||||
|
end
|
||||||
|
|
||||||
|
endpoint_classname = args.fetch(ENDPOINT_CLASSNAME, nil)
|
||||||
|
cluster_key = args.fetch(CLUSTER_KEY, nil)
|
||||||
|
|
||||||
|
# Handle cases where custom replication endpoint and cluster key are either both provided
|
||||||
|
# or neither are provided
|
||||||
|
if endpoint_classname.nil? and cluster_key.nil?
|
||||||
|
raise(ArgumentError, "Either ENDPOINT_CLASSNAME or CLUSTER_KEY must be specified.")
|
||||||
|
elsif !endpoint_classname.nil? and !cluster_key.nil?
|
||||||
|
raise(ArgumentError, "ENDPOINT_CLASSNAME and CLUSTER_KEY cannot both be specified.")
|
||||||
|
end
|
||||||
|
|
||||||
|
# Cluster Key is required for ReplicationPeerConfig for a custom replication endpoint
|
||||||
|
if !endpoint_classname.nil? and cluster_key.nil?
|
||||||
|
cluster_key = ZKUtil.getZooKeeperClusterKey(@configuration)
|
||||||
|
end
|
||||||
|
|
||||||
|
# Optional parameters
|
||||||
|
config = args.fetch(CONFIG, nil)
|
||||||
|
data = args.fetch(DATA, nil)
|
||||||
|
table_cfs = args.fetch(TABLE_CFS, nil)
|
||||||
|
|
||||||
|
# Create and populate a ReplicationPeerConfig
|
||||||
|
replication_peer_config = ReplicationPeerConfig.new
|
||||||
|
replication_peer_config.set_cluster_key(cluster_key)
|
||||||
|
|
||||||
|
unless endpoint_classname.nil?
|
||||||
|
replication_peer_config.set_replication_endpoint_impl(endpoint_classname)
|
||||||
|
end
|
||||||
|
|
||||||
|
unless config.nil?
|
||||||
|
replication_peer_config.get_configuration.put_all(config)
|
||||||
|
end
|
||||||
|
|
||||||
|
unless data.nil?
|
||||||
|
# Convert Strings to Bytes for peer_data
|
||||||
|
peer_data = replication_peer_config.get_peer_data
|
||||||
|
data.each{|key, val|
|
||||||
|
peer_data.put(Bytes.to_bytes(key), Bytes.to_bytes(val))
|
||||||
|
}
|
||||||
|
end
|
||||||
|
|
||||||
|
@replication_admin.add_peer(id, replication_peer_config, table_cfs)
|
||||||
|
else
|
||||||
|
raise(ArgumentError, "args must be either a String or Hash")
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
#----------------------------------------------------------------------------------------------
|
#----------------------------------------------------------------------------------------------
|
||||||
|
@ -48,7 +107,7 @@ module Hbase
|
||||||
def list_replicated_tables(regex = ".*")
|
def list_replicated_tables(regex = ".*")
|
||||||
pattern = java.util.regex.Pattern.compile(regex)
|
pattern = java.util.regex.Pattern.compile(regex)
|
||||||
list = @replication_admin.listReplicated()
|
list = @replication_admin.listReplicated()
|
||||||
list.select {|s| pattern.match(s.get(org.apache.hadoop.hbase.client.replication.ReplicationAdmin::TNAME))}
|
list.select {|s| pattern.match(s.get(ReplicationAdmin.TNAME))}
|
||||||
end
|
end
|
||||||
|
|
||||||
#----------------------------------------------------------------------------------------------
|
#----------------------------------------------------------------------------------------------
|
||||||
|
|
|
@ -22,21 +22,47 @@ module Shell
|
||||||
class AddPeer< Command
|
class AddPeer< Command
|
||||||
def help
|
def help
|
||||||
return <<-EOF
|
return <<-EOF
|
||||||
Add a peer cluster to replicate to, the id must be a short and
|
A peer can either be another HBase cluster or a custom replication endpoint. In either case an id
|
||||||
the cluster key is composed like this:
|
must be specified to identify the peer.
|
||||||
|
|
||||||
|
For a HBase cluster peer, a cluster key must be provided and is composed like this:
|
||||||
hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
|
hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
|
||||||
This gives a full path for HBase to connect to another cluster.
|
This gives a full path for HBase to connect to another HBase cluster. An optional parameter for
|
||||||
|
table column families identifies which column families will be replicated to the peer cluster.
|
||||||
Examples:
|
Examples:
|
||||||
|
|
||||||
hbase> add_peer '1', "server1.cie.com:2181:/hbase"
|
hbase> add_peer '1', "server1.cie.com:2181:/hbase"
|
||||||
hbase> add_peer '2', "zk1,zk2,zk3:2182:/hbase-prod"
|
hbase> add_peer '2', "zk1,zk2,zk3:2182:/hbase-prod"
|
||||||
hbase> add_peer '3', "zk4,zk5,zk6:11000:/hbase-test", "tab1; tab2:cf1; tab3:cf2,cf3"
|
hbase> add_peer '3', "zk4,zk5,zk6:11000:/hbase-test", "table1; table2:cf1; table3:cf1,cf2"
|
||||||
|
hbase> add_peer '4', CLUSTER_KEY => "server1.cie.com:2181:/hbase"
|
||||||
|
hbase> add_peer '5', CLUSTER_KEY => "server1.cie.com:2181:/hbase",
|
||||||
|
TABLE_CFS => { "table1" => [], "table2" => ["cf1"], "table3" => ["cf1", "cf2"] }
|
||||||
|
|
||||||
|
For a custom replication endpoint, the ENDPOINT_CLASSNAME can be provided. Two optional arguments
|
||||||
|
are DATA and CONFIG which can be specified to set different either the peer_data or configuration
|
||||||
|
for the custom replication endpoint. Table column families is optional and can be specified with
|
||||||
|
the key TABLE_CFS.
|
||||||
|
|
||||||
|
hbase> add_peer '6', ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.MyReplicationEndpoint'
|
||||||
|
hbase> add_peer '7', ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.MyReplicationEndpoint',
|
||||||
|
DATA => { "key1" => 1 }
|
||||||
|
hbase> add_peer '8', ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.MyReplicationEndpoint',
|
||||||
|
CONFIG => { "config1" => "value1", "config2" => "value2" }
|
||||||
|
hbase> add_peer '9', ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.MyReplicationEndpoint',
|
||||||
|
DATA => { "key1" => 1 }, CONFIG => { "config1" => "value1", "config2" => "value2" },
|
||||||
|
hbase> add_peer '10', ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.MyReplicationEndpoint',
|
||||||
|
TABLE_CFS => { "table1" => [], "table2" => ["cf1"], "table3" => ["cf1", "cf2"] }
|
||||||
|
hbase> add_peer '11', ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.MyReplicationEndpoint',
|
||||||
|
DATA => { "key1" => 1 }, CONFIG => { "config1" => "value1", "config2" => "value2" },
|
||||||
|
TABLE_CFS => { "table1" => [], "table2" => ["cf1"], "table3" => ["cf1", "cf2"] }
|
||||||
|
|
||||||
|
Note: Either CLUSTER_KEY or ENDPOINT_CLASSNAME must be specified but not both.
|
||||||
EOF
|
EOF
|
||||||
end
|
end
|
||||||
|
|
||||||
def command(id, cluster_key, peer_tableCFs = nil)
|
def command(id, args = {}, peer_tableCFs = nil)
|
||||||
format_simple_command do
|
format_simple_command do
|
||||||
replication_admin.add_peer(id, cluster_key, peer_tableCFs)
|
replication_admin.add_peer(id, args, peer_tableCFs)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -0,0 +1,191 @@
|
||||||
|
#
|
||||||
|
#
|
||||||
|
# Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
# or more contributor license agreements. See the NOTICE file
|
||||||
|
# distributed with this work for additional information
|
||||||
|
# regarding copyright ownership. The ASF licenses this file
|
||||||
|
# to you under the Apache License, Version 2.0 (the
|
||||||
|
# "License"); you may not use this file except in compliance
|
||||||
|
# with the License. You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
#
|
||||||
|
|
||||||
|
require 'shell'
|
||||||
|
require 'shell/formatter'
|
||||||
|
require 'hbase'
|
||||||
|
require 'hbase/hbase'
|
||||||
|
require 'hbase/table'
|
||||||
|
|
||||||
|
include HBaseConstants
|
||||||
|
|
||||||
|
module Hbase
|
||||||
|
class ReplicationAdminTest < Test::Unit::TestCase
|
||||||
|
include TestHelpers
|
||||||
|
|
||||||
|
def setup
|
||||||
|
@test_name = "hbase_shell_tests_table"
|
||||||
|
@peer_id = '1'
|
||||||
|
|
||||||
|
setup_hbase
|
||||||
|
drop_test_table(@test_name)
|
||||||
|
create_test_table(@test_name)
|
||||||
|
|
||||||
|
assert_equal(0, replication_admin.list_peers.length)
|
||||||
|
end
|
||||||
|
|
||||||
|
def teardown
|
||||||
|
assert_equal(0, replication_admin.list_peers.length)
|
||||||
|
|
||||||
|
shutdown
|
||||||
|
end
|
||||||
|
|
||||||
|
define_test "add_peer: should fail when args isn't specified" do
|
||||||
|
assert_raise(ArgumentError) do
|
||||||
|
replication_admin.add_peer(@peer_id, nil)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
define_test "add_peer: fail when neither CLUSTER_KEY nor ENDPOINT_CLASSNAME are specified" do
|
||||||
|
assert_raise(ArgumentError) do
|
||||||
|
args = {}
|
||||||
|
replication_admin.add_peer(@peer_id, args)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
define_test "add_peer: fail when both CLUSTER_KEY and ENDPOINT_CLASSNAME are specified" do
|
||||||
|
assert_raise(ArgumentError) do
|
||||||
|
args = { CLUSTER_KEY => 'zk1,zk2,zk3:2182:/hbase-prod',
|
||||||
|
ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.MyReplicationEndpoint' }
|
||||||
|
replication_admin.add_peer(@peer_id, args)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
define_test "add_peer: args must be a string or number" do
|
||||||
|
assert_raise(ArgumentError) do
|
||||||
|
replication_admin.add_peer(@peer_id, 1)
|
||||||
|
end
|
||||||
|
assert_raise(ArgumentError) do
|
||||||
|
replication_admin.add_peer(@peer_id, ['test'])
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
define_test "add_peer: single zk cluster key" do
|
||||||
|
cluster_key = "server1.cie.com:2181:/hbase"
|
||||||
|
|
||||||
|
replication_admin.add_peer(@peer_id, cluster_key)
|
||||||
|
|
||||||
|
assert_equal(1, replication_admin.list_peers.length)
|
||||||
|
assert(replication_admin.list_peers.key?(@peer_id))
|
||||||
|
assert_equal(cluster_key, replication_admin.list_peers.fetch(@peer_id))
|
||||||
|
|
||||||
|
# cleanup for future tests
|
||||||
|
replication_admin.remove_peer(@peer_id)
|
||||||
|
end
|
||||||
|
|
||||||
|
define_test "add_peer: multiple zk cluster key" do
|
||||||
|
cluster_key = "zk1,zk2,zk3:2182:/hbase-prod"
|
||||||
|
|
||||||
|
replication_admin.add_peer(@peer_id, cluster_key)
|
||||||
|
|
||||||
|
assert_equal(1, replication_admin.list_peers.length)
|
||||||
|
assert(replication_admin.list_peers.key?(@peer_id))
|
||||||
|
assert_equal(replication_admin.list_peers.fetch(@peer_id), cluster_key)
|
||||||
|
|
||||||
|
# cleanup for future tests
|
||||||
|
replication_admin.remove_peer(@peer_id)
|
||||||
|
end
|
||||||
|
|
||||||
|
define_test "add_peer: multiple zk cluster key and table_cfs" do
|
||||||
|
cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
|
||||||
|
table_cfs_str = "table1;table2:cf1;table3:cf2,cf3"
|
||||||
|
|
||||||
|
replication_admin.add_peer(@peer_id, cluster_key, table_cfs_str)
|
||||||
|
|
||||||
|
assert_equal(1, replication_admin.list_peers.length)
|
||||||
|
assert(replication_admin.list_peers.key?(@peer_id))
|
||||||
|
assert_equal(cluster_key, replication_admin.list_peers.fetch(@peer_id))
|
||||||
|
assert_equal(table_cfs_str, replication_admin.show_peer_tableCFs(@peer_id))
|
||||||
|
|
||||||
|
# cleanup for future tests
|
||||||
|
replication_admin.remove_peer(@peer_id)
|
||||||
|
end
|
||||||
|
|
||||||
|
define_test "add_peer: single zk cluster key - peer config" do
|
||||||
|
cluster_key = "server1.cie.com:2181:/hbase"
|
||||||
|
|
||||||
|
args = { CLUSTER_KEY => cluster_key }
|
||||||
|
replication_admin.add_peer(@peer_id, args)
|
||||||
|
|
||||||
|
assert_equal(1, replication_admin.list_peers.length)
|
||||||
|
assert(replication_admin.list_peers.key?(@peer_id))
|
||||||
|
assert_equal(cluster_key, replication_admin.list_peers.fetch(@peer_id))
|
||||||
|
|
||||||
|
# cleanup for future tests
|
||||||
|
replication_admin.remove_peer(@peer_id)
|
||||||
|
end
|
||||||
|
|
||||||
|
define_test "add_peer: multiple zk cluster key - peer config" do
|
||||||
|
cluster_key = "zk1,zk2,zk3:2182:/hbase-prod"
|
||||||
|
|
||||||
|
args = { CLUSTER_KEY => cluster_key }
|
||||||
|
replication_admin.add_peer(@peer_id, args)
|
||||||
|
|
||||||
|
assert_equal(1, replication_admin.list_peers.length)
|
||||||
|
assert(replication_admin.list_peers.key?(@peer_id))
|
||||||
|
assert_equal(cluster_key, replication_admin.list_peers.fetch(@peer_id))
|
||||||
|
|
||||||
|
# cleanup for future tests
|
||||||
|
replication_admin.remove_peer(@peer_id)
|
||||||
|
end
|
||||||
|
|
||||||
|
define_test "add_peer: multiple zk cluster key and table_cfs - peer config" do
|
||||||
|
cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
|
||||||
|
table_cfs = { "table1" => [], "table2" => ["cf1"], "table3" => ["cf1", "cf2"] }
|
||||||
|
table_cfs_str = "table1;table2:cf1;table3:cf1,cf2"
|
||||||
|
|
||||||
|
args = { CLUSTER_KEY => cluster_key, TABLE_CFS => table_cfs }
|
||||||
|
replication_admin.add_peer(@peer_id, args)
|
||||||
|
|
||||||
|
assert_equal(1, replication_admin.list_peers.length)
|
||||||
|
assert(replication_admin.list_peers.key?(@peer_id))
|
||||||
|
assert_equal(cluster_key, replication_admin.list_peers.fetch(@peer_id))
|
||||||
|
assert_equal(table_cfs_str, replication_admin.show_peer_tableCFs(@peer_id))
|
||||||
|
|
||||||
|
# cleanup for future tests
|
||||||
|
replication_admin.remove_peer(@peer_id)
|
||||||
|
end
|
||||||
|
|
||||||
|
define_test "add_peer: should fail when args is a hash and peer_tableCFs provided" do
|
||||||
|
cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
|
||||||
|
table_cfs_str = "table1;table2:cf1;table3:cf1,cf2"
|
||||||
|
|
||||||
|
assert_raise(ArgumentError) do
|
||||||
|
args = { CLUSTER_KEY => cluster_key }
|
||||||
|
replication_admin.add_peer(@peer_id, args, table_cfs_str)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
# assert_raise fails on native exceptions - https://jira.codehaus.org/browse/JRUBY-5279
|
||||||
|
# Can't catch native Java exception with assert_raise in JRuby 1.6.8 as in the test below.
|
||||||
|
# define_test "add_peer: adding a second peer with same id should error" do
|
||||||
|
# replication_admin.add_peer(@peer_id, '')
|
||||||
|
# assert_equal(1, replication_admin.list_peers.length)
|
||||||
|
#
|
||||||
|
# assert_raise(java.lang.IllegalArgumentException) do
|
||||||
|
# replication_admin.add_peer(@peer_id, '')
|
||||||
|
# end
|
||||||
|
#
|
||||||
|
# assert_equal(1, replication_admin.list_peers.length, 1)
|
||||||
|
#
|
||||||
|
# # cleanup for future tests
|
||||||
|
# replication_admin.remove_peer(@peer_id)
|
||||||
|
# end
|
||||||
|
end
|
||||||
|
end
|
|
@ -68,6 +68,10 @@ module Hbase
|
||||||
@shell.hbase_visibility_labels_admin
|
@shell.hbase_visibility_labels_admin
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def replication_admin
|
||||||
|
@shell.hbase_replication_admin
|
||||||
|
end
|
||||||
|
|
||||||
def create_test_table(name)
|
def create_test_table(name)
|
||||||
# Create the table if needed
|
# Create the table if needed
|
||||||
unless admin.exists?(name)
|
unless admin.exists?(name)
|
||||||
|
|
Loading…
Reference in New Issue