HBASE-19492 Add EXCLUDE_NAMESPACE and EXCLUDE_TABLECFS support to replication peer config

This commit is contained in:
Guanghao Zhang 2017-12-12 11:19:57 +08:00
parent 7a7e55b601
commit 03e79b7994
17 changed files with 721 additions and 132 deletions

View File

@ -48,6 +48,7 @@ import java.util.Map;
import java.util.HashMap;
import java.util.ArrayList;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Helper for TableCFs Operations.
@ -289,11 +290,8 @@ public final class ReplicationPeerConfigUtil {
List<ByteString> namespacesList = peer.getNamespacesList();
if (namespacesList != null && namespacesList.size() != 0) {
Set<String> namespaces = new HashSet<>();
for (ByteString namespace : namespacesList) {
namespaces.add(namespace.toStringUtf8());
}
peerConfig.setNamespaces(namespaces);
peerConfig.setNamespaces(
namespacesList.stream().map(ByteString::toStringUtf8).collect(Collectors.toSet()));
}
if (peer.hasBandwidth()) {
@ -304,6 +302,19 @@ public final class ReplicationPeerConfigUtil {
peerConfig.setReplicateAllUserTables(peer.getReplicateAll());
}
Map<TableName, ? extends Collection<String>> excludeTableCFsMap =
convert2Map(peer.getExcludeTableCfsList()
.toArray(new ReplicationProtos.TableCF[peer.getExcludeTableCfsCount()]));
if (excludeTableCFsMap != null) {
peerConfig.setExcludeTableCFsMap(excludeTableCFsMap);
}
List<ByteString> excludeNamespacesList = peer.getExcludeNamespacesList();
if (excludeNamespacesList != null && excludeNamespacesList.size() != 0) {
peerConfig.setExcludeNamespaces(
excludeNamespacesList.stream().map(ByteString::toStringUtf8).collect(Collectors.toSet()));
}
return peerConfig;
}
@ -346,6 +357,20 @@ public final class ReplicationPeerConfigUtil {
builder.setBandwidth(peerConfig.getBandwidth());
builder.setReplicateAll(peerConfig.replicateAllUserTables());
ReplicationProtos.TableCF[] excludeTableCFs = convert(peerConfig.getExcludeTableCFsMap());
if (excludeTableCFs != null) {
for (int i = 0; i < excludeTableCFs.length; i++) {
builder.addExcludeTableCfs(excludeTableCFs[i]);
}
}
Set<String> excludeNamespaces = peerConfig.getExcludeNamespaces();
if (excludeNamespaces != null) {
for (String namespace : excludeNamespaces) {
builder.addExcludeNamespaces(ByteString.copyFromUtf8(namespace));
}
}
return builder.build();
}

View File

@ -44,6 +44,8 @@ public class ReplicationPeerConfig {
private long bandwidth = 0;
// Default value is true, means replicate all user tables to peer cluster.
private boolean replicateAllUserTables = true;
private Map<TableName, ? extends Collection<String>> excludeTableCFsMap = null;
private Set<String> excludeNamespaces = null;
public ReplicationPeerConfig() {
this.peerData = new TreeMap<>(Bytes.BYTES_COMPARATOR);
@ -121,16 +123,44 @@ public class ReplicationPeerConfig {
return this;
}
public Map<TableName, List<String>> getExcludeTableCFsMap() {
return (Map<TableName, List<String>>) excludeTableCFsMap;
}
public ReplicationPeerConfig setExcludeTableCFsMap(Map<TableName,
? extends Collection<String>> tableCFsMap) {
this.excludeTableCFsMap = tableCFsMap;
return this;
}
public Set<String> getExcludeNamespaces() {
return this.excludeNamespaces;
}
public ReplicationPeerConfig setExcludeNamespaces(Set<String> namespaces) {
this.excludeNamespaces = namespaces;
return this;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder("clusterKey=").append(clusterKey).append(",");
builder.append("replicationEndpointImpl=").append(replicationEndpointImpl).append(",");
builder.append("replicateAllUserTables=").append(replicateAllUserTables).append(",");
if (namespaces != null) {
builder.append("namespaces=").append(namespaces.toString()).append(",");
}
if (tableCFsMap != null) {
builder.append("tableCFs=").append(tableCFsMap.toString()).append(",");
if (replicateAllUserTables) {
if (excludeNamespaces != null) {
builder.append("excludeNamespaces=").append(excludeNamespaces.toString()).append(",");
}
if (excludeTableCFsMap != null) {
builder.append("excludeTableCFsMap=").append(excludeTableCFsMap.toString()).append(",");
}
} else {
if (namespaces != null) {
builder.append("namespaces=").append(namespaces.toString()).append(",");
}
if (tableCFsMap != null) {
builder.append("tableCFs=").append(tableCFsMap.toString()).append(",");
}
}
builder.append("bandwidth=").append(bandwidth);
return builder.toString();
@ -142,17 +172,22 @@ public class ReplicationPeerConfig {
* @return true if the table need replicate to the peer cluster
*/
public boolean needToReplicate(TableName table) {
// If null means user has explicitly not configured any namespaces and table CFs
// so all the tables data are applicable for replication
if (namespaces == null && tableCFsMap == null) {
if (replicateAllUserTables) {
if (excludeNamespaces != null && excludeNamespaces.contains(table.getNamespaceAsString())) {
return false;
}
if (excludeTableCFsMap != null && excludeTableCFsMap.containsKey(table)) {
return false;
}
return true;
} else {
if (namespaces != null && namespaces.contains(table.getNamespaceAsString())) {
return true;
}
if (tableCFsMap != null && tableCFsMap.containsKey(table)) {
return true;
}
return false;
}
if (namespaces != null && namespaces.contains(table.getNamespaceAsString())) {
return true;
}
if (tableCFsMap != null && tableCFsMap.containsKey(table)) {
return true;
}
return false;
}
}

View File

@ -46,6 +46,8 @@ message ReplicationPeer {
repeated bytes namespaces = 6;
optional int64 bandwidth = 7;
optional bool replicate_all = 8;
repeated TableCF exclude_table_cfs = 9;
repeated bytes exclude_namespaces = 10;
}
/**

View File

@ -369,6 +369,8 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
existingConfig.setNamespaces(newConfig.getNamespaces());
existingConfig.setBandwidth(newConfig.getBandwidth());
existingConfig.setReplicateAllUserTables(newConfig.replicateAllUserTables());
existingConfig.setExcludeNamespaces(newConfig.getExcludeNamespaces());
existingConfig.setExcludeTableCFsMap(newConfig.getExcludeTableCFsMap());
try {
ZKUtil.setData(this.zookeeper, getPeerNode(id),

View File

@ -118,15 +118,32 @@ public class ReplicationManager {
return peers;
}
private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws ReplicationException,
IOException {
/**
* If replicate_all flag is true, it means all user tables will be replicated to peer cluster.
* Then allow config exclude namespaces or exclude table-cfs which can't be replicated to
* peer cluster.
*
* If replicate_all flag is false, it means all user tables can't be replicated to peer cluster.
* Then allow to config namespaces or table-cfs which will be replicated to peer cluster.
*/
private void checkPeerConfig(ReplicationPeerConfig peerConfig)
throws ReplicationException, IOException {
if (peerConfig.replicateAllUserTables()) {
if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty())
|| (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) {
throw new ReplicationException(
"Need clean namespaces or table-cfs config fisrtly when you want replicate all cluster");
throw new ReplicationException("Need clean namespaces or table-cfs config firstly"
+ " when replicate_all flag is true");
}
checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(),
peerConfig.getExcludeTableCFsMap());
} else {
if ((peerConfig.getExcludeNamespaces() != null && !peerConfig.getNamespaces().isEmpty())
|| (peerConfig.getExcludeTableCFsMap() != null
&& !peerConfig.getTableCFsMap().isEmpty())) {
throw new ReplicationException(
"Need clean exclude-namespaces or exclude-table-cfs config firstly"
+ " when replicate_all flag is false");
}
checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
peerConfig.getTableCFsMap());
}
@ -134,17 +151,19 @@ public class ReplicationManager {
}
/**
* Set a namespace in the peer config means that all tables in this namespace
* will be replicated to the peer cluster.
* Set a namespace in the peer config means that all tables in this namespace will be replicated
* to the peer cluster.
* 1. If peer config already has a namespace, then not allow set any table of this namespace
* to the peer config.
* 2. If peer config already has a table, then not allow set this table's namespace to the peer
* config.
*
* 1. If you already have set a namespace in the peer config, then you can't set any table
* of this namespace to the peer config.
* 2. If you already have set a table in the peer config, then you can't set this table's
* namespace to the peer config.
*
* @param namespaces
* @param tableCfs
* @throws ReplicationException
* Set a exclude namespace in the peer config means that all tables in this namespace can't be
* replicated to the peer cluster.
* 1. If peer config already has a exclude namespace, then not allow set any exclude table of
* this namespace to the peer config.
* 2. If peer config already has a exclude table, then not allow set this table's namespace
* as a exclude namespace.
*/
private void checkNamespacesAndTableCfsConfigConflict(Set<String> namespaces,
Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException {
@ -157,8 +176,8 @@ public class ReplicationManager {
for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
TableName table = entry.getKey();
if (namespaces.contains(table.getNamespaceAsString())) {
throw new ReplicationException(
"Table-cfs config conflict with namespaces config in peer");
throw new ReplicationException("Table-cfs " + table + " is conflict with namespaces "
+ table.getNamespaceAsString() + " in peer config");
}
}
}

View File

@ -32,16 +32,17 @@ import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.shaded.com.google.common.base.Predicate;
/**
* Filter a WAL Entry by namespaces and table-cfs config in the peer. It first filter entry
* by namespaces config, then filter entry by table-cfs config.
* Filter a WAL Entry by the peer config: replicate_all flag, namespaces config, table-cfs config,
* exclude namespaces config, and exclude table-cfs config.
*
* 1. Set a namespace in peer config means that all tables in this namespace will be replicated.
* 2. If the namespaces config is null, then the table-cfs config decide which table's edit
* can be replicated. If the table-cfs config is null, then the namespaces config decide
* which table's edit can be replicated.
* If replicate_all flag is true, it means all user tables will be replicated to peer cluster. But
* you can set exclude namespaces or exclude table-cfs which can't be replicated to peer cluster.
* Note: set a exclude namespace means that all tables in this namespace can't be replicated.
*
* If replicate_all flag is false, it means all user tables can't be replicated to peer cluster.
* But you can set namespaces or table-cfs which will be replicated to peer cluster.
* Note: set a namespace means that all tables in this namespace will be replicated.
*/
@InterfaceAudience.Private
public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFilter {
@ -61,7 +62,15 @@ public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFi
ReplicationPeerConfig peerConfig = this.peer.getPeerConfig();
if (peerConfig.replicateAllUserTables()) {
// replicate all user tables, so return entry directly
// replicate all user tables, but filter by exclude namespaces config
Set<String> excludeNamespaces = peerConfig.getExcludeNamespaces();
// return null(prevent replicating) if logKey's table is in this peer's
// exclude namespaces list
if (excludeNamespaces != null && excludeNamespaces.contains(namespace)) {
return null;
}
return entry;
} else {
// Not replicate all user tables, so filter by namespaces and table-cfs config
@ -80,7 +89,7 @@ public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFi
// Then filter by table-cfs config
// return null(prevent replicating) if logKey's table isn't in this peer's
// replicaable namespace list and table list
// replicable tables list
if (tableCFs == null || !tableCFs.containsKey(tabName)) {
return null;
}
@ -93,34 +102,39 @@ public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFi
public Cell filterCell(final Entry entry, Cell cell) {
ReplicationPeerConfig peerConfig = this.peer.getPeerConfig();
if (peerConfig.replicateAllUserTables()) {
// replicate all user tables, so return cell directly
// replicate all user tables, but filter by exclude table-cfs config
final Map<TableName, List<String>> excludeTableCfs = peerConfig.getExcludeTableCFsMap();
if (excludeTableCfs == null) {
return cell;
}
if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
cell = bulkLoadFilter.filterCell(cell,
fam -> filterByExcludeTableCfs(entry.getKey().getTablename(), Bytes.toString(fam),
excludeTableCfs));
} else {
if (filterByExcludeTableCfs(entry.getKey().getTablename(),
Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()),
excludeTableCfs)) {
return null;
}
}
return cell;
} else {
// not replicate all user tables, so filter by table-cfs config
final Map<TableName, List<String>> tableCfs = peerConfig.getTableCFsMap();
if (tableCfs == null) {
return cell;
}
TableName tabName = entry.getKey().getTablename();
List<String> cfs = tableCfs.get(tabName);
// ignore(remove) kv if its cf isn't in the replicable cf list
// (empty cfs means all cfs of this table are replicable)
if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
cell = bulkLoadFilter.filterCell(cell, new Predicate<byte[]>() {
@Override
public boolean apply(byte[] fam) {
if (tableCfs != null) {
List<String> cfs = tableCfs.get(entry.getKey().getTablename());
if (cfs != null && !cfs.contains(Bytes.toString(fam))) {
return true;
}
}
return false;
}
});
cell = bulkLoadFilter.filterCell(cell,
fam -> filterByTableCfs(entry.getKey().getTablename(), Bytes.toString(fam), tableCfs));
} else {
if ((cfs != null)
&& !cfs.contains(Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(),
cell.getFamilyLength()))) {
if (filterByTableCfs(entry.getKey().getTablename(),
Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()),
tableCfs)) {
return null;
}
}
@ -128,4 +142,31 @@ public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFi
return cell;
}
}
private boolean filterByExcludeTableCfs(TableName tableName, String family,
Map<TableName, List<String>> excludeTableCfs) {
List<String> excludeCfs = excludeTableCfs.get(tableName);
if (excludeCfs != null) {
// empty cfs means all cfs of this table are excluded
if (excludeCfs.isEmpty()) {
return true;
}
// ignore(remove) kv if its cf is in the exclude cfs list
if (excludeCfs.contains(family)) {
return true;
}
}
return false;
}
private boolean filterByTableCfs(TableName tableName, String family,
Map<TableName, List<String>> tableCfs) {
List<String> cfs = tableCfs.get(tableName);
// ignore(remove) kv if its cf isn't in the replicable cf list
// (empty cfs means all cfs of this table are replicable)
if (cfs != null && !cfs.contains(family)) {
return true;
}
return false;
}
}

View File

@ -482,9 +482,98 @@ public class TestReplicationAdmin {
hbaseAdmin.removeReplicationPeer(ID_ONE);
}
@Test
public void testPeerExcludeNamespaces() throws Exception {
String ns1 = "ns1";
String ns2 = "ns2";
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc.setClusterKey(KEY_ONE);
hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
assertTrue(rpc.replicateAllUserTables());
Set<String> namespaces = new HashSet<String>();
namespaces.add(ns1);
namespaces.add(ns2);
rpc.setExcludeNamespaces(namespaces);
hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeNamespaces();
assertEquals(2, namespaces.size());
assertTrue(namespaces.contains(ns1));
assertTrue(namespaces.contains(ns2));
rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
namespaces.clear();
namespaces.add(ns1);
rpc.setExcludeNamespaces(namespaces);
hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeNamespaces();
assertEquals(1, namespaces.size());
assertTrue(namespaces.contains(ns1));
hbaseAdmin.removeReplicationPeer(ID_ONE);
}
@Test
public void testPeerExcludeTableCFs() throws Exception {
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc.setClusterKey(KEY_ONE);
TableName tab1 = TableName.valueOf("t1");
TableName tab2 = TableName.valueOf("t2");
TableName tab3 = TableName.valueOf("t3");
TableName tab4 = TableName.valueOf("t4");
// Add a valid peer
hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
assertTrue(rpc.replicateAllUserTables());
Map<TableName, List<String>> tableCFs = new HashMap<TableName, List<String>>();
tableCFs.put(tab1, null);
rpc.setExcludeTableCFsMap(tableCFs);
hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
Map<TableName, List<String>> result =
hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap();
assertEquals(1, result.size());
assertEquals(true, result.containsKey(tab1));
assertNull(result.get(tab1));
tableCFs.put(tab2, new ArrayList<String>());
tableCFs.get(tab2).add("f1");
rpc.setExcludeTableCFsMap(tableCFs);
hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
result = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap();
assertEquals(2, result.size());
assertTrue("Should contain t1", result.containsKey(tab1));
assertTrue("Should contain t2", result.containsKey(tab2));
assertNull(result.get(tab1));
assertEquals(1, result.get(tab2).size());
assertEquals("f1", result.get(tab2).get(0));
tableCFs.clear();
tableCFs.put(tab3, new ArrayList<String>());
tableCFs.put(tab4, new ArrayList<String>());
tableCFs.get(tab4).add("f1");
tableCFs.get(tab4).add("f2");
rpc.setExcludeTableCFsMap(tableCFs);
hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
result = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap();
assertEquals(2, result.size());
assertTrue("Should contain t3", result.containsKey(tab3));
assertTrue("Should contain t4", result.containsKey(tab4));
assertNull(result.get(tab3));
assertEquals(2, result.get(tab4).size());
assertEquals("f1", result.get(tab4).get(0));
assertEquals("f2", result.get(tab4).get(1));
hbaseAdmin.removeReplicationPeer(ID_ONE);
}
@Test
public void testPeerConfigConflict() throws Exception {
// Default replicate all flag is true
// Default replicate_all flag is true
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc.setClusterKey(KEY_ONE);
@ -492,39 +581,68 @@ public class TestReplicationAdmin {
Set<String> namespaces = new HashSet<String>();
namespaces.add(ns1);
TableName tab1 = TableName.valueOf("ns1:tabl");
TableName tab1 = TableName.valueOf("ns2:tabl");
Map<TableName, List<String>> tableCfs = new HashMap<TableName, List<String>>();
tableCfs.put(tab1, new ArrayList<String>());
try {
rpc.setNamespaces(namespaces);
hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
fail("Should throw Exception. When replicate all flag is true, no need to config namespaces");
fail("Should throw Exception."
+ " When replicate all flag is true, no need to config namespaces");
} catch (IOException e) {
// OK
rpc.setNamespaces(null);
}
try {
rpc.setTableCFsMap(tableCfs);
hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
fail("Should throw Exception. When replicate all flag is true, no need to config table-cfs");
} catch (IOException e) {
// OK
rpc.setTableCFsMap(null);
}
try {
rpc.setNamespaces(namespaces);
rpc.setTableCFsMap(tableCfs);
hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
fail("Should throw Exception."
+ " When replicate all flag is true, no need to config namespaces or table-cfs");
+ " When replicate all flag is true, no need to config table-cfs");
} catch (IOException e) {
// OK
rpc.setNamespaces(null);
rpc.setTableCFsMap(null);
}
// Set replicate_all flag to true
rpc.setReplicateAllUserTables(false);
try {
rpc.setExcludeNamespaces(namespaces);
hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
fail("Should throw Exception."
+ " When replicate all flag is false, no need to config exclude namespaces");
} catch (IOException e) {
// OK
rpc.setExcludeNamespaces(null);
}
try {
rpc.setExcludeTableCFsMap(tableCfs);
hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
fail("Should throw Exception."
+ " When replicate all flag is false, no need to config exclude table-cfs");
} catch (IOException e) {
// OK
rpc.setExcludeTableCFsMap(null);
}
rpc.setNamespaces(namespaces);
rpc.setTableCFsMap(tableCfs);
// OK to add a new peer which replicate_all flag is false and with namespaces, table-cfs config
hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
// Default replicate_all flag is true
ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
rpc2.setClusterKey(KEY_SECOND);
rpc2.setExcludeNamespaces(namespaces);
rpc2.setExcludeTableCFsMap(tableCfs);
// OK to add a new peer which replicate_all flag is true and with exclude namespaces, exclude
// table-cfs config
hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2);
hbaseAdmin.removeReplicationPeer(ID_ONE);
hbaseAdmin.removeReplicationPeer(ID_SECOND);
}
@Test
@ -539,41 +657,80 @@ public class TestReplicationAdmin {
rpc.setReplicateAllUserTables(false);
hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
rpc = admin.getPeerConfig(ID_ONE);
rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
Set<String> namespaces = new HashSet<String>();
namespaces.add(ns1);
rpc.setNamespaces(namespaces);
admin.updatePeerConfig(ID_ONE, rpc);
rpc = admin.getPeerConfig(ID_ONE);
Map<TableName, List<String>> tableCfs = new HashMap<>();
tableCfs.put(tableName1, new ArrayList<>());
rpc.setTableCFsMap(tableCfs);
hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
try {
admin.updatePeerConfig(ID_ONE, rpc);
fail("Should throw ReplicationException, because table " + tableName1 + " conflict with namespace "
+ ns1);
} catch (IOException e) {
Map<TableName, List<String>> tableCfs = new HashMap<>();
tableCfs.put(tableName1, new ArrayList<>());
rpc.setTableCFsMap(tableCfs);
hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
fail("Should throw ReplicationException" + " Because table " + tableName1
+ " conflict with namespace " + ns1);
} catch (Exception e) {
// OK
}
rpc = admin.getPeerConfig(ID_ONE);
tableCfs.clear();
rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
Map<TableName, List<String>> tableCfs = new HashMap<>();
tableCfs.put(tableName2, new ArrayList<>());
rpc.setTableCFsMap(tableCfs);
admin.updatePeerConfig(ID_ONE, rpc);
rpc = admin.getPeerConfig(ID_ONE);
namespaces.clear();
namespaces.add(ns2);
rpc.setNamespaces(namespaces);
hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
try {
admin.updatePeerConfig(ID_ONE, rpc);
fail("Should throw ReplicationException, because namespace " + ns2 + " conflict with table "
+ tableName2);
} catch (IOException e) {
namespaces.clear();
namespaces.add(ns2);
rpc.setNamespaces(namespaces);
hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
fail("Should throw ReplicationException" + " Because namespace " + ns2
+ " conflict with table " + tableName2);
} catch (Exception e) {
// OK
}
admin.removePeer(ID_ONE);
ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
rpc2.setClusterKey(KEY_SECOND);
hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2);
rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
Set<String> excludeNamespaces = new HashSet<String>();
excludeNamespaces.add(ns1);
rpc2.setExcludeNamespaces(excludeNamespaces);
hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2);
rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
try {
Map<TableName, List<String>> excludeTableCfs = new HashMap<>();
excludeTableCfs.put(tableName1, new ArrayList<>());
rpc2.setExcludeTableCFsMap(excludeTableCfs);
hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2);
fail("Should throw ReplicationException" + " Because exclude table " + tableName1
+ " conflict with exclude namespace " + ns1);
} catch (Exception e) {
// OK
}
rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
Map<TableName, List<String>> excludeTableCfs = new HashMap<>();
excludeTableCfs.put(tableName2, new ArrayList<>());
rpc2.setExcludeTableCFsMap(excludeTableCfs);
hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2);
rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
try {
namespaces.clear();
namespaces.add(ns2);
rpc2.setNamespaces(namespaces);
hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2);
fail("Should throw ReplicationException" + " Because exclude namespace " + ns2
+ " conflict with exclude table " + tableName2);
} catch (Exception e) {
// OK
}
hbaseAdmin.removeReplicationPeer(ID_ONE);
hbaseAdmin.removeReplicationPeer(ID_SECOND);
}
@Test

View File

@ -22,8 +22,7 @@ import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -47,6 +46,8 @@ import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
@Category({ReplicationTests.class, SmallTests.class})
public class TestReplicationWALEntryFilters {
@ -205,23 +206,17 @@ public class TestReplicationWALEntryFilters {
ReplicationPeer peer = mock(ReplicationPeer.class);
ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class);
// 1. replicate all user tables
when(peerConfig.replicateAllUserTables()).thenReturn(true);
when(peer.getPeerConfig()).thenReturn(peerConfig);
Entry userEntry = createEntry(null, a, b, c);
ChainWALEntryFilter filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
// 2. not replicate all user tables, no namespaces and table-cfs config
// 1. replicate_all flag is false, no namespaces and table-cfs config
when(peerConfig.replicateAllUserTables()).thenReturn(false);
when(peerConfig.getNamespaces()).thenReturn(null);
when(peerConfig.getTableCFsMap()).thenReturn(null);
when(peer.getPeerConfig()).thenReturn(peerConfig);
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
Entry userEntry = createEntry(null, a, b, c);
ChainWALEntryFilter filter =
new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(null, filter.filter(userEntry));
// 3. Only config table-cfs in peer
// 2. replicate_all flag is false, and only config table-cfs in peer
// empty map
userEntry = createEntry(null, a, b, c);
Map<TableName, List<String>> tableCfs = new HashMap<>();
@ -261,7 +256,7 @@ public class TestReplicationWALEntryFilters {
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a,c), filter.filter(userEntry));
// 3. Only config namespaces in peer
// 3. replicate_all flag is false, and only config namespaces in peer
when(peer.getTableCFs()).thenReturn(null);
// empty set
Set<String> namespaces = new HashSet<>();
@ -292,7 +287,7 @@ public class TestReplicationWALEntryFilters {
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(null, filter.filter(userEntry));
// 4. Config namespaces and table-cfs both
// 4. replicate_all flag is false, and config namespaces and table-cfs both
// Namespaces config should not confict with table-cfs config
namespaces = new HashSet<>();
tableCfs = new HashMap<>();
@ -331,9 +326,110 @@ public class TestReplicationWALEntryFilters {
assertEquals(null, filter.filter(userEntry));
}
@Test
public void testNamespaceTableCfWALEntryFilter2() {
ReplicationPeer peer = mock(ReplicationPeer.class);
ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class);
// 1. replicate_all flag is true
// and no exclude namespaces and no exclude table-cfs config
when(peerConfig.replicateAllUserTables()).thenReturn(true);
when(peerConfig.getExcludeNamespaces()).thenReturn(null);
when(peerConfig.getExcludeTableCFsMap()).thenReturn(null);
when(peer.getPeerConfig()).thenReturn(peerConfig);
Entry userEntry = createEntry(null, a, b, c);
ChainWALEntryFilter filter =
new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
// 2. replicate_all flag is true, and only config exclude namespaces
// empty set
Set<String> namespaces = new HashSet<String>();
when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces);
when(peerConfig.getExcludeTableCFsMap()).thenReturn(null);
when(peer.getPeerConfig()).thenReturn(peerConfig);
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
// exclude namespace default
namespaces.add("default");
when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces);
when(peerConfig.getExcludeTableCFsMap()).thenReturn(null);
when(peer.getPeerConfig()).thenReturn(peerConfig);
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(null, filter.filter(userEntry));
// exclude namespace ns1
namespaces = new HashSet<String>();
namespaces.add("ns1");
when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces);
when(peerConfig.getExcludeTableCFsMap()).thenReturn(null);
when(peer.getPeerConfig()).thenReturn(peerConfig);
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
// 3. replicate_all flag is true, and only config exclude table-cfs
// empty table-cfs map
Map<TableName, List<String>> tableCfs = new HashMap<TableName, List<String>>();
when(peerConfig.getExcludeNamespaces()).thenReturn(null);
when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs);
when(peer.getPeerConfig()).thenReturn(peerConfig);
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
// exclude table bar
tableCfs = new HashMap<TableName, List<String>>();
tableCfs.put(TableName.valueOf("bar"), null);
when(peerConfig.getExcludeNamespaces()).thenReturn(null);
when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs);
when(peer.getPeerConfig()).thenReturn(peerConfig);
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
// exclude table foo:a
tableCfs = new HashMap<TableName, List<String>>();
tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a"));
when(peerConfig.getExcludeNamespaces()).thenReturn(null);
when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs);
when(peer.getPeerConfig()).thenReturn(peerConfig);
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, b, c), filter.filter(userEntry));
// 4. replicate_all flag is true, and config exclude namespaces and table-cfs both
// exclude ns1 and table foo:a,c
namespaces = new HashSet<String>();
tableCfs = new HashMap<TableName, List<String>>();
namespaces.add("ns1");
tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces);
when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs);
when(peer.getPeerConfig()).thenReturn(peerConfig);
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, b), filter.filter(userEntry));
// exclude namespace default and table ns1:bar
namespaces = new HashSet<String>();
tableCfs = new HashMap<TableName, List<String>>();
namespaces.add("default");
tableCfs.put(TableName.valueOf("ns1:bar"), new ArrayList<String>());
when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces);
when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs);
when(peer.getPeerConfig()).thenReturn(peerConfig);
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(null, filter.filter(userEntry));
}
private Entry createEntry(TreeMap<byte[], Integer> scopes, byte[]... kvs) {
WALKeyImpl key1 =
new WALKeyImpl(new byte[0], TableName.valueOf("foo"), System.currentTimeMillis(), scopes);
WALKeyImpl key1 = new WALKeyImpl(new byte[0], TableName.valueOf("foo"),
System.currentTimeMillis(), scopes);
WALEdit edit1 = new WALEdit();
for (byte[] kv : kvs) {
@ -342,7 +438,6 @@ public class TestReplicationWALEntryFilters {
return new Entry(key1, edit1);
}
private void assertEquals(Entry e1, Entry e2) {
Assert.assertEquals(e1 == null, e2 == null);
if (e1 == null) {

View File

@ -152,7 +152,11 @@ module Hbase
# Show the current tableCFs config for the specified peer
def show_peer_tableCFs(id)
rpc = @admin.getReplicationPeerConfig(id)
ReplicationPeerConfigUtil.convertToString(rpc.getTableCFsMap)
show_peer_tableCFs_by_config(rpc)
end
def show_peer_tableCFs_by_config(peer_config)
ReplicationPeerConfigUtil.convertToString(peer_config.getTableCFsMap)
end
#----------------------------------------------------------------------------------------------
@ -274,6 +278,49 @@ module Hbase
@replication_admin.updatePeerConfig(id, rpc)
end
# Set exclude namespaces config for the specified peer
def set_peer_exclude_namespaces(id, exclude_namespaces)
return if exclude_namespaces.nil?
exclude_ns_set = java.util.HashSet.new
exclude_namespaces.each do |n|
exclude_ns_set.add(n)
end
rpc = get_peer_config(id)
return if rpc.nil?
rpc.setExcludeNamespaces(exclude_ns_set)
@admin.updateReplicationPeerConfig(id, rpc)
end
# Show the exclude namespaces config for the specified peer
def show_peer_exclude_namespaces(peer_config)
namespaces = peer_config.getExcludeNamespaces
return nil if namespaces.nil?
namespaces = java.util.ArrayList.new(namespaces)
java.util.Collections.sort(namespaces)
'!' + namespaces.join(';')
end
# Set exclude tableCFs config for the specified peer
def set_peer_exclude_tableCFs(id, exclude_tableCFs)
return if exclude_tableCFs.nil?
# convert tableCFs to TableName
map = java.util.HashMap.new
exclude_tableCFs.each do |key, val|
map.put(org.apache.hadoop.hbase.TableName.valueOf(key), val)
end
rpc = get_peer_config(id)
return if rpc.nil?
rpc.setExcludeTableCFsMap(map)
@admin.updateReplicationPeerConfig(id, rpc)
end
# Show the exclude tableCFs config for the specified peer
def show_peer_exclude_tableCFs(peer_config)
tableCFs = peer_config.getExcludeTableCFsMap
return nil if tableCFs.nil?
'!' + ReplicationPeerConfigUtil.convertToString(tableCFs)
end
#----------------------------------------------------------------------------------------------
# Enables a table's replication switch
def enable_tablerep(table_name)

View File

@ -381,8 +381,10 @@ Shell.load_command_group(
set_peer_namespaces
append_peer_namespaces
remove_peer_namespaces
set_peer_exclude_namespaces
show_peer_tableCFs
set_peer_tableCFs
set_peer_exclude_tableCFs
set_peer_bandwidth
list_replicated_tables
append_peer_tableCFs

View File

@ -23,7 +23,13 @@ module Shell
class ListPeers < Command
def help
<<-EOF
List all replication peer clusters.
List all replication peer clusters.
If replicate_all flag is false, the namespaces and table-cfs in peer config
will be replicated to peer cluster.
If replicate_all flag is true, all user tables will be replicate to peer
cluster, except that the namespaces and table-cfs in peer config.
hbase> list_peers
EOF
@ -39,8 +45,13 @@ EOF
id = peer.getPeerId
state = peer.isEnabled ? 'ENABLED' : 'DISABLED'
config = peer.getPeerConfig
namespaces = replication_admin.show_peer_namespaces(config)
tableCFs = replication_admin.show_peer_tableCFs(id)
if config.replicateAllUserTables
namespaces = replication_admin.show_peer_exclude_namespaces(config)
tableCFs = replication_admin.show_peer_exclude_tableCFs(config)
else
namespaces = replication_admin.show_peer_namespaces(config)
tableCFs = replication_admin.show_peer_tableCFs_by_config(config)
end
formatter.row([id, config.getClusterKey,
config.getReplicationEndpointImpl, state,
config.replicateAllUserTables, namespaces, tableCFs,

View File

@ -0,0 +1,52 @@
#
# Copyright The Apache Software Foundation
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
module Shell
module Commands
class SetPeerExcludeNamespaces < Command
def help
<<-EOF
Set the namespaces which not replicated for the specified peer.
Note:
1. The replicate_all flag need to be true when set exclude namespaces.
2. Set a exclude namespace in the peer config means that all tables in this
namespace will not be replicated to the peer cluster. If peer config
already has a exclude table, then not allow set this table's namespace
as a exclude namespace.
Examples:
# set exclude namespaces config to null
hbase> set_peer_exclude_namespaces '1', []
# set namespaces which not replicated for a peer.
# set a exclude namespace in the peer config means that all tables in this
# namespace will not be replicated.
hbase> set_peer_exclude_namespaces '2', ["ns1", "ns2"]
EOF
end
def command(id, exclude_namespaces)
replication_admin.set_peer_exclude_namespaces(id, exclude_namespaces)
end
end
end
end

View File

@ -0,0 +1,51 @@
#
# Copyright The Apache Software Foundation
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
module Shell
module Commands
class SetPeerExcludeTableCFs < Command
def help
<<-EOF
Set the table-cfs which not replicated for the specified peer.
Note:
1. The replicate_all flag need to be true when set exclude table-cfs.
2. If peer config already has a exclude namespace, then not allow set any
exclude table of this namespace to the peer config.
Examples:
# set exclude table-cfs to null
hbase> set_peer_exclude_tableCFs '1'
# set table / table-cf which not replicated for a peer, for a table without
# an explicit column-family list, all column-families will not be replicated
hbase> set_peer_exclude_tableCFs '2', { "ns1:table1" => [],
"ns2:table2" => ["cf1", "cf2"],
"ns3:table3" => ["cfA", "cfB"]}
EOF
end
def command(id, exclude_peer_table_cfs = nil)
replication_admin.set_peer_exclude_tableCFs(id, exclude_peer_table_cfs)
end
end
end
end

View File

@ -25,10 +25,10 @@ module Shell
<<-EOF
Set the replicable namespaces config for the specified peer.
Set a namespace in the peer config means that all tables in this
namespace will be replicated to the peer cluster. So if you already
have set a namespace in the peer config, then you can't set this
namespace's tables in the peer config again.
1. The replicate_all flag need to be false when set the replicable namespaces.
2. Set a namespace in the peer config means that all tables in this namespace
will be replicated to the peer cluster. If peer config already has a table,
then not allow set this table's namespace to the peer config.
Examples:

View File

@ -26,7 +26,10 @@ module Shell
Set the replicate_all flag to true or false for the specified peer.
If replicate_all flag is true, then all user tables (REPLICATION_SCOPE != 0)
will be replicate to peer cluster.
will be replicate to peer cluster. But you can use 'set_peer_exclude_namespaces'
to set which namespaces can't be replicated to peer cluster. And you can use
'set_peer_exclude_tableCFs' to set which tables can't be replicated to peer
cluster.
If replicate_all flag is false, then all user tables cannot be replicate to
peer cluster. Then you can use 'set_peer_namespaces' or 'append_peer_namespaces'
@ -36,6 +39,9 @@ module Shell
Notice: When you want to change a peer's replicate_all flag from false to true,
you need clean the peer's NAMESPACES and TABLECFS config firstly.
When you want to change a peer's replicate_all flag from true to false,
you need clean the peer's EXCLUDE_NAMESPACES and EXCLUDE_TABLECFS
config firstly.
Examples:

View File

@ -25,8 +25,10 @@ module Shell
<<-EOF
Set the replicable table-cf config for the specified peer.
Can't set a table to table-cfs config if it's namespace already was in
namespaces config of this peer.
Note:
1. The replicate_all flag need to be false when set the replicable table-cfs.
2. Can't set a table to table-cfs config if it's namespace already was in
namespaces config of this peer.
Examples:

View File

@ -294,6 +294,29 @@ module Hbase
command(:remove_peer, @peer_id)
end
define_test 'set_peer_exclude_tableCFs: works with table-cfs map' do
cluster_key = 'zk4,zk5,zk6:11000:/hbase-test'
args = { CLUSTER_KEY => cluster_key }
command(:add_peer, @peer_id, args)
assert_equal(1, command(:list_peers).length)
peer = command(:list_peers).get(0)
assert_equal(@peer_id, peer.getPeerId)
assert_equal(cluster_key, peer.getPeerConfig.getClusterKey)
table_cfs = { 'table1' => [], 'table2' => ['cf1'],
'ns3:table3' => ['cf1', 'cf2'] }
command(:set_peer_exclude_tableCFs, @peer_id, table_cfs)
assert_equal(1, command(:list_peers).length)
peer = command(:list_peers).get(0)
peer_config = peer.getPeerConfig
assert_equal(true, peer_config.replicateAllUserTables)
assert_tablecfs_equal(table_cfs, peer_config.getExcludeTableCFsMap)
# cleanup for future tests
replication_admin.remove_peer(@peer_id)
end
define_test "set_peer_namespaces: works with namespaces array" do
cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
namespaces = ["ns1", "ns2"]
@ -395,6 +418,25 @@ module Hbase
command(:remove_peer, @peer_id)
end
define_test 'set_peer_exclude_namespaces: works with namespaces array' do
cluster_key = 'zk4,zk5,zk6:11000:/hbase-test'
namespaces = ['ns1', 'ns2']
namespaces_str = '!ns1;ns2'
args = { CLUSTER_KEY => cluster_key }
command(:add_peer, @peer_id, args)
command(:set_peer_exclude_namespaces, @peer_id, namespaces)
assert_equal(1, command(:list_peers).length)
peer_config = command(:list_peers).get(0).getPeerConfig
assert_equal(true, peer_config.replicateAllUserTables)
assert_equal(namespaces_str,
replication_admin.show_peer_exclude_namespaces(peer_config))
# cleanup for future tests
command(:remove_peer, @peer_id)
end
define_test 'set_peer_replicate_all' do
cluster_key = 'zk4,zk5,zk6:11000:/hbase-test'