diff --git a/hbase-shell/src/main/ruby/hbase.rb b/hbase-shell/src/main/ruby/hbase.rb index dd9ed906be9..d5fb5f6e6bd 100644 --- a/hbase-shell/src/main/ruby/hbase.rb +++ b/hbase-shell/src/main/ruby/hbase.rb @@ -66,6 +66,11 @@ module HBaseConstants AUTHORIZATIONS = "AUTHORIZATIONS" SKIP_FLUSH = 'SKIP_FLUSH' CONSISTENCY = "CONSISTENCY" + ENDPOINT_CLASSNAME = 'ENDPOINT_CLASSNAME' + CLUSTER_KEY = 'CLUSTER_KEY' + TABLE_CFS = 'TABLE_CFS' + CONFIG = 'CONFIG' + DATA = 'DATA' # Load constants from hbase java API def self.promote_constants(constants) diff --git a/hbase-shell/src/main/ruby/hbase/replication_admin.rb b/hbase-shell/src/main/ruby/hbase/replication_admin.rb index 6dedb2effc7..2d0845f5625 100644 --- a/hbase-shell/src/main/ruby/hbase/replication_admin.rb +++ b/hbase-shell/src/main/ruby/hbase/replication_admin.rb @@ -19,21 +19,80 @@ include Java -# Wrapper for org.apache.hadoop.hbase.client.HBaseAdmin +java_import org.apache.hadoop.hbase.client.replication.ReplicationAdmin +java_import org.apache.hadoop.hbase.replication.ReplicationPeerConfig +java_import org.apache.hadoop.hbase.util.Bytes +java_import org.apache.hadoop.hbase.zookeeper.ZKUtil + +# Wrapper for org.apache.hadoop.hbase.client.replication.ReplicationAdmin module Hbase class RepAdmin include HBaseConstants def initialize(configuration, formatter) - @replication_admin = org.apache.hadoop.hbase.client.replication.ReplicationAdmin.new(configuration) + @replication_admin = ReplicationAdmin.new(configuration) + @configuration = configuration @formatter = formatter end #---------------------------------------------------------------------------------------------- # Add a new peer cluster to replicate to - def add_peer(id, cluster_key, peer_tableCFs = nil) - @replication_admin.addPeer(id, cluster_key, peer_tableCFs) + def add_peer(id, args = {}, peer_tableCFs = nil) + # make add_peer backwards compatible to take in string for clusterKey and peer_tableCFs + if args.is_a?(String) + cluster_key = args + @replication_admin.addPeer(id, cluster_key, peer_tableCFs) + elsif args.is_a?(Hash) + unless peer_tableCFs.nil? + raise(ArgumentError, "peer_tableCFs should be specified as TABLE_CFS in args") + end + + endpoint_classname = args.fetch(ENDPOINT_CLASSNAME, nil) + cluster_key = args.fetch(CLUSTER_KEY, nil) + + # Handle cases where custom replication endpoint and cluster key are either both provided + # or neither are provided + if endpoint_classname.nil? and cluster_key.nil? + raise(ArgumentError, "Either ENDPOINT_CLASSNAME or CLUSTER_KEY must be specified.") + elsif !endpoint_classname.nil? and !cluster_key.nil? + raise(ArgumentError, "ENDPOINT_CLASSNAME and CLUSTER_KEY cannot both be specified.") + end + + # Cluster Key is required for ReplicationPeerConfig for a custom replication endpoint + if !endpoint_classname.nil? and cluster_key.nil? + cluster_key = ZKUtil.getZooKeeperClusterKey(@configuration) + end + + # Optional parameters + config = args.fetch(CONFIG, nil) + data = args.fetch(DATA, nil) + table_cfs = args.fetch(TABLE_CFS, nil) + + # Create and populate a ReplicationPeerConfig + replication_peer_config = ReplicationPeerConfig.new + replication_peer_config.set_cluster_key(cluster_key) + + unless endpoint_classname.nil? + replication_peer_config.set_replication_endpoint_impl(endpoint_classname) + end + + unless config.nil? + replication_peer_config.get_configuration.put_all(config) + end + + unless data.nil? + # Convert Strings to Bytes for peer_data + peer_data = replication_peer_config.get_peer_data + data.each{|key, val| + peer_data.put(Bytes.to_bytes(key), Bytes.to_bytes(val)) + } + end + + @replication_admin.add_peer(id, replication_peer_config, table_cfs) + else + raise(ArgumentError, "args must be either a String or Hash") + end end #---------------------------------------------------------------------------------------------- @@ -48,7 +107,7 @@ module Hbase def list_replicated_tables(regex = ".*") pattern = java.util.regex.Pattern.compile(regex) list = @replication_admin.listReplicated() - list.select {|s| pattern.match(s.get(org.apache.hadoop.hbase.client.replication.ReplicationAdmin::TNAME))} + list.select {|s| pattern.match(s.get(ReplicationAdmin.TNAME))} end #---------------------------------------------------------------------------------------------- diff --git a/hbase-shell/src/main/ruby/shell/commands/add_peer.rb b/hbase-shell/src/main/ruby/shell/commands/add_peer.rb index ecd8e753920..be010416445 100644 --- a/hbase-shell/src/main/ruby/shell/commands/add_peer.rb +++ b/hbase-shell/src/main/ruby/shell/commands/add_peer.rb @@ -22,21 +22,47 @@ module Shell class AddPeer< Command def help return <<-EOF -Add a peer cluster to replicate to, the id must be a short and -the cluster key is composed like this: +A peer can either be another HBase cluster or a custom replication endpoint. In either case an id +must be specified to identify the peer. + +For a HBase cluster peer, a cluster key must be provided and is composed like this: hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent -This gives a full path for HBase to connect to another cluster. +This gives a full path for HBase to connect to another HBase cluster. An optional parameter for +table column families identifies which column families will be replicated to the peer cluster. Examples: hbase> add_peer '1', "server1.cie.com:2181:/hbase" hbase> add_peer '2', "zk1,zk2,zk3:2182:/hbase-prod" - hbase> add_peer '3', "zk4,zk5,zk6:11000:/hbase-test", "tab1; tab2:cf1; tab3:cf2,cf3" + hbase> add_peer '3', "zk4,zk5,zk6:11000:/hbase-test", "table1; table2:cf1; table3:cf1,cf2" + hbase> add_peer '4', CLUSTER_KEY => "server1.cie.com:2181:/hbase" + hbase> add_peer '5', CLUSTER_KEY => "server1.cie.com:2181:/hbase", + TABLE_CFS => { "table1" => [], "table2" => ["cf1"], "table3" => ["cf1", "cf2"] } + +For a custom replication endpoint, the ENDPOINT_CLASSNAME can be provided. Two optional arguments +are DATA and CONFIG which can be specified to set different either the peer_data or configuration +for the custom replication endpoint. Table column families is optional and can be specified with +the key TABLE_CFS. + + hbase> add_peer '6', ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.MyReplicationEndpoint' + hbase> add_peer '7', ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.MyReplicationEndpoint', + DATA => { "key1" => 1 } + hbase> add_peer '8', ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.MyReplicationEndpoint', + CONFIG => { "config1" => "value1", "config2" => "value2" } + hbase> add_peer '9', ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.MyReplicationEndpoint', + DATA => { "key1" => 1 }, CONFIG => { "config1" => "value1", "config2" => "value2" }, + hbase> add_peer '10', ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.MyReplicationEndpoint', + TABLE_CFS => { "table1" => [], "table2" => ["cf1"], "table3" => ["cf1", "cf2"] } + hbase> add_peer '11', ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.MyReplicationEndpoint', + DATA => { "key1" => 1 }, CONFIG => { "config1" => "value1", "config2" => "value2" }, + TABLE_CFS => { "table1" => [], "table2" => ["cf1"], "table3" => ["cf1", "cf2"] } + +Note: Either CLUSTER_KEY or ENDPOINT_CLASSNAME must be specified but not both. EOF end - def command(id, cluster_key, peer_tableCFs = nil) + def command(id, args = {}, peer_tableCFs = nil) format_simple_command do - replication_admin.add_peer(id, cluster_key, peer_tableCFs) + replication_admin.add_peer(id, args, peer_tableCFs) end end end diff --git a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb new file mode 100644 index 00000000000..648efa7f861 --- /dev/null +++ b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb @@ -0,0 +1,191 @@ +# +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'shell' +require 'shell/formatter' +require 'hbase' +require 'hbase/hbase' +require 'hbase/table' + +include HBaseConstants + +module Hbase + class ReplicationAdminTest < Test::Unit::TestCase + include TestHelpers + + def setup + @test_name = "hbase_shell_tests_table" + @peer_id = '1' + + setup_hbase + drop_test_table(@test_name) + create_test_table(@test_name) + + assert_equal(0, replication_admin.list_peers.length) + end + + def teardown + assert_equal(0, replication_admin.list_peers.length) + + shutdown + end + + define_test "add_peer: should fail when args isn't specified" do + assert_raise(ArgumentError) do + replication_admin.add_peer(@peer_id, nil) + end + end + + define_test "add_peer: fail when neither CLUSTER_KEY nor ENDPOINT_CLASSNAME are specified" do + assert_raise(ArgumentError) do + args = {} + replication_admin.add_peer(@peer_id, args) + end + end + + define_test "add_peer: fail when both CLUSTER_KEY and ENDPOINT_CLASSNAME are specified" do + assert_raise(ArgumentError) do + args = { CLUSTER_KEY => 'zk1,zk2,zk3:2182:/hbase-prod', + ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.MyReplicationEndpoint' } + replication_admin.add_peer(@peer_id, args) + end + end + + define_test "add_peer: args must be a string or number" do + assert_raise(ArgumentError) do + replication_admin.add_peer(@peer_id, 1) + end + assert_raise(ArgumentError) do + replication_admin.add_peer(@peer_id, ['test']) + end + end + + define_test "add_peer: single zk cluster key" do + cluster_key = "server1.cie.com:2181:/hbase" + + replication_admin.add_peer(@peer_id, cluster_key) + + assert_equal(1, replication_admin.list_peers.length) + assert(replication_admin.list_peers.key?(@peer_id)) + assert_equal(cluster_key, replication_admin.list_peers.fetch(@peer_id)) + + # cleanup for future tests + replication_admin.remove_peer(@peer_id) + end + + define_test "add_peer: multiple zk cluster key" do + cluster_key = "zk1,zk2,zk3:2182:/hbase-prod" + + replication_admin.add_peer(@peer_id, cluster_key) + + assert_equal(1, replication_admin.list_peers.length) + assert(replication_admin.list_peers.key?(@peer_id)) + assert_equal(replication_admin.list_peers.fetch(@peer_id), cluster_key) + + # cleanup for future tests + replication_admin.remove_peer(@peer_id) + end + + define_test "add_peer: multiple zk cluster key and table_cfs" do + cluster_key = "zk4,zk5,zk6:11000:/hbase-test" + table_cfs_str = "table1;table2:cf1;table3:cf2,cf3" + + replication_admin.add_peer(@peer_id, cluster_key, table_cfs_str) + + assert_equal(1, replication_admin.list_peers.length) + assert(replication_admin.list_peers.key?(@peer_id)) + assert_equal(cluster_key, replication_admin.list_peers.fetch(@peer_id)) + assert_equal(table_cfs_str, replication_admin.show_peer_tableCFs(@peer_id)) + + # cleanup for future tests + replication_admin.remove_peer(@peer_id) + end + + define_test "add_peer: single zk cluster key - peer config" do + cluster_key = "server1.cie.com:2181:/hbase" + + args = { CLUSTER_KEY => cluster_key } + replication_admin.add_peer(@peer_id, args) + + assert_equal(1, replication_admin.list_peers.length) + assert(replication_admin.list_peers.key?(@peer_id)) + assert_equal(cluster_key, replication_admin.list_peers.fetch(@peer_id)) + + # cleanup for future tests + replication_admin.remove_peer(@peer_id) + end + + define_test "add_peer: multiple zk cluster key - peer config" do + cluster_key = "zk1,zk2,zk3:2182:/hbase-prod" + + args = { CLUSTER_KEY => cluster_key } + replication_admin.add_peer(@peer_id, args) + + assert_equal(1, replication_admin.list_peers.length) + assert(replication_admin.list_peers.key?(@peer_id)) + assert_equal(cluster_key, replication_admin.list_peers.fetch(@peer_id)) + + # cleanup for future tests + replication_admin.remove_peer(@peer_id) + end + + define_test "add_peer: multiple zk cluster key and table_cfs - peer config" do + cluster_key = "zk4,zk5,zk6:11000:/hbase-test" + table_cfs = { "table1" => [], "table2" => ["cf1"], "table3" => ["cf1", "cf2"] } + table_cfs_str = "table1;table2:cf1;table3:cf1,cf2" + + args = { CLUSTER_KEY => cluster_key, TABLE_CFS => table_cfs } + replication_admin.add_peer(@peer_id, args) + + assert_equal(1, replication_admin.list_peers.length) + assert(replication_admin.list_peers.key?(@peer_id)) + assert_equal(cluster_key, replication_admin.list_peers.fetch(@peer_id)) + assert_equal(table_cfs_str, replication_admin.show_peer_tableCFs(@peer_id)) + + # cleanup for future tests + replication_admin.remove_peer(@peer_id) + end + + define_test "add_peer: should fail when args is a hash and peer_tableCFs provided" do + cluster_key = "zk4,zk5,zk6:11000:/hbase-test" + table_cfs_str = "table1;table2:cf1;table3:cf1,cf2" + + assert_raise(ArgumentError) do + args = { CLUSTER_KEY => cluster_key } + replication_admin.add_peer(@peer_id, args, table_cfs_str) + end + end + + # assert_raise fails on native exceptions - https://jira.codehaus.org/browse/JRUBY-5279 + # Can't catch native Java exception with assert_raise in JRuby 1.6.8 as in the test below. + # define_test "add_peer: adding a second peer with same id should error" do + # replication_admin.add_peer(@peer_id, '') + # assert_equal(1, replication_admin.list_peers.length) + # + # assert_raise(java.lang.IllegalArgumentException) do + # replication_admin.add_peer(@peer_id, '') + # end + # + # assert_equal(1, replication_admin.list_peers.length, 1) + # + # # cleanup for future tests + # replication_admin.remove_peer(@peer_id) + # end + end +end diff --git a/hbase-shell/src/test/ruby/test_helper.rb b/hbase-shell/src/test/ruby/test_helper.rb index 5dfafc5657a..e2ab9214f55 100644 --- a/hbase-shell/src/test/ruby/test_helper.rb +++ b/hbase-shell/src/test/ruby/test_helper.rb @@ -68,6 +68,10 @@ module Hbase @shell.hbase_visibility_labels_admin end + def replication_admin + @shell.hbase_replication_admin + end + def create_test_table(name) # Create the table if needed unless admin.exists?(name)