diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java index 52a3c933eed..d8c86f09d4e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java @@ -48,6 +48,7 @@ import java.util.Map; import java.util.HashMap; import java.util.ArrayList; import java.util.Set; +import java.util.stream.Collectors; /** * Helper for TableCFs Operations. @@ -289,11 +290,8 @@ public final class ReplicationPeerConfigUtil { List namespacesList = peer.getNamespacesList(); if (namespacesList != null && namespacesList.size() != 0) { - Set namespaces = new HashSet<>(); - for (ByteString namespace : namespacesList) { - namespaces.add(namespace.toStringUtf8()); - } - peerConfig.setNamespaces(namespaces); + peerConfig.setNamespaces( + namespacesList.stream().map(ByteString::toStringUtf8).collect(Collectors.toSet())); } if (peer.hasBandwidth()) { @@ -304,6 +302,19 @@ public final class ReplicationPeerConfigUtil { peerConfig.setReplicateAllUserTables(peer.getReplicateAll()); } + Map> excludeTableCFsMap = + convert2Map(peer.getExcludeTableCfsList() + .toArray(new ReplicationProtos.TableCF[peer.getExcludeTableCfsCount()])); + if (excludeTableCFsMap != null) { + peerConfig.setExcludeTableCFsMap(excludeTableCFsMap); + } + + List excludeNamespacesList = peer.getExcludeNamespacesList(); + if (excludeNamespacesList != null && excludeNamespacesList.size() != 0) { + peerConfig.setExcludeNamespaces( + excludeNamespacesList.stream().map(ByteString::toStringUtf8).collect(Collectors.toSet())); + } + return peerConfig; } @@ -346,6 +357,20 @@ public final class ReplicationPeerConfigUtil { builder.setBandwidth(peerConfig.getBandwidth()); builder.setReplicateAll(peerConfig.replicateAllUserTables()); + + ReplicationProtos.TableCF[] excludeTableCFs = convert(peerConfig.getExcludeTableCFsMap()); + if (excludeTableCFs != null) { + for (int i = 0; i < excludeTableCFs.length; i++) { + builder.addExcludeTableCfs(excludeTableCFs[i]); + } + } + Set excludeNamespaces = peerConfig.getExcludeNamespaces(); + if (excludeNamespaces != null) { + for (String namespace : excludeNamespaces) { + builder.addExcludeNamespaces(ByteString.copyFromUtf8(namespace)); + } + } + return builder.build(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java index 9e20829ae43..52a5fe9761b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java @@ -44,6 +44,8 @@ public class ReplicationPeerConfig { private long bandwidth = 0; // Default value is true, means replicate all user tables to peer cluster. private boolean replicateAllUserTables = true; + private Map> excludeTableCFsMap = null; + private Set excludeNamespaces = null; public ReplicationPeerConfig() { this.peerData = new TreeMap<>(Bytes.BYTES_COMPARATOR); @@ -121,16 +123,44 @@ public class ReplicationPeerConfig { return this; } + public Map> getExcludeTableCFsMap() { + return (Map>) excludeTableCFsMap; + } + + public ReplicationPeerConfig setExcludeTableCFsMap(Map> tableCFsMap) { + this.excludeTableCFsMap = tableCFsMap; + return this; + } + + public Set getExcludeNamespaces() { + return this.excludeNamespaces; + } + + public ReplicationPeerConfig setExcludeNamespaces(Set namespaces) { + this.excludeNamespaces = namespaces; + return this; + } + @Override public String toString() { StringBuilder builder = new StringBuilder("clusterKey=").append(clusterKey).append(","); builder.append("replicationEndpointImpl=").append(replicationEndpointImpl).append(","); builder.append("replicateAllUserTables=").append(replicateAllUserTables).append(","); - if (namespaces != null) { - builder.append("namespaces=").append(namespaces.toString()).append(","); - } - if (tableCFsMap != null) { - builder.append("tableCFs=").append(tableCFsMap.toString()).append(","); + if (replicateAllUserTables) { + if (excludeNamespaces != null) { + builder.append("excludeNamespaces=").append(excludeNamespaces.toString()).append(","); + } + if (excludeTableCFsMap != null) { + builder.append("excludeTableCFsMap=").append(excludeTableCFsMap.toString()).append(","); + } + } else { + if (namespaces != null) { + builder.append("namespaces=").append(namespaces.toString()).append(","); + } + if (tableCFsMap != null) { + builder.append("tableCFs=").append(tableCFsMap.toString()).append(","); + } } builder.append("bandwidth=").append(bandwidth); return builder.toString(); @@ -142,17 +172,22 @@ public class ReplicationPeerConfig { * @return true if the table need replicate to the peer cluster */ public boolean needToReplicate(TableName table) { - // If null means user has explicitly not configured any namespaces and table CFs - // so all the tables data are applicable for replication - if (namespaces == null && tableCFsMap == null) { + if (replicateAllUserTables) { + if (excludeNamespaces != null && excludeNamespaces.contains(table.getNamespaceAsString())) { + return false; + } + if (excludeTableCFsMap != null && excludeTableCFsMap.containsKey(table)) { + return false; + } return true; + } else { + if (namespaces != null && namespaces.contains(table.getNamespaceAsString())) { + return true; + } + if (tableCFsMap != null && tableCFsMap.containsKey(table)) { + return true; + } + return false; } - if (namespaces != null && namespaces.contains(table.getNamespaceAsString())) { - return true; - } - if (tableCFsMap != null && tableCFsMap.containsKey(table)) { - return true; - } - return false; } } diff --git a/hbase-protocol-shaded/src/main/protobuf/Replication.proto b/hbase-protocol-shaded/src/main/protobuf/Replication.proto index a1a7ade8ec1..8657c25d611 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Replication.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Replication.proto @@ -46,6 +46,8 @@ message ReplicationPeer { repeated bytes namespaces = 6; optional int64 bandwidth = 7; optional bool replicate_all = 8; + repeated TableCF exclude_table_cfs = 9; + repeated bytes exclude_namespaces = 10; } /** diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java index 2c3bbd5bbb7..ca99f65a295 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java @@ -369,6 +369,8 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re existingConfig.setNamespaces(newConfig.getNamespaces()); existingConfig.setBandwidth(newConfig.getBandwidth()); existingConfig.setReplicateAllUserTables(newConfig.replicateAllUserTables()); + existingConfig.setExcludeNamespaces(newConfig.getExcludeNamespaces()); + existingConfig.setExcludeTableCFsMap(newConfig.getExcludeTableCFsMap()); try { ZKUtil.setData(this.zookeeper, getPeerNode(id), diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java index f2a6c852512..749448d4dfd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java @@ -118,15 +118,32 @@ public class ReplicationManager { return peers; } - private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws ReplicationException, - IOException { + /** + * If replicate_all flag is true, it means all user tables will be replicated to peer cluster. + * Then allow config exclude namespaces or exclude table-cfs which can't be replicated to + * peer cluster. + * + * If replicate_all flag is false, it means all user tables can't be replicated to peer cluster. + * Then allow to config namespaces or table-cfs which will be replicated to peer cluster. + */ + private void checkPeerConfig(ReplicationPeerConfig peerConfig) + throws ReplicationException, IOException { if (peerConfig.replicateAllUserTables()) { if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) || (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) { - throw new ReplicationException( - "Need clean namespaces or table-cfs config fisrtly when you want replicate all cluster"); + throw new ReplicationException("Need clean namespaces or table-cfs config firstly" + + " when replicate_all flag is true"); } + checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(), + peerConfig.getExcludeTableCFsMap()); } else { + if ((peerConfig.getExcludeNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) + || (peerConfig.getExcludeTableCFsMap() != null + && !peerConfig.getTableCFsMap().isEmpty())) { + throw new ReplicationException( + "Need clean exclude-namespaces or exclude-table-cfs config firstly" + + " when replicate_all flag is false"); + } checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(), peerConfig.getTableCFsMap()); } @@ -134,17 +151,19 @@ public class ReplicationManager { } /** - * Set a namespace in the peer config means that all tables in this namespace - * will be replicated to the peer cluster. + * Set a namespace in the peer config means that all tables in this namespace will be replicated + * to the peer cluster. + * 1. If peer config already has a namespace, then not allow set any table of this namespace + * to the peer config. + * 2. If peer config already has a table, then not allow set this table's namespace to the peer + * config. * - * 1. If you already have set a namespace in the peer config, then you can't set any table - * of this namespace to the peer config. - * 2. If you already have set a table in the peer config, then you can't set this table's - * namespace to the peer config. - * - * @param namespaces - * @param tableCfs - * @throws ReplicationException + * Set a exclude namespace in the peer config means that all tables in this namespace can't be + * replicated to the peer cluster. + * 1. If peer config already has a exclude namespace, then not allow set any exclude table of + * this namespace to the peer config. + * 2. If peer config already has a exclude table, then not allow set this table's namespace + * as a exclude namespace. */ private void checkNamespacesAndTableCfsConfigConflict(Set namespaces, Map> tableCfs) throws ReplicationException { @@ -157,8 +176,8 @@ public class ReplicationManager { for (Map.Entry> entry : tableCfs.entrySet()) { TableName table = entry.getKey(); if (namespaces.contains(table.getNamespaceAsString())) { - throw new ReplicationException( - "Table-cfs config conflict with namespaces config in peer"); + throw new ReplicationException("Table-cfs " + table + " is conflict with namespaces " + + table.getNamespaceAsString() + " in peer config"); } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java index 9a4cc6c3795..5068cce8ef6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java @@ -32,16 +32,17 @@ import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.WAL.Entry; -import org.apache.hadoop.hbase.shaded.com.google.common.base.Predicate; - /** - * Filter a WAL Entry by namespaces and table-cfs config in the peer. It first filter entry - * by namespaces config, then filter entry by table-cfs config. + * Filter a WAL Entry by the peer config: replicate_all flag, namespaces config, table-cfs config, + * exclude namespaces config, and exclude table-cfs config. * - * 1. Set a namespace in peer config means that all tables in this namespace will be replicated. - * 2. If the namespaces config is null, then the table-cfs config decide which table's edit - * can be replicated. If the table-cfs config is null, then the namespaces config decide - * which table's edit can be replicated. + * If replicate_all flag is true, it means all user tables will be replicated to peer cluster. But + * you can set exclude namespaces or exclude table-cfs which can't be replicated to peer cluster. + * Note: set a exclude namespace means that all tables in this namespace can't be replicated. + * + * If replicate_all flag is false, it means all user tables can't be replicated to peer cluster. + * But you can set namespaces or table-cfs which will be replicated to peer cluster. + * Note: set a namespace means that all tables in this namespace will be replicated. */ @InterfaceAudience.Private public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFilter { @@ -61,7 +62,15 @@ public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFi ReplicationPeerConfig peerConfig = this.peer.getPeerConfig(); if (peerConfig.replicateAllUserTables()) { - // replicate all user tables, so return entry directly + // replicate all user tables, but filter by exclude namespaces config + Set excludeNamespaces = peerConfig.getExcludeNamespaces(); + + // return null(prevent replicating) if logKey's table is in this peer's + // exclude namespaces list + if (excludeNamespaces != null && excludeNamespaces.contains(namespace)) { + return null; + } + return entry; } else { // Not replicate all user tables, so filter by namespaces and table-cfs config @@ -80,7 +89,7 @@ public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFi // Then filter by table-cfs config // return null(prevent replicating) if logKey's table isn't in this peer's - // replicaable namespace list and table list + // replicable tables list if (tableCFs == null || !tableCFs.containsKey(tabName)) { return null; } @@ -93,34 +102,39 @@ public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFi public Cell filterCell(final Entry entry, Cell cell) { ReplicationPeerConfig peerConfig = this.peer.getPeerConfig(); if (peerConfig.replicateAllUserTables()) { - // replicate all user tables, so return cell directly + // replicate all user tables, but filter by exclude table-cfs config + final Map> excludeTableCfs = peerConfig.getExcludeTableCFsMap(); + if (excludeTableCfs == null) { + return cell; + } + + if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) { + cell = bulkLoadFilter.filterCell(cell, + fam -> filterByExcludeTableCfs(entry.getKey().getTablename(), Bytes.toString(fam), + excludeTableCfs)); + } else { + if (filterByExcludeTableCfs(entry.getKey().getTablename(), + Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()), + excludeTableCfs)) { + return null; + } + } + return cell; } else { + // not replicate all user tables, so filter by table-cfs config final Map> tableCfs = peerConfig.getTableCFsMap(); if (tableCfs == null) { return cell; } - TableName tabName = entry.getKey().getTablename(); - List cfs = tableCfs.get(tabName); - // ignore(remove) kv if its cf isn't in the replicable cf list - // (empty cfs means all cfs of this table are replicable) + if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) { - cell = bulkLoadFilter.filterCell(cell, new Predicate() { - @Override - public boolean apply(byte[] fam) { - if (tableCfs != null) { - List cfs = tableCfs.get(entry.getKey().getTablename()); - if (cfs != null && !cfs.contains(Bytes.toString(fam))) { - return true; - } - } - return false; - } - }); + cell = bulkLoadFilter.filterCell(cell, + fam -> filterByTableCfs(entry.getKey().getTablename(), Bytes.toString(fam), tableCfs)); } else { - if ((cfs != null) - && !cfs.contains(Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), - cell.getFamilyLength()))) { + if (filterByTableCfs(entry.getKey().getTablename(), + Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()), + tableCfs)) { return null; } } @@ -128,4 +142,31 @@ public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFi return cell; } } + + private boolean filterByExcludeTableCfs(TableName tableName, String family, + Map> excludeTableCfs) { + List excludeCfs = excludeTableCfs.get(tableName); + if (excludeCfs != null) { + // empty cfs means all cfs of this table are excluded + if (excludeCfs.isEmpty()) { + return true; + } + // ignore(remove) kv if its cf is in the exclude cfs list + if (excludeCfs.contains(family)) { + return true; + } + } + return false; + } + + private boolean filterByTableCfs(TableName tableName, String family, + Map> tableCfs) { + List cfs = tableCfs.get(tableName); + // ignore(remove) kv if its cf isn't in the replicable cf list + // (empty cfs means all cfs of this table are replicable) + if (cfs != null && !cfs.contains(family)) { + return true; + } + return false; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java index 19f117b2ac2..83a2e12dd58 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java @@ -482,9 +482,98 @@ public class TestReplicationAdmin { hbaseAdmin.removeReplicationPeer(ID_ONE); } + @Test + public void testPeerExcludeNamespaces() throws Exception { + String ns1 = "ns1"; + String ns2 = "ns2"; + + ReplicationPeerConfig rpc = new ReplicationPeerConfig(); + rpc.setClusterKey(KEY_ONE); + hbaseAdmin.addReplicationPeer(ID_ONE, rpc); + + rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); + assertTrue(rpc.replicateAllUserTables()); + + Set namespaces = new HashSet(); + namespaces.add(ns1); + namespaces.add(ns2); + rpc.setExcludeNamespaces(namespaces); + hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); + namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeNamespaces(); + assertEquals(2, namespaces.size()); + assertTrue(namespaces.contains(ns1)); + assertTrue(namespaces.contains(ns2)); + + rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); + namespaces.clear(); + namespaces.add(ns1); + rpc.setExcludeNamespaces(namespaces); + hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); + namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeNamespaces(); + assertEquals(1, namespaces.size()); + assertTrue(namespaces.contains(ns1)); + + hbaseAdmin.removeReplicationPeer(ID_ONE); + } + + @Test + public void testPeerExcludeTableCFs() throws Exception { + ReplicationPeerConfig rpc = new ReplicationPeerConfig(); + rpc.setClusterKey(KEY_ONE); + TableName tab1 = TableName.valueOf("t1"); + TableName tab2 = TableName.valueOf("t2"); + TableName tab3 = TableName.valueOf("t3"); + TableName tab4 = TableName.valueOf("t4"); + + // Add a valid peer + hbaseAdmin.addReplicationPeer(ID_ONE, rpc); + rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); + assertTrue(rpc.replicateAllUserTables()); + + Map> tableCFs = new HashMap>(); + tableCFs.put(tab1, null); + rpc.setExcludeTableCFsMap(tableCFs); + hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); + Map> result = + hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap(); + assertEquals(1, result.size()); + assertEquals(true, result.containsKey(tab1)); + assertNull(result.get(tab1)); + + tableCFs.put(tab2, new ArrayList()); + tableCFs.get(tab2).add("f1"); + rpc.setExcludeTableCFsMap(tableCFs); + hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); + result = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap(); + assertEquals(2, result.size()); + assertTrue("Should contain t1", result.containsKey(tab1)); + assertTrue("Should contain t2", result.containsKey(tab2)); + assertNull(result.get(tab1)); + assertEquals(1, result.get(tab2).size()); + assertEquals("f1", result.get(tab2).get(0)); + + tableCFs.clear(); + tableCFs.put(tab3, new ArrayList()); + tableCFs.put(tab4, new ArrayList()); + tableCFs.get(tab4).add("f1"); + tableCFs.get(tab4).add("f2"); + rpc.setExcludeTableCFsMap(tableCFs); + hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); + result = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap(); + assertEquals(2, result.size()); + assertTrue("Should contain t3", result.containsKey(tab3)); + assertTrue("Should contain t4", result.containsKey(tab4)); + assertNull(result.get(tab3)); + assertEquals(2, result.get(tab4).size()); + assertEquals("f1", result.get(tab4).get(0)); + assertEquals("f2", result.get(tab4).get(1)); + + hbaseAdmin.removeReplicationPeer(ID_ONE); + } + @Test public void testPeerConfigConflict() throws Exception { - // Default replicate all flag is true + // Default replicate_all flag is true ReplicationPeerConfig rpc = new ReplicationPeerConfig(); rpc.setClusterKey(KEY_ONE); @@ -492,39 +581,68 @@ public class TestReplicationAdmin { Set namespaces = new HashSet(); namespaces.add(ns1); - TableName tab1 = TableName.valueOf("ns1:tabl"); + TableName tab1 = TableName.valueOf("ns2:tabl"); Map> tableCfs = new HashMap>(); tableCfs.put(tab1, new ArrayList()); try { rpc.setNamespaces(namespaces); hbaseAdmin.addReplicationPeer(ID_ONE, rpc); - fail("Should throw Exception. When replicate all flag is true, no need to config namespaces"); + fail("Should throw Exception." + + " When replicate all flag is true, no need to config namespaces"); } catch (IOException e) { // OK rpc.setNamespaces(null); } try { - rpc.setTableCFsMap(tableCfs); - hbaseAdmin.addReplicationPeer(ID_ONE, rpc); - fail("Should throw Exception. When replicate all flag is true, no need to config table-cfs"); - } catch (IOException e) { - // OK - rpc.setTableCFsMap(null); - } - - try { - rpc.setNamespaces(namespaces); rpc.setTableCFsMap(tableCfs); hbaseAdmin.addReplicationPeer(ID_ONE, rpc); fail("Should throw Exception." - + " When replicate all flag is true, no need to config namespaces or table-cfs"); + + " When replicate all flag is true, no need to config table-cfs"); } catch (IOException e) { // OK - rpc.setNamespaces(null); rpc.setTableCFsMap(null); } + + // Set replicate_all flag to true + rpc.setReplicateAllUserTables(false); + try { + rpc.setExcludeNamespaces(namespaces); + hbaseAdmin.addReplicationPeer(ID_ONE, rpc); + fail("Should throw Exception." + + " When replicate all flag is false, no need to config exclude namespaces"); + } catch (IOException e) { + // OK + rpc.setExcludeNamespaces(null); + } + + try { + rpc.setExcludeTableCFsMap(tableCfs); + hbaseAdmin.addReplicationPeer(ID_ONE, rpc); + fail("Should throw Exception." + + " When replicate all flag is false, no need to config exclude table-cfs"); + } catch (IOException e) { + // OK + rpc.setExcludeTableCFsMap(null); + } + + rpc.setNamespaces(namespaces); + rpc.setTableCFsMap(tableCfs); + // OK to add a new peer which replicate_all flag is false and with namespaces, table-cfs config + hbaseAdmin.addReplicationPeer(ID_ONE, rpc); + + // Default replicate_all flag is true + ReplicationPeerConfig rpc2 = new ReplicationPeerConfig(); + rpc2.setClusterKey(KEY_SECOND); + rpc2.setExcludeNamespaces(namespaces); + rpc2.setExcludeTableCFsMap(tableCfs); + // OK to add a new peer which replicate_all flag is true and with exclude namespaces, exclude + // table-cfs config + hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2); + + hbaseAdmin.removeReplicationPeer(ID_ONE); + hbaseAdmin.removeReplicationPeer(ID_SECOND); } @Test @@ -539,41 +657,80 @@ public class TestReplicationAdmin { rpc.setReplicateAllUserTables(false); hbaseAdmin.addReplicationPeer(ID_ONE, rpc); - rpc = admin.getPeerConfig(ID_ONE); + rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); Set namespaces = new HashSet(); namespaces.add(ns1); rpc.setNamespaces(namespaces); - admin.updatePeerConfig(ID_ONE, rpc); - rpc = admin.getPeerConfig(ID_ONE); - Map> tableCfs = new HashMap<>(); - tableCfs.put(tableName1, new ArrayList<>()); - rpc.setTableCFsMap(tableCfs); + hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); + rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); try { - admin.updatePeerConfig(ID_ONE, rpc); - fail("Should throw ReplicationException, because table " + tableName1 + " conflict with namespace " - + ns1); - } catch (IOException e) { + Map> tableCfs = new HashMap<>(); + tableCfs.put(tableName1, new ArrayList<>()); + rpc.setTableCFsMap(tableCfs); + hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); + fail("Should throw ReplicationException" + " Because table " + tableName1 + + " conflict with namespace " + ns1); + } catch (Exception e) { // OK } - rpc = admin.getPeerConfig(ID_ONE); - tableCfs.clear(); + rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); + Map> tableCfs = new HashMap<>(); tableCfs.put(tableName2, new ArrayList<>()); rpc.setTableCFsMap(tableCfs); - admin.updatePeerConfig(ID_ONE, rpc); - rpc = admin.getPeerConfig(ID_ONE); - namespaces.clear(); - namespaces.add(ns2); - rpc.setNamespaces(namespaces); + hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); + rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); try { - admin.updatePeerConfig(ID_ONE, rpc); - fail("Should throw ReplicationException, because namespace " + ns2 + " conflict with table " - + tableName2); - } catch (IOException e) { + namespaces.clear(); + namespaces.add(ns2); + rpc.setNamespaces(namespaces); + hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); + fail("Should throw ReplicationException" + " Because namespace " + ns2 + + " conflict with table " + tableName2); + } catch (Exception e) { // OK } - admin.removePeer(ID_ONE); + ReplicationPeerConfig rpc2 = new ReplicationPeerConfig(); + rpc2.setClusterKey(KEY_SECOND); + hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2); + + rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND); + Set excludeNamespaces = new HashSet(); + excludeNamespaces.add(ns1); + rpc2.setExcludeNamespaces(excludeNamespaces); + hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2); + rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND); + try { + Map> excludeTableCfs = new HashMap<>(); + excludeTableCfs.put(tableName1, new ArrayList<>()); + rpc2.setExcludeTableCFsMap(excludeTableCfs); + hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2); + fail("Should throw ReplicationException" + " Because exclude table " + tableName1 + + " conflict with exclude namespace " + ns1); + } catch (Exception e) { + // OK + } + + rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND); + Map> excludeTableCfs = new HashMap<>(); + excludeTableCfs.put(tableName2, new ArrayList<>()); + rpc2.setExcludeTableCFsMap(excludeTableCfs); + hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2); + rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND); + try { + namespaces.clear(); + namespaces.add(ns2); + rpc2.setNamespaces(namespaces); + hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2); + fail("Should throw ReplicationException" + " Because exclude namespace " + ns2 + + " conflict with exclude table " + tableName2); + } catch (Exception e) { + // OK + } + + hbaseAdmin.removeReplicationPeer(ID_ONE); + hbaseAdmin.removeReplicationPeer(ID_SECOND); } @Test diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java index 608d22b01f4..7c3773aa710 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java @@ -22,8 +22,7 @@ import static org.junit.Assert.assertNull; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; - +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -47,6 +46,8 @@ import org.junit.Assert; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; + @Category({ReplicationTests.class, SmallTests.class}) public class TestReplicationWALEntryFilters { @@ -205,23 +206,17 @@ public class TestReplicationWALEntryFilters { ReplicationPeer peer = mock(ReplicationPeer.class); ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class); - // 1. replicate all user tables - when(peerConfig.replicateAllUserTables()).thenReturn(true); - when(peer.getPeerConfig()).thenReturn(peerConfig); - Entry userEntry = createEntry(null, a, b, c); - ChainWALEntryFilter filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); - assertEquals(createEntry(null, a, b, c), filter.filter(userEntry)); - - // 2. not replicate all user tables, no namespaces and table-cfs config + // 1. replicate_all flag is false, no namespaces and table-cfs config when(peerConfig.replicateAllUserTables()).thenReturn(false); when(peerConfig.getNamespaces()).thenReturn(null); when(peerConfig.getTableCFsMap()).thenReturn(null); when(peer.getPeerConfig()).thenReturn(peerConfig); - userEntry = createEntry(null, a, b, c); - filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); + Entry userEntry = createEntry(null, a, b, c); + ChainWALEntryFilter filter = + new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); assertEquals(null, filter.filter(userEntry)); - // 3. Only config table-cfs in peer + // 2. replicate_all flag is false, and only config table-cfs in peer // empty map userEntry = createEntry(null, a, b, c); Map> tableCfs = new HashMap<>(); @@ -261,7 +256,7 @@ public class TestReplicationWALEntryFilters { filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); assertEquals(createEntry(null, a,c), filter.filter(userEntry)); - // 3. Only config namespaces in peer + // 3. replicate_all flag is false, and only config namespaces in peer when(peer.getTableCFs()).thenReturn(null); // empty set Set namespaces = new HashSet<>(); @@ -292,7 +287,7 @@ public class TestReplicationWALEntryFilters { filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); assertEquals(null, filter.filter(userEntry)); - // 4. Config namespaces and table-cfs both + // 4. replicate_all flag is false, and config namespaces and table-cfs both // Namespaces config should not confict with table-cfs config namespaces = new HashSet<>(); tableCfs = new HashMap<>(); @@ -331,9 +326,110 @@ public class TestReplicationWALEntryFilters { assertEquals(null, filter.filter(userEntry)); } + @Test + public void testNamespaceTableCfWALEntryFilter2() { + ReplicationPeer peer = mock(ReplicationPeer.class); + ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class); + + // 1. replicate_all flag is true + // and no exclude namespaces and no exclude table-cfs config + when(peerConfig.replicateAllUserTables()).thenReturn(true); + when(peerConfig.getExcludeNamespaces()).thenReturn(null); + when(peerConfig.getExcludeTableCFsMap()).thenReturn(null); + when(peer.getPeerConfig()).thenReturn(peerConfig); + Entry userEntry = createEntry(null, a, b, c); + ChainWALEntryFilter filter = + new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); + assertEquals(createEntry(null, a, b, c), filter.filter(userEntry)); + + // 2. replicate_all flag is true, and only config exclude namespaces + // empty set + Set namespaces = new HashSet(); + when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces); + when(peerConfig.getExcludeTableCFsMap()).thenReturn(null); + when(peer.getPeerConfig()).thenReturn(peerConfig); + userEntry = createEntry(null, a, b, c); + filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); + assertEquals(createEntry(null, a, b, c), filter.filter(userEntry)); + + // exclude namespace default + namespaces.add("default"); + when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces); + when(peerConfig.getExcludeTableCFsMap()).thenReturn(null); + when(peer.getPeerConfig()).thenReturn(peerConfig); + userEntry = createEntry(null, a, b, c); + filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); + assertEquals(null, filter.filter(userEntry)); + + // exclude namespace ns1 + namespaces = new HashSet(); + namespaces.add("ns1"); + when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces); + when(peerConfig.getExcludeTableCFsMap()).thenReturn(null); + when(peer.getPeerConfig()).thenReturn(peerConfig); + userEntry = createEntry(null, a, b, c); + filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); + assertEquals(createEntry(null, a, b, c), filter.filter(userEntry)); + + // 3. replicate_all flag is true, and only config exclude table-cfs + // empty table-cfs map + Map> tableCfs = new HashMap>(); + when(peerConfig.getExcludeNamespaces()).thenReturn(null); + when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs); + when(peer.getPeerConfig()).thenReturn(peerConfig); + userEntry = createEntry(null, a, b, c); + filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); + assertEquals(createEntry(null, a, b, c), filter.filter(userEntry)); + + // exclude table bar + tableCfs = new HashMap>(); + tableCfs.put(TableName.valueOf("bar"), null); + when(peerConfig.getExcludeNamespaces()).thenReturn(null); + when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs); + when(peer.getPeerConfig()).thenReturn(peerConfig); + userEntry = createEntry(null, a, b, c); + filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); + assertEquals(createEntry(null, a, b, c), filter.filter(userEntry)); + + // exclude table foo:a + tableCfs = new HashMap>(); + tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a")); + when(peerConfig.getExcludeNamespaces()).thenReturn(null); + when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs); + when(peer.getPeerConfig()).thenReturn(peerConfig); + userEntry = createEntry(null, a, b, c); + filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); + assertEquals(createEntry(null, b, c), filter.filter(userEntry)); + + // 4. replicate_all flag is true, and config exclude namespaces and table-cfs both + // exclude ns1 and table foo:a,c + namespaces = new HashSet(); + tableCfs = new HashMap>(); + namespaces.add("ns1"); + tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c")); + when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces); + when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs); + when(peer.getPeerConfig()).thenReturn(peerConfig); + userEntry = createEntry(null, a, b, c); + filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); + assertEquals(createEntry(null, b), filter.filter(userEntry)); + + // exclude namespace default and table ns1:bar + namespaces = new HashSet(); + tableCfs = new HashMap>(); + namespaces.add("default"); + tableCfs.put(TableName.valueOf("ns1:bar"), new ArrayList()); + when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces); + when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs); + when(peer.getPeerConfig()).thenReturn(peerConfig); + userEntry = createEntry(null, a, b, c); + filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); + assertEquals(null, filter.filter(userEntry)); + } + private Entry createEntry(TreeMap scopes, byte[]... kvs) { - WALKeyImpl key1 = - new WALKeyImpl(new byte[0], TableName.valueOf("foo"), System.currentTimeMillis(), scopes); + WALKeyImpl key1 = new WALKeyImpl(new byte[0], TableName.valueOf("foo"), + System.currentTimeMillis(), scopes); WALEdit edit1 = new WALEdit(); for (byte[] kv : kvs) { @@ -342,7 +438,6 @@ public class TestReplicationWALEntryFilters { return new Entry(key1, edit1); } - private void assertEquals(Entry e1, Entry e2) { Assert.assertEquals(e1 == null, e2 == null); if (e1 == null) { diff --git a/hbase-shell/src/main/ruby/hbase/replication_admin.rb b/hbase-shell/src/main/ruby/hbase/replication_admin.rb index 50c086afe30..949bf6820ba 100644 --- a/hbase-shell/src/main/ruby/hbase/replication_admin.rb +++ b/hbase-shell/src/main/ruby/hbase/replication_admin.rb @@ -152,7 +152,11 @@ module Hbase # Show the current tableCFs config for the specified peer def show_peer_tableCFs(id) rpc = @admin.getReplicationPeerConfig(id) - ReplicationPeerConfigUtil.convertToString(rpc.getTableCFsMap) + show_peer_tableCFs_by_config(rpc) + end + + def show_peer_tableCFs_by_config(peer_config) + ReplicationPeerConfigUtil.convertToString(peer_config.getTableCFsMap) end #---------------------------------------------------------------------------------------------- @@ -274,6 +278,49 @@ module Hbase @replication_admin.updatePeerConfig(id, rpc) end + # Set exclude namespaces config for the specified peer + def set_peer_exclude_namespaces(id, exclude_namespaces) + return if exclude_namespaces.nil? + exclude_ns_set = java.util.HashSet.new + exclude_namespaces.each do |n| + exclude_ns_set.add(n) + end + rpc = get_peer_config(id) + return if rpc.nil? + rpc.setExcludeNamespaces(exclude_ns_set) + @admin.updateReplicationPeerConfig(id, rpc) + end + + # Show the exclude namespaces config for the specified peer + def show_peer_exclude_namespaces(peer_config) + namespaces = peer_config.getExcludeNamespaces + return nil if namespaces.nil? + namespaces = java.util.ArrayList.new(namespaces) + java.util.Collections.sort(namespaces) + '!' + namespaces.join(';') + end + + # Set exclude tableCFs config for the specified peer + def set_peer_exclude_tableCFs(id, exclude_tableCFs) + return if exclude_tableCFs.nil? + # convert tableCFs to TableName + map = java.util.HashMap.new + exclude_tableCFs.each do |key, val| + map.put(org.apache.hadoop.hbase.TableName.valueOf(key), val) + end + rpc = get_peer_config(id) + return if rpc.nil? + rpc.setExcludeTableCFsMap(map) + @admin.updateReplicationPeerConfig(id, rpc) + end + + # Show the exclude tableCFs config for the specified peer + def show_peer_exclude_tableCFs(peer_config) + tableCFs = peer_config.getExcludeTableCFsMap + return nil if tableCFs.nil? + '!' + ReplicationPeerConfigUtil.convertToString(tableCFs) + end + #---------------------------------------------------------------------------------------------- # Enables a table's replication switch def enable_tablerep(table_name) diff --git a/hbase-shell/src/main/ruby/shell.rb b/hbase-shell/src/main/ruby/shell.rb index 58886fc9ca5..a01a89052b2 100644 --- a/hbase-shell/src/main/ruby/shell.rb +++ b/hbase-shell/src/main/ruby/shell.rb @@ -381,8 +381,10 @@ Shell.load_command_group( set_peer_namespaces append_peer_namespaces remove_peer_namespaces + set_peer_exclude_namespaces show_peer_tableCFs set_peer_tableCFs + set_peer_exclude_tableCFs set_peer_bandwidth list_replicated_tables append_peer_tableCFs diff --git a/hbase-shell/src/main/ruby/shell/commands/list_peers.rb b/hbase-shell/src/main/ruby/shell/commands/list_peers.rb index 6812df4d5d2..522d23d2a4c 100644 --- a/hbase-shell/src/main/ruby/shell/commands/list_peers.rb +++ b/hbase-shell/src/main/ruby/shell/commands/list_peers.rb @@ -23,7 +23,13 @@ module Shell class ListPeers < Command def help <<-EOF -List all replication peer clusters. + List all replication peer clusters. + + If replicate_all flag is false, the namespaces and table-cfs in peer config + will be replicated to peer cluster. + + If replicate_all flag is true, all user tables will be replicate to peer + cluster, except that the namespaces and table-cfs in peer config. hbase> list_peers EOF @@ -39,8 +45,13 @@ EOF id = peer.getPeerId state = peer.isEnabled ? 'ENABLED' : 'DISABLED' config = peer.getPeerConfig - namespaces = replication_admin.show_peer_namespaces(config) - tableCFs = replication_admin.show_peer_tableCFs(id) + if config.replicateAllUserTables + namespaces = replication_admin.show_peer_exclude_namespaces(config) + tableCFs = replication_admin.show_peer_exclude_tableCFs(config) + else + namespaces = replication_admin.show_peer_namespaces(config) + tableCFs = replication_admin.show_peer_tableCFs_by_config(config) + end formatter.row([id, config.getClusterKey, config.getReplicationEndpointImpl, state, config.replicateAllUserTables, namespaces, tableCFs, diff --git a/hbase-shell/src/main/ruby/shell/commands/set_peer_exclude_namespaces.rb b/hbase-shell/src/main/ruby/shell/commands/set_peer_exclude_namespaces.rb new file mode 100644 index 00000000000..bf9b90b45bd --- /dev/null +++ b/hbase-shell/src/main/ruby/shell/commands/set_peer_exclude_namespaces.rb @@ -0,0 +1,52 @@ +# +# Copyright The Apache Software Foundation +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module Shell + module Commands + class SetPeerExcludeNamespaces < Command + def help + <<-EOF + Set the namespaces which not replicated for the specified peer. + + Note: + 1. The replicate_all flag need to be true when set exclude namespaces. + 2. Set a exclude namespace in the peer config means that all tables in this + namespace will not be replicated to the peer cluster. If peer config + already has a exclude table, then not allow set this table's namespace + as a exclude namespace. + + Examples: + + # set exclude namespaces config to null + hbase> set_peer_exclude_namespaces '1', [] + # set namespaces which not replicated for a peer. + # set a exclude namespace in the peer config means that all tables in this + # namespace will not be replicated. + hbase> set_peer_exclude_namespaces '2', ["ns1", "ns2"] + + EOF + end + + def command(id, exclude_namespaces) + replication_admin.set_peer_exclude_namespaces(id, exclude_namespaces) + end + end + end +end diff --git a/hbase-shell/src/main/ruby/shell/commands/set_peer_exclude_tableCFs.rb b/hbase-shell/src/main/ruby/shell/commands/set_peer_exclude_tableCFs.rb new file mode 100644 index 00000000000..25be364237b --- /dev/null +++ b/hbase-shell/src/main/ruby/shell/commands/set_peer_exclude_tableCFs.rb @@ -0,0 +1,51 @@ +# +# Copyright The Apache Software Foundation +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module Shell + module Commands + class SetPeerExcludeTableCFs < Command + def help + <<-EOF + Set the table-cfs which not replicated for the specified peer. + + Note: + 1. The replicate_all flag need to be true when set exclude table-cfs. + 2. If peer config already has a exclude namespace, then not allow set any + exclude table of this namespace to the peer config. + + Examples: + + # set exclude table-cfs to null + hbase> set_peer_exclude_tableCFs '1' + # set table / table-cf which not replicated for a peer, for a table without + # an explicit column-family list, all column-families will not be replicated + hbase> set_peer_exclude_tableCFs '2', { "ns1:table1" => [], + "ns2:table2" => ["cf1", "cf2"], + "ns3:table3" => ["cfA", "cfB"]} + + EOF + end + + def command(id, exclude_peer_table_cfs = nil) + replication_admin.set_peer_exclude_tableCFs(id, exclude_peer_table_cfs) + end + end + end +end diff --git a/hbase-shell/src/main/ruby/shell/commands/set_peer_namespaces.rb b/hbase-shell/src/main/ruby/shell/commands/set_peer_namespaces.rb index 6d14c1ce18d..9f0649dabd4 100644 --- a/hbase-shell/src/main/ruby/shell/commands/set_peer_namespaces.rb +++ b/hbase-shell/src/main/ruby/shell/commands/set_peer_namespaces.rb @@ -25,10 +25,10 @@ module Shell <<-EOF Set the replicable namespaces config for the specified peer. - Set a namespace in the peer config means that all tables in this - namespace will be replicated to the peer cluster. So if you already - have set a namespace in the peer config, then you can't set this - namespace's tables in the peer config again. + 1. The replicate_all flag need to be false when set the replicable namespaces. + 2. Set a namespace in the peer config means that all tables in this namespace + will be replicated to the peer cluster. If peer config already has a table, + then not allow set this table's namespace to the peer config. Examples: diff --git a/hbase-shell/src/main/ruby/shell/commands/set_peer_replicate_all.rb b/hbase-shell/src/main/ruby/shell/commands/set_peer_replicate_all.rb index f6de6150952..89969647e5f 100644 --- a/hbase-shell/src/main/ruby/shell/commands/set_peer_replicate_all.rb +++ b/hbase-shell/src/main/ruby/shell/commands/set_peer_replicate_all.rb @@ -26,7 +26,10 @@ module Shell Set the replicate_all flag to true or false for the specified peer. If replicate_all flag is true, then all user tables (REPLICATION_SCOPE != 0) - will be replicate to peer cluster. + will be replicate to peer cluster. But you can use 'set_peer_exclude_namespaces' + to set which namespaces can't be replicated to peer cluster. And you can use + 'set_peer_exclude_tableCFs' to set which tables can't be replicated to peer + cluster. If replicate_all flag is false, then all user tables cannot be replicate to peer cluster. Then you can use 'set_peer_namespaces' or 'append_peer_namespaces' @@ -36,6 +39,9 @@ module Shell Notice: When you want to change a peer's replicate_all flag from false to true, you need clean the peer's NAMESPACES and TABLECFS config firstly. + When you want to change a peer's replicate_all flag from true to false, + you need clean the peer's EXCLUDE_NAMESPACES and EXCLUDE_TABLECFS + config firstly. Examples: diff --git a/hbase-shell/src/main/ruby/shell/commands/set_peer_tableCFs.rb b/hbase-shell/src/main/ruby/shell/commands/set_peer_tableCFs.rb index 6da2f11bafa..03b21862a97 100644 --- a/hbase-shell/src/main/ruby/shell/commands/set_peer_tableCFs.rb +++ b/hbase-shell/src/main/ruby/shell/commands/set_peer_tableCFs.rb @@ -25,8 +25,10 @@ module Shell <<-EOF Set the replicable table-cf config for the specified peer. - Can't set a table to table-cfs config if it's namespace already was in - namespaces config of this peer. + Note: + 1. The replicate_all flag need to be false when set the replicable table-cfs. + 2. Can't set a table to table-cfs config if it's namespace already was in + namespaces config of this peer. Examples: diff --git a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb index 4b74adaadea..0f84396330a 100644 --- a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb +++ b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb @@ -294,6 +294,29 @@ module Hbase command(:remove_peer, @peer_id) end + define_test 'set_peer_exclude_tableCFs: works with table-cfs map' do + cluster_key = 'zk4,zk5,zk6:11000:/hbase-test' + args = { CLUSTER_KEY => cluster_key } + command(:add_peer, @peer_id, args) + + assert_equal(1, command(:list_peers).length) + peer = command(:list_peers).get(0) + assert_equal(@peer_id, peer.getPeerId) + assert_equal(cluster_key, peer.getPeerConfig.getClusterKey) + + table_cfs = { 'table1' => [], 'table2' => ['cf1'], + 'ns3:table3' => ['cf1', 'cf2'] } + command(:set_peer_exclude_tableCFs, @peer_id, table_cfs) + assert_equal(1, command(:list_peers).length) + peer = command(:list_peers).get(0) + peer_config = peer.getPeerConfig + assert_equal(true, peer_config.replicateAllUserTables) + assert_tablecfs_equal(table_cfs, peer_config.getExcludeTableCFsMap) + + # cleanup for future tests + replication_admin.remove_peer(@peer_id) + end + define_test "set_peer_namespaces: works with namespaces array" do cluster_key = "zk4,zk5,zk6:11000:/hbase-test" namespaces = ["ns1", "ns2"] @@ -395,6 +418,25 @@ module Hbase command(:remove_peer, @peer_id) end + define_test 'set_peer_exclude_namespaces: works with namespaces array' do + cluster_key = 'zk4,zk5,zk6:11000:/hbase-test' + namespaces = ['ns1', 'ns2'] + namespaces_str = '!ns1;ns2' + + args = { CLUSTER_KEY => cluster_key } + command(:add_peer, @peer_id, args) + command(:set_peer_exclude_namespaces, @peer_id, namespaces) + + assert_equal(1, command(:list_peers).length) + peer_config = command(:list_peers).get(0).getPeerConfig + assert_equal(true, peer_config.replicateAllUserTables) + assert_equal(namespaces_str, + replication_admin.show_peer_exclude_namespaces(peer_config)) + + # cleanup for future tests + command(:remove_peer, @peer_id) + end + define_test 'set_peer_replicate_all' do cluster_key = 'zk4,zk5,zk6:11000:/hbase-test'