HBASE-19492 Add EXCLUDE_NAMESPACE and EXCLUDE_TABLECFS support to replication peer config
This commit is contained in:
parent
1fd22a7b0f
commit
60cd494d1c
|
@ -48,6 +48,7 @@ import java.util.Map;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Helper for TableCFs Operations.
|
* Helper for TableCFs Operations.
|
||||||
|
@ -289,11 +290,8 @@ public final class ReplicationPeerConfigUtil {
|
||||||
|
|
||||||
List<ByteString> namespacesList = peer.getNamespacesList();
|
List<ByteString> namespacesList = peer.getNamespacesList();
|
||||||
if (namespacesList != null && namespacesList.size() != 0) {
|
if (namespacesList != null && namespacesList.size() != 0) {
|
||||||
Set<String> namespaces = new HashSet<>();
|
peerConfig.setNamespaces(
|
||||||
for (ByteString namespace : namespacesList) {
|
namespacesList.stream().map(ByteString::toStringUtf8).collect(Collectors.toSet()));
|
||||||
namespaces.add(namespace.toStringUtf8());
|
|
||||||
}
|
|
||||||
peerConfig.setNamespaces(namespaces);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (peer.hasBandwidth()) {
|
if (peer.hasBandwidth()) {
|
||||||
|
@ -304,6 +302,19 @@ public final class ReplicationPeerConfigUtil {
|
||||||
peerConfig.setReplicateAllUserTables(peer.getReplicateAll());
|
peerConfig.setReplicateAllUserTables(peer.getReplicateAll());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Map<TableName, ? extends Collection<String>> excludeTableCFsMap =
|
||||||
|
convert2Map(peer.getExcludeTableCfsList()
|
||||||
|
.toArray(new ReplicationProtos.TableCF[peer.getExcludeTableCfsCount()]));
|
||||||
|
if (excludeTableCFsMap != null) {
|
||||||
|
peerConfig.setExcludeTableCFsMap(excludeTableCFsMap);
|
||||||
|
}
|
||||||
|
|
||||||
|
List<ByteString> excludeNamespacesList = peer.getExcludeNamespacesList();
|
||||||
|
if (excludeNamespacesList != null && excludeNamespacesList.size() != 0) {
|
||||||
|
peerConfig.setExcludeNamespaces(
|
||||||
|
excludeNamespacesList.stream().map(ByteString::toStringUtf8).collect(Collectors.toSet()));
|
||||||
|
}
|
||||||
|
|
||||||
return peerConfig;
|
return peerConfig;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -346,6 +357,20 @@ public final class ReplicationPeerConfigUtil {
|
||||||
|
|
||||||
builder.setBandwidth(peerConfig.getBandwidth());
|
builder.setBandwidth(peerConfig.getBandwidth());
|
||||||
builder.setReplicateAll(peerConfig.replicateAllUserTables());
|
builder.setReplicateAll(peerConfig.replicateAllUserTables());
|
||||||
|
|
||||||
|
ReplicationProtos.TableCF[] excludeTableCFs = convert(peerConfig.getExcludeTableCFsMap());
|
||||||
|
if (excludeTableCFs != null) {
|
||||||
|
for (int i = 0; i < excludeTableCFs.length; i++) {
|
||||||
|
builder.addExcludeTableCfs(excludeTableCFs[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Set<String> excludeNamespaces = peerConfig.getExcludeNamespaces();
|
||||||
|
if (excludeNamespaces != null) {
|
||||||
|
for (String namespace : excludeNamespaces) {
|
||||||
|
builder.addExcludeNamespaces(ByteString.copyFromUtf8(namespace));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -44,6 +44,8 @@ public class ReplicationPeerConfig {
|
||||||
private long bandwidth = 0;
|
private long bandwidth = 0;
|
||||||
// Default value is true, means replicate all user tables to peer cluster.
|
// Default value is true, means replicate all user tables to peer cluster.
|
||||||
private boolean replicateAllUserTables = true;
|
private boolean replicateAllUserTables = true;
|
||||||
|
private Map<TableName, ? extends Collection<String>> excludeTableCFsMap = null;
|
||||||
|
private Set<String> excludeNamespaces = null;
|
||||||
|
|
||||||
public ReplicationPeerConfig() {
|
public ReplicationPeerConfig() {
|
||||||
this.peerData = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
this.peerData = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
||||||
|
@ -121,17 +123,45 @@ public class ReplicationPeerConfig {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Map<TableName, List<String>> getExcludeTableCFsMap() {
|
||||||
|
return (Map<TableName, List<String>>) excludeTableCFsMap;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ReplicationPeerConfig setExcludeTableCFsMap(Map<TableName,
|
||||||
|
? extends Collection<String>> tableCFsMap) {
|
||||||
|
this.excludeTableCFsMap = tableCFsMap;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Set<String> getExcludeNamespaces() {
|
||||||
|
return this.excludeNamespaces;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ReplicationPeerConfig setExcludeNamespaces(Set<String> namespaces) {
|
||||||
|
this.excludeNamespaces = namespaces;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
StringBuilder builder = new StringBuilder("clusterKey=").append(clusterKey).append(",");
|
StringBuilder builder = new StringBuilder("clusterKey=").append(clusterKey).append(",");
|
||||||
builder.append("replicationEndpointImpl=").append(replicationEndpointImpl).append(",");
|
builder.append("replicationEndpointImpl=").append(replicationEndpointImpl).append(",");
|
||||||
builder.append("replicateAllUserTables=").append(replicateAllUserTables).append(",");
|
builder.append("replicateAllUserTables=").append(replicateAllUserTables).append(",");
|
||||||
|
if (replicateAllUserTables) {
|
||||||
|
if (excludeNamespaces != null) {
|
||||||
|
builder.append("excludeNamespaces=").append(excludeNamespaces.toString()).append(",");
|
||||||
|
}
|
||||||
|
if (excludeTableCFsMap != null) {
|
||||||
|
builder.append("excludeTableCFsMap=").append(excludeTableCFsMap.toString()).append(",");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
if (namespaces != null) {
|
if (namespaces != null) {
|
||||||
builder.append("namespaces=").append(namespaces.toString()).append(",");
|
builder.append("namespaces=").append(namespaces.toString()).append(",");
|
||||||
}
|
}
|
||||||
if (tableCFsMap != null) {
|
if (tableCFsMap != null) {
|
||||||
builder.append("tableCFs=").append(tableCFsMap.toString()).append(",");
|
builder.append("tableCFs=").append(tableCFsMap.toString()).append(",");
|
||||||
}
|
}
|
||||||
|
}
|
||||||
builder.append("bandwidth=").append(bandwidth);
|
builder.append("bandwidth=").append(bandwidth);
|
||||||
return builder.toString();
|
return builder.toString();
|
||||||
}
|
}
|
||||||
|
@ -142,11 +172,15 @@ public class ReplicationPeerConfig {
|
||||||
* @return true if the table need replicate to the peer cluster
|
* @return true if the table need replicate to the peer cluster
|
||||||
*/
|
*/
|
||||||
public boolean needToReplicate(TableName table) {
|
public boolean needToReplicate(TableName table) {
|
||||||
// If null means user has explicitly not configured any namespaces and table CFs
|
if (replicateAllUserTables) {
|
||||||
// so all the tables data are applicable for replication
|
if (excludeNamespaces != null && excludeNamespaces.contains(table.getNamespaceAsString())) {
|
||||||
if (namespaces == null && tableCFsMap == null) {
|
return false;
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
if (excludeTableCFsMap != null && excludeTableCFsMap.containsKey(table)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
if (namespaces != null && namespaces.contains(table.getNamespaceAsString())) {
|
if (namespaces != null && namespaces.contains(table.getNamespaceAsString())) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -156,3 +190,4 @@ public class ReplicationPeerConfig {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -46,6 +46,8 @@ message ReplicationPeer {
|
||||||
repeated bytes namespaces = 6;
|
repeated bytes namespaces = 6;
|
||||||
optional int64 bandwidth = 7;
|
optional int64 bandwidth = 7;
|
||||||
optional bool replicate_all = 8;
|
optional bool replicate_all = 8;
|
||||||
|
repeated TableCF exclude_table_cfs = 9;
|
||||||
|
repeated bytes exclude_namespaces = 10;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -369,6 +369,8 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
|
||||||
existingConfig.setNamespaces(newConfig.getNamespaces());
|
existingConfig.setNamespaces(newConfig.getNamespaces());
|
||||||
existingConfig.setBandwidth(newConfig.getBandwidth());
|
existingConfig.setBandwidth(newConfig.getBandwidth());
|
||||||
existingConfig.setReplicateAllUserTables(newConfig.replicateAllUserTables());
|
existingConfig.setReplicateAllUserTables(newConfig.replicateAllUserTables());
|
||||||
|
existingConfig.setExcludeNamespaces(newConfig.getExcludeNamespaces());
|
||||||
|
existingConfig.setExcludeTableCFsMap(newConfig.getExcludeTableCFsMap());
|
||||||
|
|
||||||
try {
|
try {
|
||||||
ZKUtil.setData(this.zookeeper, getPeerNode(id),
|
ZKUtil.setData(this.zookeeper, getPeerNode(id),
|
||||||
|
|
|
@ -118,15 +118,32 @@ public class ReplicationManager {
|
||||||
return peers;
|
return peers;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws ReplicationException,
|
/**
|
||||||
IOException {
|
* If replicate_all flag is true, it means all user tables will be replicated to peer cluster.
|
||||||
|
* Then allow config exclude namespaces or exclude table-cfs which can't be replicated to
|
||||||
|
* peer cluster.
|
||||||
|
*
|
||||||
|
* If replicate_all flag is false, it means all user tables can't be replicated to peer cluster.
|
||||||
|
* Then allow to config namespaces or table-cfs which will be replicated to peer cluster.
|
||||||
|
*/
|
||||||
|
private void checkPeerConfig(ReplicationPeerConfig peerConfig)
|
||||||
|
throws ReplicationException, IOException {
|
||||||
if (peerConfig.replicateAllUserTables()) {
|
if (peerConfig.replicateAllUserTables()) {
|
||||||
if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty())
|
if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty())
|
||||||
|| (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) {
|
|| (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) {
|
||||||
throw new ReplicationException(
|
throw new ReplicationException("Need clean namespaces or table-cfs config firstly"
|
||||||
"Need clean namespaces or table-cfs config fisrtly when you want replicate all cluster");
|
+ " when replicate_all flag is true");
|
||||||
}
|
}
|
||||||
|
checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(),
|
||||||
|
peerConfig.getExcludeTableCFsMap());
|
||||||
} else {
|
} else {
|
||||||
|
if ((peerConfig.getExcludeNamespaces() != null && !peerConfig.getNamespaces().isEmpty())
|
||||||
|
|| (peerConfig.getExcludeTableCFsMap() != null
|
||||||
|
&& !peerConfig.getTableCFsMap().isEmpty())) {
|
||||||
|
throw new ReplicationException(
|
||||||
|
"Need clean exclude-namespaces or exclude-table-cfs config firstly"
|
||||||
|
+ " when replicate_all flag is false");
|
||||||
|
}
|
||||||
checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
|
checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
|
||||||
peerConfig.getTableCFsMap());
|
peerConfig.getTableCFsMap());
|
||||||
}
|
}
|
||||||
|
@ -134,17 +151,19 @@ public class ReplicationManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set a namespace in the peer config means that all tables in this namespace
|
* Set a namespace in the peer config means that all tables in this namespace will be replicated
|
||||||
* will be replicated to the peer cluster.
|
* to the peer cluster.
|
||||||
|
* 1. If peer config already has a namespace, then not allow set any table of this namespace
|
||||||
|
* to the peer config.
|
||||||
|
* 2. If peer config already has a table, then not allow set this table's namespace to the peer
|
||||||
|
* config.
|
||||||
*
|
*
|
||||||
* 1. If you already have set a namespace in the peer config, then you can't set any table
|
* Set a exclude namespace in the peer config means that all tables in this namespace can't be
|
||||||
* of this namespace to the peer config.
|
* replicated to the peer cluster.
|
||||||
* 2. If you already have set a table in the peer config, then you can't set this table's
|
* 1. If peer config already has a exclude namespace, then not allow set any exclude table of
|
||||||
* namespace to the peer config.
|
* this namespace to the peer config.
|
||||||
*
|
* 2. If peer config already has a exclude table, then not allow set this table's namespace
|
||||||
* @param namespaces
|
* as a exclude namespace.
|
||||||
* @param tableCfs
|
|
||||||
* @throws ReplicationException
|
|
||||||
*/
|
*/
|
||||||
private void checkNamespacesAndTableCfsConfigConflict(Set<String> namespaces,
|
private void checkNamespacesAndTableCfsConfigConflict(Set<String> namespaces,
|
||||||
Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException {
|
Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException {
|
||||||
|
@ -157,8 +176,8 @@ public class ReplicationManager {
|
||||||
for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
|
for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
|
||||||
TableName table = entry.getKey();
|
TableName table = entry.getKey();
|
||||||
if (namespaces.contains(table.getNamespaceAsString())) {
|
if (namespaces.contains(table.getNamespaceAsString())) {
|
||||||
throw new ReplicationException(
|
throw new ReplicationException("Table-cfs " + table + " is conflict with namespaces "
|
||||||
"Table-cfs config conflict with namespaces config in peer");
|
+ table.getNamespaceAsString() + " in peer config");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,16 +32,17 @@ import org.apache.hadoop.hbase.wal.WALEdit;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.shaded.com.google.common.base.Predicate;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Filter a WAL Entry by namespaces and table-cfs config in the peer. It first filter entry
|
* Filter a WAL Entry by the peer config: replicate_all flag, namespaces config, table-cfs config,
|
||||||
* by namespaces config, then filter entry by table-cfs config.
|
* exclude namespaces config, and exclude table-cfs config.
|
||||||
*
|
*
|
||||||
* 1. Set a namespace in peer config means that all tables in this namespace will be replicated.
|
* If replicate_all flag is true, it means all user tables will be replicated to peer cluster. But
|
||||||
* 2. If the namespaces config is null, then the table-cfs config decide which table's edit
|
* you can set exclude namespaces or exclude table-cfs which can't be replicated to peer cluster.
|
||||||
* can be replicated. If the table-cfs config is null, then the namespaces config decide
|
* Note: set a exclude namespace means that all tables in this namespace can't be replicated.
|
||||||
* which table's edit can be replicated.
|
*
|
||||||
|
* If replicate_all flag is false, it means all user tables can't be replicated to peer cluster.
|
||||||
|
* But you can set namespaces or table-cfs which will be replicated to peer cluster.
|
||||||
|
* Note: set a namespace means that all tables in this namespace will be replicated.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFilter {
|
public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFilter {
|
||||||
|
@ -61,7 +62,15 @@ public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFi
|
||||||
ReplicationPeerConfig peerConfig = this.peer.getPeerConfig();
|
ReplicationPeerConfig peerConfig = this.peer.getPeerConfig();
|
||||||
|
|
||||||
if (peerConfig.replicateAllUserTables()) {
|
if (peerConfig.replicateAllUserTables()) {
|
||||||
// replicate all user tables, so return entry directly
|
// replicate all user tables, but filter by exclude namespaces config
|
||||||
|
Set<String> excludeNamespaces = peerConfig.getExcludeNamespaces();
|
||||||
|
|
||||||
|
// return null(prevent replicating) if logKey's table is in this peer's
|
||||||
|
// exclude namespaces list
|
||||||
|
if (excludeNamespaces != null && excludeNamespaces.contains(namespace)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
return entry;
|
return entry;
|
||||||
} else {
|
} else {
|
||||||
// Not replicate all user tables, so filter by namespaces and table-cfs config
|
// Not replicate all user tables, so filter by namespaces and table-cfs config
|
||||||
|
@ -80,7 +89,7 @@ public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFi
|
||||||
|
|
||||||
// Then filter by table-cfs config
|
// Then filter by table-cfs config
|
||||||
// return null(prevent replicating) if logKey's table isn't in this peer's
|
// return null(prevent replicating) if logKey's table isn't in this peer's
|
||||||
// replicaable namespace list and table list
|
// replicable tables list
|
||||||
if (tableCFs == null || !tableCFs.containsKey(tabName)) {
|
if (tableCFs == null || !tableCFs.containsKey(tabName)) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -93,34 +102,39 @@ public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFi
|
||||||
public Cell filterCell(final Entry entry, Cell cell) {
|
public Cell filterCell(final Entry entry, Cell cell) {
|
||||||
ReplicationPeerConfig peerConfig = this.peer.getPeerConfig();
|
ReplicationPeerConfig peerConfig = this.peer.getPeerConfig();
|
||||||
if (peerConfig.replicateAllUserTables()) {
|
if (peerConfig.replicateAllUserTables()) {
|
||||||
// replicate all user tables, so return cell directly
|
// replicate all user tables, but filter by exclude table-cfs config
|
||||||
|
final Map<TableName, List<String>> excludeTableCfs = peerConfig.getExcludeTableCFsMap();
|
||||||
|
if (excludeTableCfs == null) {
|
||||||
|
return cell;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
|
||||||
|
cell = bulkLoadFilter.filterCell(cell,
|
||||||
|
fam -> filterByExcludeTableCfs(entry.getKey().getTablename(), Bytes.toString(fam),
|
||||||
|
excludeTableCfs));
|
||||||
|
} else {
|
||||||
|
if (filterByExcludeTableCfs(entry.getKey().getTablename(),
|
||||||
|
Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()),
|
||||||
|
excludeTableCfs)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return cell;
|
return cell;
|
||||||
} else {
|
} else {
|
||||||
|
// not replicate all user tables, so filter by table-cfs config
|
||||||
final Map<TableName, List<String>> tableCfs = peerConfig.getTableCFsMap();
|
final Map<TableName, List<String>> tableCfs = peerConfig.getTableCFsMap();
|
||||||
if (tableCfs == null) {
|
if (tableCfs == null) {
|
||||||
return cell;
|
return cell;
|
||||||
}
|
}
|
||||||
TableName tabName = entry.getKey().getTablename();
|
|
||||||
List<String> cfs = tableCfs.get(tabName);
|
|
||||||
// ignore(remove) kv if its cf isn't in the replicable cf list
|
|
||||||
// (empty cfs means all cfs of this table are replicable)
|
|
||||||
if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
|
if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
|
||||||
cell = bulkLoadFilter.filterCell(cell, new Predicate<byte[]>() {
|
cell = bulkLoadFilter.filterCell(cell,
|
||||||
@Override
|
fam -> filterByTableCfs(entry.getKey().getTablename(), Bytes.toString(fam), tableCfs));
|
||||||
public boolean apply(byte[] fam) {
|
|
||||||
if (tableCfs != null) {
|
|
||||||
List<String> cfs = tableCfs.get(entry.getKey().getTablename());
|
|
||||||
if (cfs != null && !cfs.contains(Bytes.toString(fam))) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
} else {
|
} else {
|
||||||
if ((cfs != null)
|
if (filterByTableCfs(entry.getKey().getTablename(),
|
||||||
&& !cfs.contains(Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(),
|
Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()),
|
||||||
cell.getFamilyLength()))) {
|
tableCfs)) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -128,4 +142,31 @@ public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFi
|
||||||
return cell;
|
return cell;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean filterByExcludeTableCfs(TableName tableName, String family,
|
||||||
|
Map<TableName, List<String>> excludeTableCfs) {
|
||||||
|
List<String> excludeCfs = excludeTableCfs.get(tableName);
|
||||||
|
if (excludeCfs != null) {
|
||||||
|
// empty cfs means all cfs of this table are excluded
|
||||||
|
if (excludeCfs.isEmpty()) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
// ignore(remove) kv if its cf is in the exclude cfs list
|
||||||
|
if (excludeCfs.contains(family)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean filterByTableCfs(TableName tableName, String family,
|
||||||
|
Map<TableName, List<String>> tableCfs) {
|
||||||
|
List<String> cfs = tableCfs.get(tableName);
|
||||||
|
// ignore(remove) kv if its cf isn't in the replicable cf list
|
||||||
|
// (empty cfs means all cfs of this table are replicable)
|
||||||
|
if (cfs != null && !cfs.contains(family)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -482,9 +482,98 @@ public class TestReplicationAdmin {
|
||||||
hbaseAdmin.removeReplicationPeer(ID_ONE);
|
hbaseAdmin.removeReplicationPeer(ID_ONE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPeerExcludeNamespaces() throws Exception {
|
||||||
|
String ns1 = "ns1";
|
||||||
|
String ns2 = "ns2";
|
||||||
|
|
||||||
|
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
|
||||||
|
rpc.setClusterKey(KEY_ONE);
|
||||||
|
hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
|
||||||
|
|
||||||
|
rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
|
||||||
|
assertTrue(rpc.replicateAllUserTables());
|
||||||
|
|
||||||
|
Set<String> namespaces = new HashSet<String>();
|
||||||
|
namespaces.add(ns1);
|
||||||
|
namespaces.add(ns2);
|
||||||
|
rpc.setExcludeNamespaces(namespaces);
|
||||||
|
hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
|
||||||
|
namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeNamespaces();
|
||||||
|
assertEquals(2, namespaces.size());
|
||||||
|
assertTrue(namespaces.contains(ns1));
|
||||||
|
assertTrue(namespaces.contains(ns2));
|
||||||
|
|
||||||
|
rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
|
||||||
|
namespaces.clear();
|
||||||
|
namespaces.add(ns1);
|
||||||
|
rpc.setExcludeNamespaces(namespaces);
|
||||||
|
hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
|
||||||
|
namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeNamespaces();
|
||||||
|
assertEquals(1, namespaces.size());
|
||||||
|
assertTrue(namespaces.contains(ns1));
|
||||||
|
|
||||||
|
hbaseAdmin.removeReplicationPeer(ID_ONE);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPeerExcludeTableCFs() throws Exception {
|
||||||
|
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
|
||||||
|
rpc.setClusterKey(KEY_ONE);
|
||||||
|
TableName tab1 = TableName.valueOf("t1");
|
||||||
|
TableName tab2 = TableName.valueOf("t2");
|
||||||
|
TableName tab3 = TableName.valueOf("t3");
|
||||||
|
TableName tab4 = TableName.valueOf("t4");
|
||||||
|
|
||||||
|
// Add a valid peer
|
||||||
|
hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
|
||||||
|
rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
|
||||||
|
assertTrue(rpc.replicateAllUserTables());
|
||||||
|
|
||||||
|
Map<TableName, List<String>> tableCFs = new HashMap<TableName, List<String>>();
|
||||||
|
tableCFs.put(tab1, null);
|
||||||
|
rpc.setExcludeTableCFsMap(tableCFs);
|
||||||
|
hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
|
||||||
|
Map<TableName, List<String>> result =
|
||||||
|
hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap();
|
||||||
|
assertEquals(1, result.size());
|
||||||
|
assertEquals(true, result.containsKey(tab1));
|
||||||
|
assertNull(result.get(tab1));
|
||||||
|
|
||||||
|
tableCFs.put(tab2, new ArrayList<String>());
|
||||||
|
tableCFs.get(tab2).add("f1");
|
||||||
|
rpc.setExcludeTableCFsMap(tableCFs);
|
||||||
|
hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
|
||||||
|
result = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap();
|
||||||
|
assertEquals(2, result.size());
|
||||||
|
assertTrue("Should contain t1", result.containsKey(tab1));
|
||||||
|
assertTrue("Should contain t2", result.containsKey(tab2));
|
||||||
|
assertNull(result.get(tab1));
|
||||||
|
assertEquals(1, result.get(tab2).size());
|
||||||
|
assertEquals("f1", result.get(tab2).get(0));
|
||||||
|
|
||||||
|
tableCFs.clear();
|
||||||
|
tableCFs.put(tab3, new ArrayList<String>());
|
||||||
|
tableCFs.put(tab4, new ArrayList<String>());
|
||||||
|
tableCFs.get(tab4).add("f1");
|
||||||
|
tableCFs.get(tab4).add("f2");
|
||||||
|
rpc.setExcludeTableCFsMap(tableCFs);
|
||||||
|
hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
|
||||||
|
result = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap();
|
||||||
|
assertEquals(2, result.size());
|
||||||
|
assertTrue("Should contain t3", result.containsKey(tab3));
|
||||||
|
assertTrue("Should contain t4", result.containsKey(tab4));
|
||||||
|
assertNull(result.get(tab3));
|
||||||
|
assertEquals(2, result.get(tab4).size());
|
||||||
|
assertEquals("f1", result.get(tab4).get(0));
|
||||||
|
assertEquals("f2", result.get(tab4).get(1));
|
||||||
|
|
||||||
|
hbaseAdmin.removeReplicationPeer(ID_ONE);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPeerConfigConflict() throws Exception {
|
public void testPeerConfigConflict() throws Exception {
|
||||||
// Default replicate all flag is true
|
// Default replicate_all flag is true
|
||||||
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
|
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
|
||||||
rpc.setClusterKey(KEY_ONE);
|
rpc.setClusterKey(KEY_ONE);
|
||||||
|
|
||||||
|
@ -492,39 +581,68 @@ public class TestReplicationAdmin {
|
||||||
Set<String> namespaces = new HashSet<String>();
|
Set<String> namespaces = new HashSet<String>();
|
||||||
namespaces.add(ns1);
|
namespaces.add(ns1);
|
||||||
|
|
||||||
TableName tab1 = TableName.valueOf("ns1:tabl");
|
TableName tab1 = TableName.valueOf("ns2:tabl");
|
||||||
Map<TableName, List<String>> tableCfs = new HashMap<TableName, List<String>>();
|
Map<TableName, List<String>> tableCfs = new HashMap<TableName, List<String>>();
|
||||||
tableCfs.put(tab1, new ArrayList<String>());
|
tableCfs.put(tab1, new ArrayList<String>());
|
||||||
|
|
||||||
try {
|
try {
|
||||||
rpc.setNamespaces(namespaces);
|
rpc.setNamespaces(namespaces);
|
||||||
hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
|
hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
|
||||||
fail("Should throw Exception. When replicate all flag is true, no need to config namespaces");
|
fail("Should throw Exception."
|
||||||
|
+ " When replicate all flag is true, no need to config namespaces");
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
// OK
|
// OK
|
||||||
rpc.setNamespaces(null);
|
rpc.setNamespaces(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
rpc.setTableCFsMap(tableCfs);
|
|
||||||
hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
|
|
||||||
fail("Should throw Exception. When replicate all flag is true, no need to config table-cfs");
|
|
||||||
} catch (IOException e) {
|
|
||||||
// OK
|
|
||||||
rpc.setTableCFsMap(null);
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
rpc.setNamespaces(namespaces);
|
|
||||||
rpc.setTableCFsMap(tableCfs);
|
rpc.setTableCFsMap(tableCfs);
|
||||||
hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
|
hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
|
||||||
fail("Should throw Exception."
|
fail("Should throw Exception."
|
||||||
+ " When replicate all flag is true, no need to config namespaces or table-cfs");
|
+ " When replicate all flag is true, no need to config table-cfs");
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
// OK
|
// OK
|
||||||
rpc.setNamespaces(null);
|
|
||||||
rpc.setTableCFsMap(null);
|
rpc.setTableCFsMap(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Set replicate_all flag to true
|
||||||
|
rpc.setReplicateAllUserTables(false);
|
||||||
|
try {
|
||||||
|
rpc.setExcludeNamespaces(namespaces);
|
||||||
|
hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
|
||||||
|
fail("Should throw Exception."
|
||||||
|
+ " When replicate all flag is false, no need to config exclude namespaces");
|
||||||
|
} catch (IOException e) {
|
||||||
|
// OK
|
||||||
|
rpc.setExcludeNamespaces(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
rpc.setExcludeTableCFsMap(tableCfs);
|
||||||
|
hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
|
||||||
|
fail("Should throw Exception."
|
||||||
|
+ " When replicate all flag is false, no need to config exclude table-cfs");
|
||||||
|
} catch (IOException e) {
|
||||||
|
// OK
|
||||||
|
rpc.setExcludeTableCFsMap(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
rpc.setNamespaces(namespaces);
|
||||||
|
rpc.setTableCFsMap(tableCfs);
|
||||||
|
// OK to add a new peer which replicate_all flag is false and with namespaces, table-cfs config
|
||||||
|
hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
|
||||||
|
|
||||||
|
// Default replicate_all flag is true
|
||||||
|
ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
|
||||||
|
rpc2.setClusterKey(KEY_SECOND);
|
||||||
|
rpc2.setExcludeNamespaces(namespaces);
|
||||||
|
rpc2.setExcludeTableCFsMap(tableCfs);
|
||||||
|
// OK to add a new peer which replicate_all flag is true and with exclude namespaces, exclude
|
||||||
|
// table-cfs config
|
||||||
|
hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2);
|
||||||
|
|
||||||
|
hbaseAdmin.removeReplicationPeer(ID_ONE);
|
||||||
|
hbaseAdmin.removeReplicationPeer(ID_SECOND);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -539,41 +657,80 @@ public class TestReplicationAdmin {
|
||||||
rpc.setReplicateAllUserTables(false);
|
rpc.setReplicateAllUserTables(false);
|
||||||
hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
|
hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
|
||||||
|
|
||||||
rpc = admin.getPeerConfig(ID_ONE);
|
rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
|
||||||
Set<String> namespaces = new HashSet<String>();
|
Set<String> namespaces = new HashSet<String>();
|
||||||
namespaces.add(ns1);
|
namespaces.add(ns1);
|
||||||
rpc.setNamespaces(namespaces);
|
rpc.setNamespaces(namespaces);
|
||||||
admin.updatePeerConfig(ID_ONE, rpc);
|
hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
|
||||||
rpc = admin.getPeerConfig(ID_ONE);
|
rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
|
||||||
|
try {
|
||||||
Map<TableName, List<String>> tableCfs = new HashMap<>();
|
Map<TableName, List<String>> tableCfs = new HashMap<>();
|
||||||
tableCfs.put(tableName1, new ArrayList<>());
|
tableCfs.put(tableName1, new ArrayList<>());
|
||||||
rpc.setTableCFsMap(tableCfs);
|
rpc.setTableCFsMap(tableCfs);
|
||||||
try {
|
hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
|
||||||
admin.updatePeerConfig(ID_ONE, rpc);
|
fail("Should throw ReplicationException" + " Because table " + tableName1
|
||||||
fail("Should throw ReplicationException, because table " + tableName1 + " conflict with namespace "
|
+ " conflict with namespace " + ns1);
|
||||||
+ ns1);
|
} catch (Exception e) {
|
||||||
} catch (IOException e) {
|
|
||||||
// OK
|
// OK
|
||||||
}
|
}
|
||||||
|
|
||||||
rpc = admin.getPeerConfig(ID_ONE);
|
rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
|
||||||
tableCfs.clear();
|
Map<TableName, List<String>> tableCfs = new HashMap<>();
|
||||||
tableCfs.put(tableName2, new ArrayList<>());
|
tableCfs.put(tableName2, new ArrayList<>());
|
||||||
rpc.setTableCFsMap(tableCfs);
|
rpc.setTableCFsMap(tableCfs);
|
||||||
admin.updatePeerConfig(ID_ONE, rpc);
|
hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
|
||||||
rpc = admin.getPeerConfig(ID_ONE);
|
rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
|
||||||
|
try {
|
||||||
namespaces.clear();
|
namespaces.clear();
|
||||||
namespaces.add(ns2);
|
namespaces.add(ns2);
|
||||||
rpc.setNamespaces(namespaces);
|
rpc.setNamespaces(namespaces);
|
||||||
try {
|
hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
|
||||||
admin.updatePeerConfig(ID_ONE, rpc);
|
fail("Should throw ReplicationException" + " Because namespace " + ns2
|
||||||
fail("Should throw ReplicationException, because namespace " + ns2 + " conflict with table "
|
+ " conflict with table " + tableName2);
|
||||||
+ tableName2);
|
} catch (Exception e) {
|
||||||
} catch (IOException e) {
|
|
||||||
// OK
|
// OK
|
||||||
}
|
}
|
||||||
|
|
||||||
admin.removePeer(ID_ONE);
|
ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
|
||||||
|
rpc2.setClusterKey(KEY_SECOND);
|
||||||
|
hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2);
|
||||||
|
|
||||||
|
rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
|
||||||
|
Set<String> excludeNamespaces = new HashSet<String>();
|
||||||
|
excludeNamespaces.add(ns1);
|
||||||
|
rpc2.setExcludeNamespaces(excludeNamespaces);
|
||||||
|
hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2);
|
||||||
|
rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
|
||||||
|
try {
|
||||||
|
Map<TableName, List<String>> excludeTableCfs = new HashMap<>();
|
||||||
|
excludeTableCfs.put(tableName1, new ArrayList<>());
|
||||||
|
rpc2.setExcludeTableCFsMap(excludeTableCfs);
|
||||||
|
hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2);
|
||||||
|
fail("Should throw ReplicationException" + " Because exclude table " + tableName1
|
||||||
|
+ " conflict with exclude namespace " + ns1);
|
||||||
|
} catch (Exception e) {
|
||||||
|
// OK
|
||||||
|
}
|
||||||
|
|
||||||
|
rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
|
||||||
|
Map<TableName, List<String>> excludeTableCfs = new HashMap<>();
|
||||||
|
excludeTableCfs.put(tableName2, new ArrayList<>());
|
||||||
|
rpc2.setExcludeTableCFsMap(excludeTableCfs);
|
||||||
|
hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2);
|
||||||
|
rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
|
||||||
|
try {
|
||||||
|
namespaces.clear();
|
||||||
|
namespaces.add(ns2);
|
||||||
|
rpc2.setNamespaces(namespaces);
|
||||||
|
hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2);
|
||||||
|
fail("Should throw ReplicationException" + " Because exclude namespace " + ns2
|
||||||
|
+ " conflict with exclude table " + tableName2);
|
||||||
|
} catch (Exception e) {
|
||||||
|
// OK
|
||||||
|
}
|
||||||
|
|
||||||
|
hbaseAdmin.removeReplicationPeer(ID_ONE);
|
||||||
|
hbaseAdmin.removeReplicationPeer(ID_SECOND);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -22,8 +22,7 @@ import static org.junit.Assert.assertNull;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
|
import java.util.ArrayList;
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -47,6 +46,8 @@ import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
|
||||||
|
|
||||||
@Category({ReplicationTests.class, SmallTests.class})
|
@Category({ReplicationTests.class, SmallTests.class})
|
||||||
public class TestReplicationWALEntryFilters {
|
public class TestReplicationWALEntryFilters {
|
||||||
|
|
||||||
|
@ -205,23 +206,17 @@ public class TestReplicationWALEntryFilters {
|
||||||
ReplicationPeer peer = mock(ReplicationPeer.class);
|
ReplicationPeer peer = mock(ReplicationPeer.class);
|
||||||
ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class);
|
ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class);
|
||||||
|
|
||||||
// 1. replicate all user tables
|
// 1. replicate_all flag is false, no namespaces and table-cfs config
|
||||||
when(peerConfig.replicateAllUserTables()).thenReturn(true);
|
|
||||||
when(peer.getPeerConfig()).thenReturn(peerConfig);
|
|
||||||
Entry userEntry = createEntry(null, a, b, c);
|
|
||||||
ChainWALEntryFilter filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
|
|
||||||
assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
|
|
||||||
|
|
||||||
// 2. not replicate all user tables, no namespaces and table-cfs config
|
|
||||||
when(peerConfig.replicateAllUserTables()).thenReturn(false);
|
when(peerConfig.replicateAllUserTables()).thenReturn(false);
|
||||||
when(peerConfig.getNamespaces()).thenReturn(null);
|
when(peerConfig.getNamespaces()).thenReturn(null);
|
||||||
when(peerConfig.getTableCFsMap()).thenReturn(null);
|
when(peerConfig.getTableCFsMap()).thenReturn(null);
|
||||||
when(peer.getPeerConfig()).thenReturn(peerConfig);
|
when(peer.getPeerConfig()).thenReturn(peerConfig);
|
||||||
userEntry = createEntry(null, a, b, c);
|
Entry userEntry = createEntry(null, a, b, c);
|
||||||
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
|
ChainWALEntryFilter filter =
|
||||||
|
new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
|
||||||
assertEquals(null, filter.filter(userEntry));
|
assertEquals(null, filter.filter(userEntry));
|
||||||
|
|
||||||
// 3. Only config table-cfs in peer
|
// 2. replicate_all flag is false, and only config table-cfs in peer
|
||||||
// empty map
|
// empty map
|
||||||
userEntry = createEntry(null, a, b, c);
|
userEntry = createEntry(null, a, b, c);
|
||||||
Map<TableName, List<String>> tableCfs = new HashMap<>();
|
Map<TableName, List<String>> tableCfs = new HashMap<>();
|
||||||
|
@ -261,7 +256,7 @@ public class TestReplicationWALEntryFilters {
|
||||||
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
|
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
|
||||||
assertEquals(createEntry(null, a,c), filter.filter(userEntry));
|
assertEquals(createEntry(null, a,c), filter.filter(userEntry));
|
||||||
|
|
||||||
// 3. Only config namespaces in peer
|
// 3. replicate_all flag is false, and only config namespaces in peer
|
||||||
when(peer.getTableCFs()).thenReturn(null);
|
when(peer.getTableCFs()).thenReturn(null);
|
||||||
// empty set
|
// empty set
|
||||||
Set<String> namespaces = new HashSet<>();
|
Set<String> namespaces = new HashSet<>();
|
||||||
|
@ -292,7 +287,7 @@ public class TestReplicationWALEntryFilters {
|
||||||
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
|
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
|
||||||
assertEquals(null, filter.filter(userEntry));
|
assertEquals(null, filter.filter(userEntry));
|
||||||
|
|
||||||
// 4. Config namespaces and table-cfs both
|
// 4. replicate_all flag is false, and config namespaces and table-cfs both
|
||||||
// Namespaces config should not confict with table-cfs config
|
// Namespaces config should not confict with table-cfs config
|
||||||
namespaces = new HashSet<>();
|
namespaces = new HashSet<>();
|
||||||
tableCfs = new HashMap<>();
|
tableCfs = new HashMap<>();
|
||||||
|
@ -331,9 +326,110 @@ public class TestReplicationWALEntryFilters {
|
||||||
assertEquals(null, filter.filter(userEntry));
|
assertEquals(null, filter.filter(userEntry));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNamespaceTableCfWALEntryFilter2() {
|
||||||
|
ReplicationPeer peer = mock(ReplicationPeer.class);
|
||||||
|
ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class);
|
||||||
|
|
||||||
|
// 1. replicate_all flag is true
|
||||||
|
// and no exclude namespaces and no exclude table-cfs config
|
||||||
|
when(peerConfig.replicateAllUserTables()).thenReturn(true);
|
||||||
|
when(peerConfig.getExcludeNamespaces()).thenReturn(null);
|
||||||
|
when(peerConfig.getExcludeTableCFsMap()).thenReturn(null);
|
||||||
|
when(peer.getPeerConfig()).thenReturn(peerConfig);
|
||||||
|
Entry userEntry = createEntry(null, a, b, c);
|
||||||
|
ChainWALEntryFilter filter =
|
||||||
|
new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
|
||||||
|
assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
|
||||||
|
|
||||||
|
// 2. replicate_all flag is true, and only config exclude namespaces
|
||||||
|
// empty set
|
||||||
|
Set<String> namespaces = new HashSet<String>();
|
||||||
|
when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces);
|
||||||
|
when(peerConfig.getExcludeTableCFsMap()).thenReturn(null);
|
||||||
|
when(peer.getPeerConfig()).thenReturn(peerConfig);
|
||||||
|
userEntry = createEntry(null, a, b, c);
|
||||||
|
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
|
||||||
|
assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
|
||||||
|
|
||||||
|
// exclude namespace default
|
||||||
|
namespaces.add("default");
|
||||||
|
when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces);
|
||||||
|
when(peerConfig.getExcludeTableCFsMap()).thenReturn(null);
|
||||||
|
when(peer.getPeerConfig()).thenReturn(peerConfig);
|
||||||
|
userEntry = createEntry(null, a, b, c);
|
||||||
|
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
|
||||||
|
assertEquals(null, filter.filter(userEntry));
|
||||||
|
|
||||||
|
// exclude namespace ns1
|
||||||
|
namespaces = new HashSet<String>();
|
||||||
|
namespaces.add("ns1");
|
||||||
|
when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces);
|
||||||
|
when(peerConfig.getExcludeTableCFsMap()).thenReturn(null);
|
||||||
|
when(peer.getPeerConfig()).thenReturn(peerConfig);
|
||||||
|
userEntry = createEntry(null, a, b, c);
|
||||||
|
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
|
||||||
|
assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
|
||||||
|
|
||||||
|
// 3. replicate_all flag is true, and only config exclude table-cfs
|
||||||
|
// empty table-cfs map
|
||||||
|
Map<TableName, List<String>> tableCfs = new HashMap<TableName, List<String>>();
|
||||||
|
when(peerConfig.getExcludeNamespaces()).thenReturn(null);
|
||||||
|
when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs);
|
||||||
|
when(peer.getPeerConfig()).thenReturn(peerConfig);
|
||||||
|
userEntry = createEntry(null, a, b, c);
|
||||||
|
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
|
||||||
|
assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
|
||||||
|
|
||||||
|
// exclude table bar
|
||||||
|
tableCfs = new HashMap<TableName, List<String>>();
|
||||||
|
tableCfs.put(TableName.valueOf("bar"), null);
|
||||||
|
when(peerConfig.getExcludeNamespaces()).thenReturn(null);
|
||||||
|
when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs);
|
||||||
|
when(peer.getPeerConfig()).thenReturn(peerConfig);
|
||||||
|
userEntry = createEntry(null, a, b, c);
|
||||||
|
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
|
||||||
|
assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
|
||||||
|
|
||||||
|
// exclude table foo:a
|
||||||
|
tableCfs = new HashMap<TableName, List<String>>();
|
||||||
|
tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a"));
|
||||||
|
when(peerConfig.getExcludeNamespaces()).thenReturn(null);
|
||||||
|
when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs);
|
||||||
|
when(peer.getPeerConfig()).thenReturn(peerConfig);
|
||||||
|
userEntry = createEntry(null, a, b, c);
|
||||||
|
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
|
||||||
|
assertEquals(createEntry(null, b, c), filter.filter(userEntry));
|
||||||
|
|
||||||
|
// 4. replicate_all flag is true, and config exclude namespaces and table-cfs both
|
||||||
|
// exclude ns1 and table foo:a,c
|
||||||
|
namespaces = new HashSet<String>();
|
||||||
|
tableCfs = new HashMap<TableName, List<String>>();
|
||||||
|
namespaces.add("ns1");
|
||||||
|
tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
|
||||||
|
when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces);
|
||||||
|
when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs);
|
||||||
|
when(peer.getPeerConfig()).thenReturn(peerConfig);
|
||||||
|
userEntry = createEntry(null, a, b, c);
|
||||||
|
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
|
||||||
|
assertEquals(createEntry(null, b), filter.filter(userEntry));
|
||||||
|
|
||||||
|
// exclude namespace default and table ns1:bar
|
||||||
|
namespaces = new HashSet<String>();
|
||||||
|
tableCfs = new HashMap<TableName, List<String>>();
|
||||||
|
namespaces.add("default");
|
||||||
|
tableCfs.put(TableName.valueOf("ns1:bar"), new ArrayList<String>());
|
||||||
|
when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces);
|
||||||
|
when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs);
|
||||||
|
when(peer.getPeerConfig()).thenReturn(peerConfig);
|
||||||
|
userEntry = createEntry(null, a, b, c);
|
||||||
|
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
|
||||||
|
assertEquals(null, filter.filter(userEntry));
|
||||||
|
}
|
||||||
|
|
||||||
private Entry createEntry(TreeMap<byte[], Integer> scopes, byte[]... kvs) {
|
private Entry createEntry(TreeMap<byte[], Integer> scopes, byte[]... kvs) {
|
||||||
WALKeyImpl key1 =
|
WALKeyImpl key1 = new WALKeyImpl(new byte[0], TableName.valueOf("foo"),
|
||||||
new WALKeyImpl(new byte[0], TableName.valueOf("foo"), System.currentTimeMillis(), scopes);
|
System.currentTimeMillis(), scopes);
|
||||||
WALEdit edit1 = new WALEdit();
|
WALEdit edit1 = new WALEdit();
|
||||||
|
|
||||||
for (byte[] kv : kvs) {
|
for (byte[] kv : kvs) {
|
||||||
|
@ -342,7 +438,6 @@ public class TestReplicationWALEntryFilters {
|
||||||
return new Entry(key1, edit1);
|
return new Entry(key1, edit1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private void assertEquals(Entry e1, Entry e2) {
|
private void assertEquals(Entry e1, Entry e2) {
|
||||||
Assert.assertEquals(e1 == null, e2 == null);
|
Assert.assertEquals(e1 == null, e2 == null);
|
||||||
if (e1 == null) {
|
if (e1 == null) {
|
||||||
|
|
|
@ -152,7 +152,11 @@ module Hbase
|
||||||
# Show the current tableCFs config for the specified peer
|
# Show the current tableCFs config for the specified peer
|
||||||
def show_peer_tableCFs(id)
|
def show_peer_tableCFs(id)
|
||||||
rpc = @admin.getReplicationPeerConfig(id)
|
rpc = @admin.getReplicationPeerConfig(id)
|
||||||
ReplicationPeerConfigUtil.convertToString(rpc.getTableCFsMap)
|
show_peer_tableCFs_by_config(rpc)
|
||||||
|
end
|
||||||
|
|
||||||
|
def show_peer_tableCFs_by_config(peer_config)
|
||||||
|
ReplicationPeerConfigUtil.convertToString(peer_config.getTableCFsMap)
|
||||||
end
|
end
|
||||||
|
|
||||||
#----------------------------------------------------------------------------------------------
|
#----------------------------------------------------------------------------------------------
|
||||||
|
@ -274,6 +278,49 @@ module Hbase
|
||||||
@replication_admin.updatePeerConfig(id, rpc)
|
@replication_admin.updatePeerConfig(id, rpc)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# Set exclude namespaces config for the specified peer
|
||||||
|
def set_peer_exclude_namespaces(id, exclude_namespaces)
|
||||||
|
return if exclude_namespaces.nil?
|
||||||
|
exclude_ns_set = java.util.HashSet.new
|
||||||
|
exclude_namespaces.each do |n|
|
||||||
|
exclude_ns_set.add(n)
|
||||||
|
end
|
||||||
|
rpc = get_peer_config(id)
|
||||||
|
return if rpc.nil?
|
||||||
|
rpc.setExcludeNamespaces(exclude_ns_set)
|
||||||
|
@admin.updateReplicationPeerConfig(id, rpc)
|
||||||
|
end
|
||||||
|
|
||||||
|
# Show the exclude namespaces config for the specified peer
|
||||||
|
def show_peer_exclude_namespaces(peer_config)
|
||||||
|
namespaces = peer_config.getExcludeNamespaces
|
||||||
|
return nil if namespaces.nil?
|
||||||
|
namespaces = java.util.ArrayList.new(namespaces)
|
||||||
|
java.util.Collections.sort(namespaces)
|
||||||
|
'!' + namespaces.join(';')
|
||||||
|
end
|
||||||
|
|
||||||
|
# Set exclude tableCFs config for the specified peer
|
||||||
|
def set_peer_exclude_tableCFs(id, exclude_tableCFs)
|
||||||
|
return if exclude_tableCFs.nil?
|
||||||
|
# convert tableCFs to TableName
|
||||||
|
map = java.util.HashMap.new
|
||||||
|
exclude_tableCFs.each do |key, val|
|
||||||
|
map.put(org.apache.hadoop.hbase.TableName.valueOf(key), val)
|
||||||
|
end
|
||||||
|
rpc = get_peer_config(id)
|
||||||
|
return if rpc.nil?
|
||||||
|
rpc.setExcludeTableCFsMap(map)
|
||||||
|
@admin.updateReplicationPeerConfig(id, rpc)
|
||||||
|
end
|
||||||
|
|
||||||
|
# Show the exclude tableCFs config for the specified peer
|
||||||
|
def show_peer_exclude_tableCFs(peer_config)
|
||||||
|
tableCFs = peer_config.getExcludeTableCFsMap
|
||||||
|
return nil if tableCFs.nil?
|
||||||
|
'!' + ReplicationPeerConfigUtil.convertToString(tableCFs)
|
||||||
|
end
|
||||||
|
|
||||||
#----------------------------------------------------------------------------------------------
|
#----------------------------------------------------------------------------------------------
|
||||||
# Enables a table's replication switch
|
# Enables a table's replication switch
|
||||||
def enable_tablerep(table_name)
|
def enable_tablerep(table_name)
|
||||||
|
|
|
@ -381,8 +381,10 @@ Shell.load_command_group(
|
||||||
set_peer_namespaces
|
set_peer_namespaces
|
||||||
append_peer_namespaces
|
append_peer_namespaces
|
||||||
remove_peer_namespaces
|
remove_peer_namespaces
|
||||||
|
set_peer_exclude_namespaces
|
||||||
show_peer_tableCFs
|
show_peer_tableCFs
|
||||||
set_peer_tableCFs
|
set_peer_tableCFs
|
||||||
|
set_peer_exclude_tableCFs
|
||||||
set_peer_bandwidth
|
set_peer_bandwidth
|
||||||
list_replicated_tables
|
list_replicated_tables
|
||||||
append_peer_tableCFs
|
append_peer_tableCFs
|
||||||
|
|
|
@ -25,6 +25,12 @@ module Shell
|
||||||
<<-EOF
|
<<-EOF
|
||||||
List all replication peer clusters.
|
List all replication peer clusters.
|
||||||
|
|
||||||
|
If replicate_all flag is false, the namespaces and table-cfs in peer config
|
||||||
|
will be replicated to peer cluster.
|
||||||
|
|
||||||
|
If replicate_all flag is true, all user tables will be replicate to peer
|
||||||
|
cluster, except that the namespaces and table-cfs in peer config.
|
||||||
|
|
||||||
hbase> list_peers
|
hbase> list_peers
|
||||||
EOF
|
EOF
|
||||||
end
|
end
|
||||||
|
@ -39,8 +45,13 @@ EOF
|
||||||
id = peer.getPeerId
|
id = peer.getPeerId
|
||||||
state = peer.isEnabled ? 'ENABLED' : 'DISABLED'
|
state = peer.isEnabled ? 'ENABLED' : 'DISABLED'
|
||||||
config = peer.getPeerConfig
|
config = peer.getPeerConfig
|
||||||
|
if config.replicateAllUserTables
|
||||||
|
namespaces = replication_admin.show_peer_exclude_namespaces(config)
|
||||||
|
tableCFs = replication_admin.show_peer_exclude_tableCFs(config)
|
||||||
|
else
|
||||||
namespaces = replication_admin.show_peer_namespaces(config)
|
namespaces = replication_admin.show_peer_namespaces(config)
|
||||||
tableCFs = replication_admin.show_peer_tableCFs(id)
|
tableCFs = replication_admin.show_peer_tableCFs_by_config(config)
|
||||||
|
end
|
||||||
formatter.row([id, config.getClusterKey,
|
formatter.row([id, config.getClusterKey,
|
||||||
config.getReplicationEndpointImpl, state,
|
config.getReplicationEndpointImpl, state,
|
||||||
config.replicateAllUserTables, namespaces, tableCFs,
|
config.replicateAllUserTables, namespaces, tableCFs,
|
||||||
|
|
|
@ -0,0 +1,52 @@
|
||||||
|
#
|
||||||
|
# Copyright The Apache Software Foundation
|
||||||
|
#
|
||||||
|
# Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
# or more contributor license agreements. See the NOTICE file
|
||||||
|
# distributed with this work for additional information
|
||||||
|
# regarding copyright ownership. The ASF licenses this file
|
||||||
|
# to you under the Apache License, Version 2.0 (the
|
||||||
|
# "License"); you may not use this file except in compliance
|
||||||
|
# with the License. You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
#
|
||||||
|
|
||||||
|
module Shell
|
||||||
|
module Commands
|
||||||
|
class SetPeerExcludeNamespaces < Command
|
||||||
|
def help
|
||||||
|
<<-EOF
|
||||||
|
Set the namespaces which not replicated for the specified peer.
|
||||||
|
|
||||||
|
Note:
|
||||||
|
1. The replicate_all flag need to be true when set exclude namespaces.
|
||||||
|
2. Set a exclude namespace in the peer config means that all tables in this
|
||||||
|
namespace will not be replicated to the peer cluster. If peer config
|
||||||
|
already has a exclude table, then not allow set this table's namespace
|
||||||
|
as a exclude namespace.
|
||||||
|
|
||||||
|
Examples:
|
||||||
|
|
||||||
|
# set exclude namespaces config to null
|
||||||
|
hbase> set_peer_exclude_namespaces '1', []
|
||||||
|
# set namespaces which not replicated for a peer.
|
||||||
|
# set a exclude namespace in the peer config means that all tables in this
|
||||||
|
# namespace will not be replicated.
|
||||||
|
hbase> set_peer_exclude_namespaces '2', ["ns1", "ns2"]
|
||||||
|
|
||||||
|
EOF
|
||||||
|
end
|
||||||
|
|
||||||
|
def command(id, exclude_namespaces)
|
||||||
|
replication_admin.set_peer_exclude_namespaces(id, exclude_namespaces)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
|
@ -0,0 +1,51 @@
|
||||||
|
#
|
||||||
|
# Copyright The Apache Software Foundation
|
||||||
|
#
|
||||||
|
# Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
# or more contributor license agreements. See the NOTICE file
|
||||||
|
# distributed with this work for additional information
|
||||||
|
# regarding copyright ownership. The ASF licenses this file
|
||||||
|
# to you under the Apache License, Version 2.0 (the
|
||||||
|
# "License"); you may not use this file except in compliance
|
||||||
|
# with the License. You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
#
|
||||||
|
|
||||||
|
module Shell
|
||||||
|
module Commands
|
||||||
|
class SetPeerExcludeTableCFs < Command
|
||||||
|
def help
|
||||||
|
<<-EOF
|
||||||
|
Set the table-cfs which not replicated for the specified peer.
|
||||||
|
|
||||||
|
Note:
|
||||||
|
1. The replicate_all flag need to be true when set exclude table-cfs.
|
||||||
|
2. If peer config already has a exclude namespace, then not allow set any
|
||||||
|
exclude table of this namespace to the peer config.
|
||||||
|
|
||||||
|
Examples:
|
||||||
|
|
||||||
|
# set exclude table-cfs to null
|
||||||
|
hbase> set_peer_exclude_tableCFs '1'
|
||||||
|
# set table / table-cf which not replicated for a peer, for a table without
|
||||||
|
# an explicit column-family list, all column-families will not be replicated
|
||||||
|
hbase> set_peer_exclude_tableCFs '2', { "ns1:table1" => [],
|
||||||
|
"ns2:table2" => ["cf1", "cf2"],
|
||||||
|
"ns3:table3" => ["cfA", "cfB"]}
|
||||||
|
|
||||||
|
EOF
|
||||||
|
end
|
||||||
|
|
||||||
|
def command(id, exclude_peer_table_cfs = nil)
|
||||||
|
replication_admin.set_peer_exclude_tableCFs(id, exclude_peer_table_cfs)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
|
@ -25,10 +25,10 @@ module Shell
|
||||||
<<-EOF
|
<<-EOF
|
||||||
Set the replicable namespaces config for the specified peer.
|
Set the replicable namespaces config for the specified peer.
|
||||||
|
|
||||||
Set a namespace in the peer config means that all tables in this
|
1. The replicate_all flag need to be false when set the replicable namespaces.
|
||||||
namespace will be replicated to the peer cluster. So if you already
|
2. Set a namespace in the peer config means that all tables in this namespace
|
||||||
have set a namespace in the peer config, then you can't set this
|
will be replicated to the peer cluster. If peer config already has a table,
|
||||||
namespace's tables in the peer config again.
|
then not allow set this table's namespace to the peer config.
|
||||||
|
|
||||||
Examples:
|
Examples:
|
||||||
|
|
||||||
|
|
|
@ -26,7 +26,10 @@ module Shell
|
||||||
Set the replicate_all flag to true or false for the specified peer.
|
Set the replicate_all flag to true or false for the specified peer.
|
||||||
|
|
||||||
If replicate_all flag is true, then all user tables (REPLICATION_SCOPE != 0)
|
If replicate_all flag is true, then all user tables (REPLICATION_SCOPE != 0)
|
||||||
will be replicate to peer cluster.
|
will be replicate to peer cluster. But you can use 'set_peer_exclude_namespaces'
|
||||||
|
to set which namespaces can't be replicated to peer cluster. And you can use
|
||||||
|
'set_peer_exclude_tableCFs' to set which tables can't be replicated to peer
|
||||||
|
cluster.
|
||||||
|
|
||||||
If replicate_all flag is false, then all user tables cannot be replicate to
|
If replicate_all flag is false, then all user tables cannot be replicate to
|
||||||
peer cluster. Then you can use 'set_peer_namespaces' or 'append_peer_namespaces'
|
peer cluster. Then you can use 'set_peer_namespaces' or 'append_peer_namespaces'
|
||||||
|
@ -36,6 +39,9 @@ module Shell
|
||||||
|
|
||||||
Notice: When you want to change a peer's replicate_all flag from false to true,
|
Notice: When you want to change a peer's replicate_all flag from false to true,
|
||||||
you need clean the peer's NAMESPACES and TABLECFS config firstly.
|
you need clean the peer's NAMESPACES and TABLECFS config firstly.
|
||||||
|
When you want to change a peer's replicate_all flag from true to false,
|
||||||
|
you need clean the peer's EXCLUDE_NAMESPACES and EXCLUDE_TABLECFS
|
||||||
|
config firstly.
|
||||||
|
|
||||||
Examples:
|
Examples:
|
||||||
|
|
||||||
|
|
|
@ -25,7 +25,9 @@ module Shell
|
||||||
<<-EOF
|
<<-EOF
|
||||||
Set the replicable table-cf config for the specified peer.
|
Set the replicable table-cf config for the specified peer.
|
||||||
|
|
||||||
Can't set a table to table-cfs config if it's namespace already was in
|
Note:
|
||||||
|
1. The replicate_all flag need to be false when set the replicable table-cfs.
|
||||||
|
2. Can't set a table to table-cfs config if it's namespace already was in
|
||||||
namespaces config of this peer.
|
namespaces config of this peer.
|
||||||
|
|
||||||
Examples:
|
Examples:
|
||||||
|
|
|
@ -294,6 +294,29 @@ module Hbase
|
||||||
command(:remove_peer, @peer_id)
|
command(:remove_peer, @peer_id)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
define_test 'set_peer_exclude_tableCFs: works with table-cfs map' do
|
||||||
|
cluster_key = 'zk4,zk5,zk6:11000:/hbase-test'
|
||||||
|
args = { CLUSTER_KEY => cluster_key }
|
||||||
|
command(:add_peer, @peer_id, args)
|
||||||
|
|
||||||
|
assert_equal(1, command(:list_peers).length)
|
||||||
|
peer = command(:list_peers).get(0)
|
||||||
|
assert_equal(@peer_id, peer.getPeerId)
|
||||||
|
assert_equal(cluster_key, peer.getPeerConfig.getClusterKey)
|
||||||
|
|
||||||
|
table_cfs = { 'table1' => [], 'table2' => ['cf1'],
|
||||||
|
'ns3:table3' => ['cf1', 'cf2'] }
|
||||||
|
command(:set_peer_exclude_tableCFs, @peer_id, table_cfs)
|
||||||
|
assert_equal(1, command(:list_peers).length)
|
||||||
|
peer = command(:list_peers).get(0)
|
||||||
|
peer_config = peer.getPeerConfig
|
||||||
|
assert_equal(true, peer_config.replicateAllUserTables)
|
||||||
|
assert_tablecfs_equal(table_cfs, peer_config.getExcludeTableCFsMap)
|
||||||
|
|
||||||
|
# cleanup for future tests
|
||||||
|
replication_admin.remove_peer(@peer_id)
|
||||||
|
end
|
||||||
|
|
||||||
define_test "set_peer_namespaces: works with namespaces array" do
|
define_test "set_peer_namespaces: works with namespaces array" do
|
||||||
cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
|
cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
|
||||||
namespaces = ["ns1", "ns2"]
|
namespaces = ["ns1", "ns2"]
|
||||||
|
@ -395,6 +418,25 @@ module Hbase
|
||||||
command(:remove_peer, @peer_id)
|
command(:remove_peer, @peer_id)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
define_test 'set_peer_exclude_namespaces: works with namespaces array' do
|
||||||
|
cluster_key = 'zk4,zk5,zk6:11000:/hbase-test'
|
||||||
|
namespaces = ['ns1', 'ns2']
|
||||||
|
namespaces_str = '!ns1;ns2'
|
||||||
|
|
||||||
|
args = { CLUSTER_KEY => cluster_key }
|
||||||
|
command(:add_peer, @peer_id, args)
|
||||||
|
command(:set_peer_exclude_namespaces, @peer_id, namespaces)
|
||||||
|
|
||||||
|
assert_equal(1, command(:list_peers).length)
|
||||||
|
peer_config = command(:list_peers).get(0).getPeerConfig
|
||||||
|
assert_equal(true, peer_config.replicateAllUserTables)
|
||||||
|
assert_equal(namespaces_str,
|
||||||
|
replication_admin.show_peer_exclude_namespaces(peer_config))
|
||||||
|
|
||||||
|
# cleanup for future tests
|
||||||
|
command(:remove_peer, @peer_id)
|
||||||
|
end
|
||||||
|
|
||||||
define_test 'set_peer_replicate_all' do
|
define_test 'set_peer_replicate_all' do
|
||||||
cluster_key = 'zk4,zk5,zk6:11000:/hbase-test'
|
cluster_key = 'zk4,zk5,zk6:11000:/hbase-test'
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue