HBASE-21694 Add append_peer_exclude_tableCFs and remove_peer_exclude_tableCFs shell commands
Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
parent
111c827d1f
commit
117d12e9c6
|
@ -419,32 +419,53 @@ public final class ReplicationPeerConfigUtil {
|
|||
if (preTableCfs == null) {
|
||||
builder.setTableCFsMap(tableCfs);
|
||||
} else {
|
||||
Map<TableName, List<String>> newTableCfs = copyTableCFsMap(preTableCfs);
|
||||
for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
|
||||
TableName table = entry.getKey();
|
||||
Collection<String> appendCfs = entry.getValue();
|
||||
if (newTableCfs.containsKey(table)) {
|
||||
List<String> cfs = newTableCfs.get(table);
|
||||
if (cfs == null || appendCfs == null || appendCfs.isEmpty()) {
|
||||
newTableCfs.put(table, null);
|
||||
} else {
|
||||
Set<String> cfSet = new HashSet<String>(cfs);
|
||||
cfSet.addAll(appendCfs);
|
||||
newTableCfs.put(table, Lists.newArrayList(cfSet));
|
||||
}
|
||||
} else {
|
||||
if (appendCfs == null || appendCfs.isEmpty()) {
|
||||
newTableCfs.put(table, null);
|
||||
} else {
|
||||
newTableCfs.put(table, Lists.newArrayList(appendCfs));
|
||||
}
|
||||
}
|
||||
}
|
||||
builder.setTableCFsMap(newTableCfs);
|
||||
builder.setTableCFsMap(mergeTableCFs(preTableCfs, tableCfs));
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
public static ReplicationPeerConfig appendExcludeTableCFsToReplicationPeerConfig(
|
||||
Map<TableName, List<String>> excludeTableCfs, ReplicationPeerConfig peerConfig)
|
||||
throws ReplicationException {
|
||||
if (excludeTableCfs == null) {
|
||||
throw new ReplicationException("exclude tableCfs is null");
|
||||
}
|
||||
ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
|
||||
Map<TableName, List<String>> preExcludeTableCfs = peerConfig.getExcludeTableCFsMap();
|
||||
if (preExcludeTableCfs == null) {
|
||||
builder.setExcludeTableCFsMap(excludeTableCfs);
|
||||
} else {
|
||||
builder.setExcludeTableCFsMap(mergeTableCFs(preExcludeTableCfs, excludeTableCfs));
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
private static Map<TableName, List<String>> mergeTableCFs(
|
||||
Map<TableName, List<String>> preTableCfs, Map<TableName, List<String>> tableCfs) {
|
||||
Map<TableName, List<String>> newTableCfs = copyTableCFsMap(preTableCfs);
|
||||
for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
|
||||
TableName table = entry.getKey();
|
||||
Collection<String> appendCfs = entry.getValue();
|
||||
if (newTableCfs.containsKey(table)) {
|
||||
List<String> cfs = newTableCfs.get(table);
|
||||
if (cfs == null || appendCfs == null || appendCfs.isEmpty()) {
|
||||
newTableCfs.put(table, null);
|
||||
} else {
|
||||
Set<String> cfSet = new HashSet<String>(cfs);
|
||||
cfSet.addAll(appendCfs);
|
||||
newTableCfs.put(table, Lists.newArrayList(cfSet));
|
||||
}
|
||||
} else {
|
||||
if (appendCfs == null || appendCfs.isEmpty()) {
|
||||
newTableCfs.put(table, null);
|
||||
} else {
|
||||
newTableCfs.put(table, Lists.newArrayList(appendCfs));
|
||||
}
|
||||
}
|
||||
}
|
||||
return newTableCfs;
|
||||
}
|
||||
|
||||
private static Map<TableName, List<String>>
|
||||
copyTableCFsMap(Map<TableName, List<String>> preTableCfs) {
|
||||
Map<TableName, List<String>> newTableCfs = new HashMap<>();
|
||||
|
@ -493,6 +514,49 @@ public final class ReplicationPeerConfigUtil {
|
|||
return builder.build();
|
||||
}
|
||||
|
||||
public static ReplicationPeerConfig removeExcludeTableCFsFromReplicationPeerConfig(
|
||||
Map<TableName, List<String>> excludeTableCfs, ReplicationPeerConfig peerConfig, String id)
|
||||
throws ReplicationException {
|
||||
if (excludeTableCfs == null) {
|
||||
throw new ReplicationException("exclude tableCfs is null");
|
||||
}
|
||||
Map<TableName, List<String>> preExcludeTableCfs = peerConfig.getExcludeTableCFsMap();
|
||||
if (preExcludeTableCfs == null) {
|
||||
throw new ReplicationException("exclude-Table-Cfs for peer: " + id + " is null");
|
||||
}
|
||||
Map<TableName, List<String>> newExcludeTableCfs = copyTableCFsMap(preExcludeTableCfs);
|
||||
for (Map.Entry<TableName, ? extends Collection<String>> entry : excludeTableCfs.entrySet()) {
|
||||
TableName table = entry.getKey();
|
||||
Collection<String> removeCfs = entry.getValue();
|
||||
if (newExcludeTableCfs.containsKey(table)) {
|
||||
List<String> cfs = newExcludeTableCfs.get(table);
|
||||
if (cfs == null && (removeCfs == null || removeCfs.isEmpty())) {
|
||||
newExcludeTableCfs.remove(table);
|
||||
} else if (cfs != null && (removeCfs != null && !removeCfs.isEmpty())) {
|
||||
Set<String> cfSet = new HashSet<String>(cfs);
|
||||
cfSet.removeAll(removeCfs);
|
||||
if (cfSet.isEmpty()) {
|
||||
newExcludeTableCfs.remove(table);
|
||||
} else {
|
||||
newExcludeTableCfs.put(table, Lists.newArrayList(cfSet));
|
||||
}
|
||||
} else if (cfs == null && (removeCfs != null && !removeCfs.isEmpty())) {
|
||||
throw new ReplicationException("Cannot remove cf of table: " + table
|
||||
+ " which doesn't specify cfs from exclude-table-cfs config in peer: " + id);
|
||||
} else if (cfs != null && (removeCfs == null || removeCfs.isEmpty())) {
|
||||
throw new ReplicationException("Cannot remove table: " + table
|
||||
+ " which has specified cfs from exclude-table-cfs config in peer: " + id);
|
||||
}
|
||||
} else {
|
||||
throw new ReplicationException(
|
||||
"No table: " + table + " in exclude-table-cfs config of peer: " + id);
|
||||
}
|
||||
}
|
||||
ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
|
||||
builder.setExcludeTableCFsMap(newExcludeTableCfs);
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the configuration needed to talk to the remote slave cluster.
|
||||
* @param conf the base configuration
|
||||
|
|
|
@ -20,11 +20,12 @@ import java.util.List;
|
|||
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest.Builder;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceStability;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest.Builder;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
|
||||
|
||||
/**
|
||||
* An object which captures all quotas types (throttle or space) for a subject (user, table, or
|
||||
* namespace). This is used inside of the HBase RegionServer to act as an analogy to the
|
||||
|
|
|
@ -205,6 +205,38 @@ module Hbase
|
|||
@admin.removeReplicationPeerTableCFs(id, map)
|
||||
end
|
||||
|
||||
# Append exclude-tableCFs to the exclude-tableCFs config for the specified peer
|
||||
def append_peer_exclude_tableCFs(id, excludeTableCFs)
|
||||
unless excludeTableCFs.nil?
|
||||
# convert tableCFs to TableName
|
||||
map = java.util.HashMap.new
|
||||
excludeTableCFs.each do |key, val|
|
||||
map.put(org.apache.hadoop.hbase.TableName.valueOf(key), val)
|
||||
end
|
||||
rpc = get_peer_config(id)
|
||||
unless rpc.nil?
|
||||
rpc = ReplicationPeerConfigUtil.appendExcludeTableCFsToReplicationPeerConfig(map, rpc)
|
||||
@admin.updateReplicationPeerConfig(id, rpc)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# Remove some exclude-tableCFs from the exclude-tableCFs config for the specified peer
|
||||
def remove_peer_exclude_tableCFs(id, excludeTableCFs)
|
||||
unless excludeTableCFs.nil?
|
||||
# convert tableCFs to TableName
|
||||
map = java.util.HashMap.new
|
||||
excludeTableCFs.each do |key, val|
|
||||
map.put(org.apache.hadoop.hbase.TableName.valueOf(key), val)
|
||||
end
|
||||
rpc = get_peer_config(id)
|
||||
unless rpc.nil?
|
||||
rpc = ReplicationPeerConfigUtil.removeExcludeTableCFsFromReplicationPeerConfig(map, rpc, id)
|
||||
@admin.updateReplicationPeerConfig(id, rpc)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# Set new namespaces config for the specified peer
|
||||
def set_peer_namespaces(id, namespaces)
|
||||
unless namespaces.nil?
|
||||
|
|
|
@ -387,6 +387,8 @@ Shell.load_command_group(
|
|||
show_peer_tableCFs
|
||||
set_peer_tableCFs
|
||||
set_peer_exclude_tableCFs
|
||||
append_peer_exclude_tableCFs
|
||||
remove_peer_exclude_tableCFs
|
||||
set_peer_bandwidth
|
||||
list_replicated_tables
|
||||
append_peer_tableCFs
|
||||
|
|
|
@ -0,0 +1,42 @@
|
|||
#
|
||||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
module Shell
|
||||
module Commands
|
||||
class AppendPeerExcludeTableCFs < Command
|
||||
def help
|
||||
<<-EOF
|
||||
Append table-cfs config to the specified peer' exclude table-cfs to make them non-replicable
|
||||
Examples:
|
||||
|
||||
# append tables / table-cfs to peers' exclude table-cfs
|
||||
hbase> append_peer_exclude_tableCFs '2', { "table1" => [], "ns2:table2" => ["cfA", "cfB"]}
|
||||
EOF
|
||||
end
|
||||
|
||||
def command(id, table_cfs)
|
||||
replication_admin.append_peer_exclude_tableCFs(id, table_cfs)
|
||||
end
|
||||
|
||||
def command_name
|
||||
'append_peer_exclude_tableCFs'
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -0,0 +1,42 @@
|
|||
#
|
||||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
module Shell
|
||||
module Commands
|
||||
class RemovePeerExcludeTableCFs < Command
|
||||
def help
|
||||
<<-EOF
|
||||
Remove table-cfs config from the specified peer' exclude table-cfs to make them replicable
|
||||
Examples:
|
||||
|
||||
# remove tables / table-cfs from peer' exclude table-cfs
|
||||
hbase> remove_peer_exclude_tableCFs '2', { "table1" => [], "ns2:table2" => ["cfA", "cfB"]}
|
||||
EOF
|
||||
end
|
||||
|
||||
def command(id, table_cfs)
|
||||
replication_admin.remove_peer_exclude_tableCFs(id, table_cfs)
|
||||
end
|
||||
|
||||
def command_name
|
||||
'remove_peer_exclude_tableCFs'
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -338,6 +338,76 @@ module Hbase
|
|||
replication_admin.remove_peer(@peer_id)
|
||||
end
|
||||
|
||||
define_test "append_peer_exclude_tableCFs: works with exclude table-cfs map" do
|
||||
cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
|
||||
args = {CLUSTER_KEY => cluster_key}
|
||||
command(:add_peer, @peer_id, args)
|
||||
assert_equal(1, command(:list_peers).length)
|
||||
peer = command(:list_peers).get(0)
|
||||
assert_equal(@peer_id, peer.getPeerId)
|
||||
assert_equal(cluster_key, peer.getPeerConfig.getClusterKey)
|
||||
|
||||
# set exclude-table-cfs
|
||||
exclude_table_cfs = {"table1" => [], "ns2:table2" => ["cf1", "cf2"]}
|
||||
command(:set_peer_exclude_tableCFs, @peer_id, exclude_table_cfs)
|
||||
assert_tablecfs_equal(exclude_table_cfs, command(:get_peer_config, @peer_id).getExcludeTableCFsMap())
|
||||
|
||||
# append empty exclude-table-cfs
|
||||
append_table_cfs = {}
|
||||
command(:append_peer_exclude_tableCFs, @peer_id, append_table_cfs)
|
||||
assert_tablecfs_equal(exclude_table_cfs, command(:get_peer_config, @peer_id).getExcludeTableCFsMap())
|
||||
|
||||
# append exclude-table-cfs which don't exist in peer' exclude-table-cfs
|
||||
append_table_cfs = {"table3" => ["cf3"]}
|
||||
exclude_table_cfs = {"table1" => [], "ns2:table2" => ["cf1", "cf2"], "table3" => ["cf3"]}
|
||||
command(:append_peer_exclude_tableCFs, @peer_id, append_table_cfs)
|
||||
assert_tablecfs_equal(exclude_table_cfs, command(:get_peer_config, @peer_id).getExcludeTableCFsMap())
|
||||
|
||||
# append exclude-table-cfs which exist in peer' exclude-table-cfs
|
||||
append_table_cfs = {"table1" => ["cf1"], "ns2:table2" => ["cf1", "cf3"], "table3" => []}
|
||||
exclude_table_cfs = {"table1" => [], "ns2:table2" => ["cf1", "cf2", "cf3"], "table3" => []}
|
||||
command(:append_peer_exclude_tableCFs, @peer_id, append_table_cfs)
|
||||
assert_tablecfs_equal(exclude_table_cfs, command(:get_peer_config, @peer_id).getExcludeTableCFsMap())
|
||||
|
||||
# cleanup for future tests
|
||||
command(:remove_peer, @peer_id)
|
||||
end
|
||||
|
||||
define_test 'remove_peer_exclude_tableCFs: works with exclude table-cfs map' do
|
||||
cluster_key = 'zk4,zk5,zk6:11000:/hbase-test'
|
||||
args = {CLUSTER_KEY => cluster_key}
|
||||
command(:add_peer, @peer_id, args)
|
||||
assert_equal(1, command(:list_peers).length)
|
||||
peer = command(:list_peers).get(0)
|
||||
assert_equal(@peer_id, peer.getPeerId)
|
||||
assert_equal(cluster_key, peer.getPeerConfig.getClusterKey)
|
||||
|
||||
# set exclude-table-cfs
|
||||
exclude_table_cfs = {'table1' => [], 'table2' => ['cf1'], 'ns3:table3' => ['cf1', 'cf2']}
|
||||
command(:set_peer_exclude_tableCFs, @peer_id, exclude_table_cfs)
|
||||
assert_tablecfs_equal(exclude_table_cfs, command(:get_peer_config, @peer_id).getExcludeTableCFsMap())
|
||||
|
||||
# remove empty exclude-table-cfs
|
||||
remove_table_cfs = {}
|
||||
command(:remove_peer_exclude_tableCFs, @peer_id, remove_table_cfs)
|
||||
assert_tablecfs_equal(exclude_table_cfs, command(:get_peer_config, @peer_id).getExcludeTableCFsMap())
|
||||
|
||||
# remove exclude-table-cfs which exist in pees' exclude table cfs
|
||||
remove_table_cfs = {'table1' => [], 'table2' => ['cf1']}
|
||||
exclude_table_cfs = {'ns3:table3' => ['cf1', 'cf2']}
|
||||
command(:remove_peer_exclude_tableCFs, @peer_id, remove_table_cfs)
|
||||
assert_tablecfs_equal(exclude_table_cfs, command(:get_peer_config, @peer_id).getExcludeTableCFsMap())
|
||||
|
||||
# remove exclude-table-cfs which exist in pees' exclude-table-cfs
|
||||
remove_table_cfs = {'ns3:table3' => ['cf2', 'cf3']}
|
||||
exclude_table_cfs = {'ns3:table3' => ['cf1']}
|
||||
command(:remove_peer_exclude_tableCFs, @peer_id, remove_table_cfs)
|
||||
assert_tablecfs_equal(exclude_table_cfs, command(:get_peer_config, @peer_id).getExcludeTableCFsMap())
|
||||
|
||||
# cleanup for future tests
|
||||
replication_admin.remove_peer(@peer_id)
|
||||
end
|
||||
|
||||
define_test "set_peer_namespaces: works with namespaces array" do
|
||||
cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
|
||||
namespaces = ["ns1", "ns2"]
|
||||
|
|
Loading…
Reference in New Issue