From 620d70d6186fb800299bcc62ad7179fccfd1be41 Mon Sep 17 00:00:00 2001 From: meiyi Date: Wed, 9 Jan 2019 15:38:39 +0800 Subject: [PATCH] HBASE-21694 Add append_peer_exclude_tableCFs and remove_peer_exclude_tableCFs shell commands Signed-off-by: Guanghao Zhang --- .../ReplicationPeerConfigUtil.java | 108 ++++++++++++++---- .../hbase/quotas/GlobalQuotaSettings.java | 5 +- .../src/main/ruby/hbase/replication_admin.rb | 32 ++++++ hbase-shell/src/main/ruby/shell.rb | 2 + .../commands/append_peer_exclude_tableCFs.rb | 42 +++++++ .../commands/remove_peer_exclude_tableCFs.rb | 42 +++++++ .../test/ruby/hbase/replication_admin_test.rb | 70 ++++++++++++ 7 files changed, 277 insertions(+), 24 deletions(-) create mode 100644 hbase-shell/src/main/ruby/shell/commands/append_peer_exclude_tableCFs.rb create mode 100644 hbase-shell/src/main/ruby/shell/commands/remove_peer_exclude_tableCFs.rb diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java index 331795c590c..25d30f49234 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java @@ -445,32 +445,53 @@ public final class ReplicationPeerConfigUtil { if (preTableCfs == null) { builder.setTableCFsMap(tableCfs); } else { - Map> newTableCfs = copyTableCFsMap(preTableCfs); - for (Map.Entry> entry : tableCfs.entrySet()) { - TableName table = entry.getKey(); - Collection appendCfs = entry.getValue(); - if (newTableCfs.containsKey(table)) { - List cfs = newTableCfs.get(table); - if (cfs == null || appendCfs == null || appendCfs.isEmpty()) { - newTableCfs.put(table, null); - } else { - Set cfSet = new HashSet(cfs); - cfSet.addAll(appendCfs); - newTableCfs.put(table, Lists.newArrayList(cfSet)); - } - } else { - if (appendCfs == null || appendCfs.isEmpty()) { - newTableCfs.put(table, null); - } else { - newTableCfs.put(table, Lists.newArrayList(appendCfs)); - } - } - } - builder.setTableCFsMap(newTableCfs); + builder.setTableCFsMap(mergeTableCFs(preTableCfs, tableCfs)); } return builder.build(); } + public static ReplicationPeerConfig appendExcludeTableCFsToReplicationPeerConfig( + Map> excludeTableCfs, ReplicationPeerConfig peerConfig) + throws ReplicationException { + if (excludeTableCfs == null) { + throw new ReplicationException("exclude tableCfs is null"); + } + ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig); + Map> preExcludeTableCfs = peerConfig.getExcludeTableCFsMap(); + if (preExcludeTableCfs == null) { + builder.setExcludeTableCFsMap(excludeTableCfs); + } else { + builder.setExcludeTableCFsMap(mergeTableCFs(preExcludeTableCfs, excludeTableCfs)); + } + return builder.build(); + } + + private static Map> mergeTableCFs( + Map> preTableCfs, Map> tableCfs) { + Map> newTableCfs = copyTableCFsMap(preTableCfs); + for (Map.Entry> entry : tableCfs.entrySet()) { + TableName table = entry.getKey(); + Collection appendCfs = entry.getValue(); + if (newTableCfs.containsKey(table)) { + List cfs = newTableCfs.get(table); + if (cfs == null || appendCfs == null || appendCfs.isEmpty()) { + newTableCfs.put(table, null); + } else { + Set cfSet = new HashSet(cfs); + cfSet.addAll(appendCfs); + newTableCfs.put(table, Lists.newArrayList(cfSet)); + } + } else { + if (appendCfs == null || appendCfs.isEmpty()) { + newTableCfs.put(table, null); + } else { + newTableCfs.put(table, Lists.newArrayList(appendCfs)); + } + } + } + return newTableCfs; + } + private static Map> copyTableCFsMap(Map> preTableCfs) { Map> newTableCfs = new HashMap<>(); @@ -519,6 +540,49 @@ public final class ReplicationPeerConfigUtil { return builder.build(); } + public static ReplicationPeerConfig removeExcludeTableCFsFromReplicationPeerConfig( + Map> excludeTableCfs, ReplicationPeerConfig peerConfig, String id) + throws ReplicationException { + if (excludeTableCfs == null) { + throw new ReplicationException("exclude tableCfs is null"); + } + Map> preExcludeTableCfs = peerConfig.getExcludeTableCFsMap(); + if (preExcludeTableCfs == null) { + throw new ReplicationException("exclude-Table-Cfs for peer: " + id + " is null"); + } + Map> newExcludeTableCfs = copyTableCFsMap(preExcludeTableCfs); + for (Map.Entry> entry : excludeTableCfs.entrySet()) { + TableName table = entry.getKey(); + Collection removeCfs = entry.getValue(); + if (newExcludeTableCfs.containsKey(table)) { + List cfs = newExcludeTableCfs.get(table); + if (cfs == null && (removeCfs == null || removeCfs.isEmpty())) { + newExcludeTableCfs.remove(table); + } else if (cfs != null && (removeCfs != null && !removeCfs.isEmpty())) { + Set cfSet = new HashSet(cfs); + cfSet.removeAll(removeCfs); + if (cfSet.isEmpty()) { + newExcludeTableCfs.remove(table); + } else { + newExcludeTableCfs.put(table, Lists.newArrayList(cfSet)); + } + } else if (cfs == null && (removeCfs != null && !removeCfs.isEmpty())) { + throw new ReplicationException("Cannot remove cf of table: " + table + + " which doesn't specify cfs from exclude-table-cfs config in peer: " + id); + } else if (cfs != null && (removeCfs == null || removeCfs.isEmpty())) { + throw new ReplicationException("Cannot remove table: " + table + + " which has specified cfs from exclude-table-cfs config in peer: " + id); + } + } else { + throw new ReplicationException( + "No table: " + table + " in exclude-table-cfs config of peer: " + id); + } + } + ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig); + builder.setExcludeTableCFsMap(newExcludeTableCfs); + return builder.build(); + } + /** * Returns the configuration needed to talk to the remote slave cluster. * @param conf the base configuration diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/GlobalQuotaSettings.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/GlobalQuotaSettings.java index 107523ba5f8..23dc7d52ec3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/GlobalQuotaSettings.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/GlobalQuotaSettings.java @@ -20,11 +20,12 @@ import java.util.List; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest.Builder; -import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest.Builder; +import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas; + /** * An object which captures all quotas types (throttle or space) for a subject (user, table, or * namespace). This is used inside of the HBase RegionServer to act as an analogy to the diff --git a/hbase-shell/src/main/ruby/hbase/replication_admin.rb b/hbase-shell/src/main/ruby/hbase/replication_admin.rb index e06116822b0..c788835c6f5 100644 --- a/hbase-shell/src/main/ruby/hbase/replication_admin.rb +++ b/hbase-shell/src/main/ruby/hbase/replication_admin.rb @@ -210,6 +210,38 @@ module Hbase @admin.removeReplicationPeerTableCFs(id, map) end + # Append exclude-tableCFs to the exclude-tableCFs config for the specified peer + def append_peer_exclude_tableCFs(id, excludeTableCFs) + unless excludeTableCFs.nil? + # convert tableCFs to TableName + map = java.util.HashMap.new + excludeTableCFs.each do |key, val| + map.put(org.apache.hadoop.hbase.TableName.valueOf(key), val) + end + rpc = get_peer_config(id) + unless rpc.nil? + rpc = ReplicationPeerConfigUtil.appendExcludeTableCFsToReplicationPeerConfig(map, rpc) + @admin.updateReplicationPeerConfig(id, rpc) + end + end + end + + # Remove some exclude-tableCFs from the exclude-tableCFs config for the specified peer + def remove_peer_exclude_tableCFs(id, excludeTableCFs) + unless excludeTableCFs.nil? + # convert tableCFs to TableName + map = java.util.HashMap.new + excludeTableCFs.each do |key, val| + map.put(org.apache.hadoop.hbase.TableName.valueOf(key), val) + end + rpc = get_peer_config(id) + unless rpc.nil? + rpc = ReplicationPeerConfigUtil.removeExcludeTableCFsFromReplicationPeerConfig(map, rpc, id) + @admin.updateReplicationPeerConfig(id, rpc) + end + end + end + # Set new namespaces config for the specified peer def set_peer_namespaces(id, namespaces) unless namespaces.nil? diff --git a/hbase-shell/src/main/ruby/shell.rb b/hbase-shell/src/main/ruby/shell.rb index 62a8baef8cf..6bd48046d81 100644 --- a/hbase-shell/src/main/ruby/shell.rb +++ b/hbase-shell/src/main/ruby/shell.rb @@ -389,6 +389,8 @@ Shell.load_command_group( show_peer_tableCFs set_peer_tableCFs set_peer_exclude_tableCFs + append_peer_exclude_tableCFs + remove_peer_exclude_tableCFs set_peer_bandwidth list_replicated_tables append_peer_tableCFs diff --git a/hbase-shell/src/main/ruby/shell/commands/append_peer_exclude_tableCFs.rb b/hbase-shell/src/main/ruby/shell/commands/append_peer_exclude_tableCFs.rb new file mode 100644 index 00000000000..c229c9425cd --- /dev/null +++ b/hbase-shell/src/main/ruby/shell/commands/append_peer_exclude_tableCFs.rb @@ -0,0 +1,42 @@ +# +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module Shell + module Commands + class AppendPeerExcludeTableCFs < Command + def help + <<-EOF +Append table-cfs config to the specified peer' exclude table-cfs to make them non-replicable +Examples: + + # append tables / table-cfs to peers' exclude table-cfs + hbase> append_peer_exclude_tableCFs '2', { "table1" => [], "ns2:table2" => ["cfA", "cfB"]} + EOF + end + + def command(id, table_cfs) + replication_admin.append_peer_exclude_tableCFs(id, table_cfs) + end + + def command_name + 'append_peer_exclude_tableCFs' + end + end + end +end diff --git a/hbase-shell/src/main/ruby/shell/commands/remove_peer_exclude_tableCFs.rb b/hbase-shell/src/main/ruby/shell/commands/remove_peer_exclude_tableCFs.rb new file mode 100644 index 00000000000..4f452072b3e --- /dev/null +++ b/hbase-shell/src/main/ruby/shell/commands/remove_peer_exclude_tableCFs.rb @@ -0,0 +1,42 @@ +# +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module Shell + module Commands + class RemovePeerExcludeTableCFs < Command + def help + <<-EOF +Remove table-cfs config from the specified peer' exclude table-cfs to make them replicable +Examples: + + # remove tables / table-cfs from peer' exclude table-cfs + hbase> remove_peer_exclude_tableCFs '2', { "table1" => [], "ns2:table2" => ["cfA", "cfB"]} + EOF + end + + def command(id, table_cfs) + replication_admin.remove_peer_exclude_tableCFs(id, table_cfs) + end + + def command_name + 'remove_peer_exclude_tableCFs' + end + end + end +end diff --git a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb index 85b4537a21a..7f5400f6994 100644 --- a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb +++ b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb @@ -361,6 +361,76 @@ module Hbase replication_admin.remove_peer(@peer_id) end + define_test "append_peer_exclude_tableCFs: works with exclude table-cfs map" do + cluster_key = "zk4,zk5,zk6:11000:/hbase-test" + args = {CLUSTER_KEY => cluster_key} + command(:add_peer, @peer_id, args) + assert_equal(1, command(:list_peers).length) + peer = command(:list_peers).get(0) + assert_equal(@peer_id, peer.getPeerId) + assert_equal(cluster_key, peer.getPeerConfig.getClusterKey) + + # set exclude-table-cfs + exclude_table_cfs = {"table1" => [], "ns2:table2" => ["cf1", "cf2"]} + command(:set_peer_exclude_tableCFs, @peer_id, exclude_table_cfs) + assert_tablecfs_equal(exclude_table_cfs, command(:get_peer_config, @peer_id).getExcludeTableCFsMap()) + + # append empty exclude-table-cfs + append_table_cfs = {} + command(:append_peer_exclude_tableCFs, @peer_id, append_table_cfs) + assert_tablecfs_equal(exclude_table_cfs, command(:get_peer_config, @peer_id).getExcludeTableCFsMap()) + + # append exclude-table-cfs which don't exist in peer' exclude-table-cfs + append_table_cfs = {"table3" => ["cf3"]} + exclude_table_cfs = {"table1" => [], "ns2:table2" => ["cf1", "cf2"], "table3" => ["cf3"]} + command(:append_peer_exclude_tableCFs, @peer_id, append_table_cfs) + assert_tablecfs_equal(exclude_table_cfs, command(:get_peer_config, @peer_id).getExcludeTableCFsMap()) + + # append exclude-table-cfs which exist in peer' exclude-table-cfs + append_table_cfs = {"table1" => ["cf1"], "ns2:table2" => ["cf1", "cf3"], "table3" => []} + exclude_table_cfs = {"table1" => [], "ns2:table2" => ["cf1", "cf2", "cf3"], "table3" => []} + command(:append_peer_exclude_tableCFs, @peer_id, append_table_cfs) + assert_tablecfs_equal(exclude_table_cfs, command(:get_peer_config, @peer_id).getExcludeTableCFsMap()) + + # cleanup for future tests + command(:remove_peer, @peer_id) + end + + define_test 'remove_peer_exclude_tableCFs: works with exclude table-cfs map' do + cluster_key = 'zk4,zk5,zk6:11000:/hbase-test' + args = {CLUSTER_KEY => cluster_key} + command(:add_peer, @peer_id, args) + assert_equal(1, command(:list_peers).length) + peer = command(:list_peers).get(0) + assert_equal(@peer_id, peer.getPeerId) + assert_equal(cluster_key, peer.getPeerConfig.getClusterKey) + + # set exclude-table-cfs + exclude_table_cfs = {'table1' => [], 'table2' => ['cf1'], 'ns3:table3' => ['cf1', 'cf2']} + command(:set_peer_exclude_tableCFs, @peer_id, exclude_table_cfs) + assert_tablecfs_equal(exclude_table_cfs, command(:get_peer_config, @peer_id).getExcludeTableCFsMap()) + + # remove empty exclude-table-cfs + remove_table_cfs = {} + command(:remove_peer_exclude_tableCFs, @peer_id, remove_table_cfs) + assert_tablecfs_equal(exclude_table_cfs, command(:get_peer_config, @peer_id).getExcludeTableCFsMap()) + + # remove exclude-table-cfs which exist in pees' exclude table cfs + remove_table_cfs = {'table1' => [], 'table2' => ['cf1']} + exclude_table_cfs = {'ns3:table3' => ['cf1', 'cf2']} + command(:remove_peer_exclude_tableCFs, @peer_id, remove_table_cfs) + assert_tablecfs_equal(exclude_table_cfs, command(:get_peer_config, @peer_id).getExcludeTableCFsMap()) + + # remove exclude-table-cfs which exist in pees' exclude-table-cfs + remove_table_cfs = {'ns3:table3' => ['cf2', 'cf3']} + exclude_table_cfs = {'ns3:table3' => ['cf1']} + command(:remove_peer_exclude_tableCFs, @peer_id, remove_table_cfs) + assert_tablecfs_equal(exclude_table_cfs, command(:get_peer_config, @peer_id).getExcludeTableCFsMap()) + + # cleanup for future tests + replication_admin.remove_peer(@peer_id) + end + define_test "set_peer_namespaces: works with namespaces array" do cluster_key = "zk4,zk5,zk6:11000:/hbase-test" namespaces = ["ns1", "ns2"]