diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java index be468ae1007..52a3c933eed 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java @@ -286,6 +286,7 @@ public final class ReplicationPeerConfigUtil { if (tableCFsMap != null) { peerConfig.setTableCFsMap(tableCFsMap); } + List<ByteString> namespacesList = peer.getNamespacesList(); if (namespacesList != null && namespacesList.size() != 0) { Set<String> namespaces = new HashSet<>(); @@ -294,9 +295,15 @@ public final class ReplicationPeerConfigUtil { } peerConfig.setNamespaces(namespaces); } + if (peer.hasBandwidth()) { peerConfig.setBandwidth(peer.getBandwidth()); } + + if (peer.hasReplicateAll()) { + peerConfig.setReplicateAllUserTables(peer.getReplicateAll()); + } + return peerConfig; } @@ -338,6 +345,7 @@ public final class ReplicationPeerConfigUtil { } builder.setBandwidth(peerConfig.getBandwidth()); + builder.setReplicateAll(peerConfig.replicateAllUserTables()); return builder.build(); } @@ -465,4 +473,4 @@ public final class ReplicationPeerConfigUtil { return otherConf; } -} \ No newline at end of file +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java index 4d429c9896e..9e20829ae43 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java @@ -42,6 +42,8 @@ public class ReplicationPeerConfig { private Map<TableName, List<String>> tableCFsMap = null; private Set<String> namespaces = null; private long bandwidth = 0; + // Default value is true, which means replicate all user tables to the peer cluster. 
+ private boolean replicateAllUserTables = true; public ReplicationPeerConfig() { this.peerData = new TreeMap<>(Bytes.BYTES_COMPARATOR); @@ -110,10 +112,20 @@ public class ReplicationPeerConfig { return this; } + public boolean replicateAllUserTables() { + return this.replicateAllUserTables; + } + + public ReplicationPeerConfig setReplicateAllUserTables(boolean replicateAllUserTables) { + this.replicateAllUserTables = replicateAllUserTables; + return this; + } + @Override public String toString() { StringBuilder builder = new StringBuilder("clusterKey=").append(clusterKey).append(","); builder.append("replicationEndpointImpl=").append(replicationEndpointImpl).append(","); + builder.append("replicateAllUserTables=").append(replicateAllUserTables).append(","); if (namespaces != null) { builder.append("namespaces=").append(namespaces.toString()).append(","); } diff --git a/hbase-protocol-shaded/src/main/protobuf/Replication.proto b/hbase-protocol-shaded/src/main/protobuf/Replication.proto index 88efa00df3b..a1a7ade8ec1 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Replication.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Replication.proto @@ -45,6 +45,7 @@ message ReplicationPeer { repeated TableCF table_cfs = 5; repeated bytes namespaces = 6; optional int64 bandwidth = 7; + optional bool replicate_all = 8; } /** diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java index b7564f4f379..2c3bbd5bbb7 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java @@ -368,6 +368,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re existingConfig.setTableCFsMap(newConfig.getTableCFsMap()); existingConfig.setNamespaces(newConfig.getNamespaces()); existingConfig.setBandwidth(newConfig.getBandwidth()); + existingConfig.setReplicateAllUserTables(newConfig.replicateAllUserTables()); try { ZKUtil.setData(this.zookeeper, getPeerNode(id), diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 97982b98d8f..fcaf55fc783 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -156,7 +156,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl; -import org.apache.hadoop.hbase.replication.master.TableCFsUpdater; +import org.apache.hadoop.hbase.replication.master.ReplicationPeerConfigUpgrader; import org.apache.hadoop.hbase.replication.regionserver.Replication; import org.apache.hadoop.hbase.security.AccessDeniedException; import org.apache.hadoop.hbase.security.UserProvider; @@ -798,9 +798,9 @@ public class HMaster extends HRegionServer implements MasterServices { // This is for backwards compatibility // See HBASE-11393 status.setStatus("Update TableCFs node in ZNode"); - TableCFsUpdater tableCFsUpdater = new TableCFsUpdater(zooKeeper, + ReplicationPeerConfigUpgrader tableCFsUpdater = new ReplicationPeerConfigUpgrader(zooKeeper, conf, 
this.clusterConnection); - tableCFsUpdater.update(); + tableCFsUpdater.copyTableCFs(); // Add the Observer to delete space quotas on table deletion before starting all CPs by // default with quota support, avoiding if user specifically asks to not load this Observer. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java index 361599290fa..f2a6c852512 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java @@ -71,9 +71,7 @@ public class ReplicationManager { public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) throws ReplicationException, IOException { - checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(), - peerConfig.getTableCFsMap()); - checkConfiguredWALEntryFilters(peerConfig); + checkPeerConfig(peerConfig); replicationPeers.registerPeer(peerId, peerConfig, enabled); replicationPeers.peerConnected(peerId); } @@ -102,9 +100,7 @@ public class ReplicationManager { public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig) throws ReplicationException, IOException { - checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(), - peerConfig.getTableCFsMap()); - checkConfiguredWALEntryFilters(peerConfig); + checkPeerConfig(peerConfig); this.replicationPeers.updatePeerConfig(peerId, peerConfig); } @@ -122,6 +118,21 @@ public class ReplicationManager { return peers; } + private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws ReplicationException, + IOException { + if (peerConfig.replicateAllUserTables()) { + if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) + || (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) { + throw new ReplicationException( + "Need to clean namespaces or table-cfs config first when you want to replicate all user tables"); + } + } else { + checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(), + peerConfig.getTableCFsMap()); + } + checkConfiguredWALEntryFilters(peerConfig); + } + /** * Set a namespace in the peer config means that all tables in this namespace * will be replicated to the peer cluster. 
@@ -150,8 +161,6 @@ public class ReplicationManager { "Table-cfs config conflict with namespaces config in peer"); } } - - } private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java index 5591974da91..9a4cc6c3795 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java @@ -58,69 +58,74 @@ public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFi public Entry filter(Entry entry) { TableName tabName = entry.getKey().getTablename(); String namespace = tabName.getNamespaceAsString(); - Set namespaces = this.peer.getNamespaces(); - Map> tableCFs = getTableCfs(); + ReplicationPeerConfig peerConfig = this.peer.getPeerConfig(); + + if (peerConfig.replicateAllUserTables()) { + // replicate all user tables, so return entry directly + return entry; + } else { + // Not replicate all user tables, so filter by namespaces and table-cfs config + Set namespaces = peerConfig.getNamespaces(); + Map> tableCFs = peerConfig.getTableCFsMap(); + + if (namespaces == null && tableCFs == null) { + return null; + } + + // First filter by namespaces config + // If table's namespace in peer config, all the tables data are applicable for replication + if (namespaces != null && namespaces.contains(namespace)) { + return entry; + } + + // Then filter by table-cfs config + // return null(prevent replicating) if logKey's table isn't in this peer's + // replicaable namespace list and table list + if (tableCFs == null || !tableCFs.containsKey(tabName)) { + return null; + } - // If null means user has explicitly not configured any namespaces and table CFs - // so all the tables data are applicable for replication - if (namespaces == null && tableCFs == null) { return entry; } - - // First filter by namespaces config - // If table's namespace in peer config, all the tables data are applicable for replication - if (namespaces != null && namespaces.contains(namespace)) { - return entry; - } - - // Then filter by table-cfs config - // return null(prevent replicating) if logKey's table isn't in this peer's - // replicaable namespace list and table list - if (tableCFs == null || !tableCFs.containsKey(tabName)) { - return null; - } - - return entry; } @Override public Cell filterCell(final Entry entry, Cell cell) { - final Map> tableCfs = getTableCfs(); - if (tableCfs == null) return cell; - TableName tabName = entry.getKey().getTablename(); - List cfs = tableCfs.get(tabName); - // ignore(remove) kv if its cf isn't in the replicable cf list - // (empty cfs means all cfs of this table are replicable) - if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) { - cell = bulkLoadFilter.filterCell(cell, new Predicate() { - @Override - public boolean apply(byte[] fam) { - if (tableCfs != null) { - List cfs = tableCfs.get(entry.getKey().getTablename()); - if (cfs != null && !cfs.contains(Bytes.toString(fam))) { - return true; - } - } - return false; - } - }); + ReplicationPeerConfig peerConfig = this.peer.getPeerConfig(); + if (peerConfig.replicateAllUserTables()) { + // replicate all user tables, so return cell directly + return cell; } else { - if ((cfs != null) && !cfs.contains( - 
Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()))) { - return null; + final Map> tableCfs = peerConfig.getTableCFsMap(); + if (tableCfs == null) { + return cell; + } + TableName tabName = entry.getKey().getTablename(); + List cfs = tableCfs.get(tabName); + // ignore(remove) kv if its cf isn't in the replicable cf list + // (empty cfs means all cfs of this table are replicable) + if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) { + cell = bulkLoadFilter.filterCell(cell, new Predicate() { + @Override + public boolean apply(byte[] fam) { + if (tableCfs != null) { + List cfs = tableCfs.get(entry.getKey().getTablename()); + if (cfs != null && !cfs.contains(Bytes.toString(fam))) { + return true; + } + } + return false; + } + }); + } else { + if ((cfs != null) + && !cfs.contains(Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), + cell.getFamilyLength()))) { + return null; + } } - } - return cell; - } - Map> getTableCfs() { - Map> tableCFs = null; - try { - tableCFs = this.peer.getTableCFs(); - } catch (IllegalArgumentException e) { - LOG.error("should not happen: can't get tableCFs for peer " + peer.getId() + - ", degenerate as if it's not configured by keeping tableCFs==null"); + return cell; } - return tableCFs; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/TableCFsUpdater.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationPeerConfigUpgrader.java similarity index 68% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/TableCFsUpdater.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationPeerConfigUpgrader.java index f44249521ff..5c8fba39709 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/TableCFsUpdater.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationPeerConfigUpgrader.java @@ -18,14 +18,19 @@ */ package org.apache.hadoop.hbase.replication.master; +import java.io.IOException; +import java.util.List; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.exceptions.DeserializationException; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationStateZKBase; import org.apache.hadoop.hbase.zookeeper.ZKUtil; @@ -34,8 +39,7 @@ import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; import org.apache.zookeeper.KeeperException; -import java.io.IOException; -import java.util.List; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; /** * This class is used to upgrade TableCFs from HBase 1.0, 1.1, 1.2, 1.3 to HBase 1.4 or 2.x. 
@@ -43,16 +47,36 @@ import java.util.List; */ @InterfaceAudience.Private @InterfaceStability.Unstable -public class TableCFsUpdater extends ReplicationStateZKBase { +public class ReplicationPeerConfigUpgrader extends ReplicationStateZKBase { - private static final Log LOG = LogFactory.getLog(TableCFsUpdater.class); + private static final Log LOG = LogFactory.getLog(ReplicationPeerConfigUpgrader.class); - public TableCFsUpdater(ZKWatcher zookeeper, + public ReplicationPeerConfigUpgrader(ZKWatcher zookeeper, Configuration conf, Abortable abortable) { super(zookeeper, conf, abortable); } - public void update() { + public void upgrade() throws Exception { + try (Connection conn = ConnectionFactory.createConnection(conf)) { + Admin admin = conn.getAdmin(); + admin.listReplicationPeers().forEach( + (peerDesc) -> { + String peerId = peerDesc.getPeerId(); + ReplicationPeerConfig peerConfig = peerDesc.getPeerConfig(); + if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) + || (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) { + peerConfig.setReplicateAllUserTables(false); + try { + admin.updateReplicationPeerConfig(peerId, peerConfig); + } catch (Exception e) { + LOG.error("Failed to upgrade replication peer config for peerId=" + peerId, e); + } + } + }); + } + } + + public void copyTableCFs() { List znodes = null; try { znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode); @@ -61,14 +85,14 @@ public class TableCFsUpdater extends ReplicationStateZKBase { } if (znodes != null) { for (String peerId : znodes) { - if (!update(peerId)) { + if (!copyTableCFs(peerId)) { LOG.error("upgrade tableCFs failed for peerId=" + peerId); } } } } - public boolean update(String peerId) { + public boolean copyTableCFs(String peerId) { String tableCFsNode = getTableCFsNode(peerId); try { if (ZKUtil.checkExists(zookeeper, tableCFsNode) != -1) { @@ -121,10 +145,13 @@ public class TableCFsUpdater extends ReplicationStateZKBase { } private static void printUsageAndExit() { - System.err.printf("Usage: hbase org.apache.hadoop.hbase.replication.master.TableCFsUpdater [options]"); + System.err.printf( + "Usage: hbase org.apache.hadoop.hbase.replication.master.ReplicationPeerConfigUpgrader" + + " [options]"); System.err.println(" where [options] are:"); - System.err.println(" -h|-help Show this help and exit."); - System.err.println(" update Copy table-cfs to replication peer config"); + System.err.println(" -h|-help Show this help and exit."); + System.err.println(" copyTableCFs Copy table-cfs to replication peer config"); + System.err.println(" upgrade Upgrade replication peer config to new format"); System.err.println(); System.exit(1); } @@ -135,15 +162,21 @@ public class TableCFsUpdater extends ReplicationStateZKBase { } if (args[0].equals("-help") || args[0].equals("-h")) { printUsageAndExit(); - } else if (args[0].equals("update")) { + } else if (args[0].equals("copyTableCFs")) { Configuration conf = HBaseConfiguration.create(); - ZKWatcher zkw = new ZKWatcher(conf, "TableCFsUpdater", null); + ZKWatcher zkw = new ZKWatcher(conf, "ReplicationPeerConfigUpgrader", null); try { - TableCFsUpdater tableCFsUpdater = new TableCFsUpdater(zkw, conf, null); - tableCFsUpdater.update(); + ReplicationPeerConfigUpgrader tableCFsUpdater = new ReplicationPeerConfigUpgrader(zkw, + conf, null); + tableCFsUpdater.copyTableCFs(); } finally { zkw.close(); } + } else if (args[0].equals("upgrade")) { + Configuration conf = HBaseConfiguration.create(); + ZKWatcher zkw 
= new ZKWatcher(conf, "ReplicationPeerConfigUpgrader", null); + ReplicationPeerConfigUpgrader upgrader = new ReplicationPeerConfigUpgrader(zkw, conf, null); + upgrader.upgrade(); } else { printUsageAndExit(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java index e48907849d7..6591826f5d6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java @@ -149,6 +149,8 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase { // Add a valid peer admin.addReplicationPeer(ID_ONE, rpc1).join(); + rpc1.setReplicateAllUserTables(false); + admin.updateReplicationPeerConfig(ID_ONE, rpc1).join(); Map> tableCFs = new HashMap<>(); @@ -248,6 +250,9 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase { final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4"); // Add a valid peer admin.addReplicationPeer(ID_ONE, rpc1).join(); + rpc1.setReplicateAllUserTables(false); + admin.updateReplicationPeerConfig(ID_ONE, rpc1).join(); + Map> tableCFs = new HashMap<>(); try { tableCFs.put(tableName3, null); @@ -328,6 +333,8 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase { ReplicationPeerConfig rpc = new ReplicationPeerConfig(); rpc.setClusterKey(KEY_ONE); admin.addReplicationPeer(ID_ONE, rpc).join(); + rpc.setReplicateAllUserTables(false); + admin.updateReplicationPeerConfig(ID_ONE, rpc).join(); // add ns1 and ns2 to peer config rpc = admin.getReplicationPeerConfig(ID_ONE).get(); @@ -364,6 +371,8 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase { ReplicationPeerConfig rpc = new ReplicationPeerConfig(); rpc.setClusterKey(KEY_ONE); admin.addReplicationPeer(ID_ONE, rpc).join(); + rpc.setReplicateAllUserTables(false); + admin.updateReplicationPeerConfig(ID_ONE, rpc).join(); rpc = admin.getReplicationPeerConfig(ID_ONE).get(); Set namespaces = new HashSet(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApiWithClusters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApiWithClusters.java index 4b88bf7ca8b..9ceb1725463 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApiWithClusters.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApiWithClusters.java @@ -218,6 +218,7 @@ public class TestAsyncReplicationAdminApiWithClusters extends TestAsyncAdminBase Map> tableCfs = new HashMap<>(); tableCfs.put(tableName, null); ReplicationPeerConfig rpc = admin.getReplicationPeerConfig(ID_SECOND).get(); + rpc.setReplicateAllUserTables(false); rpc.setTableCFsMap(tableCfs); try { // Only add tableName to replication peer config @@ -236,6 +237,7 @@ public class TestAsyncReplicationAdminApiWithClusters extends TestAsyncAdminBase admin2.tableExists(tableName2).get()); } finally { rpc.setTableCFsMap(null); + rpc.setReplicateAllUserTables(true); admin.updateReplicationPeerConfig(ID_SECOND, rpc).join(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java index 036706a9441..19f117b2ac2 100644 --- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java @@ -85,10 +85,9 @@ public class TestReplicationAdmin { */ @BeforeClass public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); TEST_UTIL.startMiniCluster(); - Configuration conf = TEST_UTIL.getConfiguration(); - conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); - admin = new ReplicationAdmin(conf); + admin = new ReplicationAdmin(TEST_UTIL.getConfiguration()); hbaseAdmin = TEST_UTIL.getAdmin(); } @@ -238,8 +237,8 @@ public class TestReplicationAdmin { @Test public void testAppendPeerTableCFs() throws Exception { - ReplicationPeerConfig rpc1 = new ReplicationPeerConfig(); - rpc1.setClusterKey(KEY_ONE); + ReplicationPeerConfig rpc = new ReplicationPeerConfig(); + rpc.setClusterKey(KEY_ONE); final TableName tableName1 = TableName.valueOf(name.getMethodName() + "t1"); final TableName tableName2 = TableName.valueOf(name.getMethodName() + "t2"); final TableName tableName3 = TableName.valueOf(name.getMethodName() + "t3"); @@ -248,10 +247,14 @@ public class TestReplicationAdmin { final TableName tableName6 = TableName.valueOf(name.getMethodName() + "t6"); // Add a valid peer - admin.addPeer(ID_ONE, rpc1, null); + hbaseAdmin.addReplicationPeer(ID_ONE, rpc); + + // Update peer config, not replicate all user tables + rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); + rpc.setReplicateAllUserTables(false); + hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); Map> tableCFs = new HashMap<>(); - tableCFs.put(tableName1, null); admin.appendPeerTableCFs(ID_ONE, tableCFs); Map> result = @@ -338,14 +341,21 @@ public class TestReplicationAdmin { @Test public void testRemovePeerTableCFs() throws Exception { - ReplicationPeerConfig rpc1 = new ReplicationPeerConfig(); - rpc1.setClusterKey(KEY_ONE); + ReplicationPeerConfig rpc = new ReplicationPeerConfig(); + rpc.setClusterKey(KEY_ONE); final TableName tableName1 = TableName.valueOf(name.getMethodName() + "t1"); final TableName tableName2 = TableName.valueOf(name.getMethodName() + "t2"); final TableName tableName3 = TableName.valueOf(name.getMethodName() + "t3"); final TableName tableName4 = TableName.valueOf(name.getMethodName() + "t4"); + // Add a valid peer - admin.addPeer(ID_ONE, rpc1, null); + hbaseAdmin.addReplicationPeer(ID_ONE, rpc); + + // Update peer config, not replicate all user tables + rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); + rpc.setReplicateAllUserTables(false); + hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); + Map> tableCFs = new HashMap<>(); try { tableCFs.put(tableName3, null); @@ -423,27 +433,98 @@ public class TestReplicationAdmin { rpc.setClusterKey(KEY_ONE); hbaseAdmin.addReplicationPeer(ID_ONE, rpc); - rpc = admin.getPeerConfig(ID_ONE); + rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); + rpc.setReplicateAllUserTables(false); + hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); + + rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); Set namespaces = new HashSet<>(); namespaces.add(ns1); namespaces.add(ns2); rpc.setNamespaces(namespaces); - admin.updatePeerConfig(ID_ONE, rpc); - namespaces = admin.getPeerConfig(ID_ONE).getNamespaces(); + hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); + namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getNamespaces(); assertEquals(2, namespaces.size()); 
assertTrue(namespaces.contains(ns1)); assertTrue(namespaces.contains(ns2)); - rpc = admin.getPeerConfig(ID_ONE); + rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); namespaces.clear(); namespaces.add(ns1); rpc.setNamespaces(namespaces); - admin.updatePeerConfig(ID_ONE, rpc); - namespaces = admin.getPeerConfig(ID_ONE).getNamespaces(); + hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); + namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getNamespaces(); assertEquals(1, namespaces.size()); assertTrue(namespaces.contains(ns1)); - admin.removePeer(ID_ONE); + hbaseAdmin.removeReplicationPeer(ID_ONE); + } + + @Test + public void testSetReplicateAllUserTables() throws Exception { + ReplicationPeerConfig rpc = new ReplicationPeerConfig(); + rpc.setClusterKey(KEY_ONE); + hbaseAdmin.addReplicationPeer(ID_ONE, rpc); + + rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); + assertTrue(rpc.replicateAllUserTables()); + + rpc.setReplicateAllUserTables(false); + hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); + rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); + assertFalse(rpc.replicateAllUserTables()); + + rpc.setReplicateAllUserTables(true); + hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); + rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); + assertTrue(rpc.replicateAllUserTables()); + + hbaseAdmin.removeReplicationPeer(ID_ONE); + } + + @Test + public void testPeerConfigConflict() throws Exception { + // Default replicate all flag is true + ReplicationPeerConfig rpc = new ReplicationPeerConfig(); + rpc.setClusterKey(KEY_ONE); + + String ns1 = "ns1"; + Set namespaces = new HashSet(); + namespaces.add(ns1); + + TableName tab1 = TableName.valueOf("ns1:tabl"); + Map> tableCfs = new HashMap>(); + tableCfs.put(tab1, new ArrayList()); + + try { + rpc.setNamespaces(namespaces); + hbaseAdmin.addReplicationPeer(ID_ONE, rpc); + fail("Should throw Exception. When replicate all flag is true, no need to config namespaces"); + } catch (IOException e) { + // OK + rpc.setNamespaces(null); + } + + try { + rpc.setTableCFsMap(tableCfs); + hbaseAdmin.addReplicationPeer(ID_ONE, rpc); + fail("Should throw Exception. When replicate all flag is true, no need to config table-cfs"); + } catch (IOException e) { + // OK + rpc.setTableCFsMap(null); + } + + try { + rpc.setNamespaces(namespaces); + rpc.setTableCFsMap(tableCfs); + hbaseAdmin.addReplicationPeer(ID_ONE, rpc); + fail("Should throw Exception." 
+ + " When replicate all flag is true, no need to config namespaces or table-cfs"); + } catch (IOException e) { + // OK + rpc.setNamespaces(null); + rpc.setTableCFsMap(null); + } } @Test @@ -455,6 +536,7 @@ public class TestReplicationAdmin { ReplicationPeerConfig rpc = new ReplicationPeerConfig(); rpc.setClusterKey(KEY_ONE); + rpc.setReplicateAllUserTables(false); hbaseAdmin.addReplicationPeer(ID_ONE, rpc); rpc = admin.getPeerConfig(ID_ONE); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java index 26103138f0f..3b7fd84e3dc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java @@ -194,7 +194,13 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase { admin2.disableTable(TestReplicationBase.tableName); admin2.deleteTable(TestReplicationBase.tableName); } - assertFalse("Table should not exists in the peer cluster", admin2.isTableAvailable(TestReplicationBase.tableName)); + assertFalse("Table should not exists in the peer cluster", + admin2.isTableAvailable(TestReplicationBase.tableName)); + + // update peer config + ReplicationPeerConfig rpc = admin1.getReplicationPeerConfig(peerId); + rpc.setReplicateAllUserTables(false); + admin1.updateReplicationPeerConfig(peerId, rpc); Map> tableCfs = new HashMap<>(); tableCfs.put(tableName, null); @@ -214,6 +220,10 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase { } finally { adminExt.removePeerTableCFs(peerId, adminExt.getPeerTableCFs(peerId)); admin1.disableTableReplication(TestReplicationBase.tableName); + + rpc = admin1.getReplicationPeerConfig(peerId); + rpc.setReplicateAllUserTables(true); + admin1.updateReplicationPeerConfig(peerId, rpc); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java index 58b22c800fc..ac535515802 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java @@ -524,10 +524,12 @@ public class TestMasterReplication { private void addPeer(String id, int masterClusterNumber, int slaveClusterNumber, String tableCfs) throws Exception { - try (Admin admin = ConnectionFactory.createConnection(configurations[masterClusterNumber]) - .getAdmin()) { - admin.addReplicationPeer(id, + try (Admin admin = + ConnectionFactory.createConnection(configurations[masterClusterNumber]).getAdmin()) { + admin.addReplicationPeer( + id, new ReplicationPeerConfig().setClusterKey(utilities[slaveClusterNumber].getClusterKey()) + .setReplicateAllUserTables(false) .setTableCFsMap(ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableCfs))); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java index 433a3450c34..0d7a92d9774 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java 
@@ -140,8 +140,12 @@ public class TestNamespaceReplication extends TestReplicationBase { Table htab1B = connection1.getTable(tabBName); Table htab2B = connection2.getTable(tabBName); - // add ns1 to peer config which replicate to cluster2 ReplicationPeerConfig rpc = admin.getPeerConfig("2"); + rpc.setReplicateAllUserTables(false); + admin.updatePeerConfig("2", rpc); + + // add ns1 to peer config which replicate to cluster2 + rpc = admin.getPeerConfig("2"); Set namespaces = new HashSet<>(); namespaces.add(ns1); rpc.setNamespaces(namespaces); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java index 84ce9a3b99c..e9c352d0585 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java @@ -404,6 +404,7 @@ public class TestPerTableCFReplication { // A. add cluster2/cluster3 as peers to cluster1 ReplicationPeerConfig rpc2 = new ReplicationPeerConfig(); rpc2.setClusterKey(utility2.getClusterKey()); + rpc2.setReplicateAllUserTables(false); Map> tableCFs = new HashMap<>(); tableCFs.put(tabCName, null); tableCFs.put(tabBName, new ArrayList<>()); @@ -413,6 +414,7 @@ public class TestPerTableCFReplication { ReplicationPeerConfig rpc3 = new ReplicationPeerConfig(); rpc3.setClusterKey(utility3.getClusterKey()); + rpc3.setReplicateAllUserTables(false); tableCFs.clear(); tableCFs.put(tabAName, null); tableCFs.put(tabBName, new ArrayList<>()); @@ -518,7 +520,7 @@ public class TestPerTableCFReplication { connection2.close(); connection3.close(); } - } + } private void ensureRowNotReplicated(byte[] row, byte[] fam, Table... tables) throws IOException { Get get = new Get(row); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java index be65576f240..9fda8bc2c2a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java @@ -202,19 +202,31 @@ public class TestReplicationWALEntryFilters { @Test public void testNamespaceTableCfWALEntryFilter() { ReplicationPeer peer = mock(ReplicationPeer.class); + ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class); - // 1. no namespaces config and table-cfs config in peer - when(peer.getNamespaces()).thenReturn(null); - when(peer.getTableCFs()).thenReturn(null); + // 1. replicate all user tables + when(peerConfig.replicateAllUserTables()).thenReturn(true); + when(peer.getPeerConfig()).thenReturn(peerConfig); Entry userEntry = createEntry(null, a, b, c); - WALEntryFilter filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); - assertEquals(createEntry(null, a,b,c), filter.filter(userEntry)); + ChainWALEntryFilter filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); + assertEquals(createEntry(null, a, b, c), filter.filter(userEntry)); - // 2. Only config table-cfs in peer + // 2. 
not replicate all user tables, no namespaces and table-cfs config + when(peerConfig.replicateAllUserTables()).thenReturn(false); + when(peerConfig.getNamespaces()).thenReturn(null); + when(peerConfig.getTableCFsMap()).thenReturn(null); + when(peer.getPeerConfig()).thenReturn(peerConfig); + userEntry = createEntry(null, a, b, c); + filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); + assertEquals(null, filter.filter(userEntry)); + + // 3. Only config table-cfs in peer // empty map userEntry = createEntry(null, a, b, c); Map> tableCfs = new HashMap<>(); - when(peer.getTableCFs()).thenReturn(tableCfs); + when(peerConfig.replicateAllUserTables()).thenReturn(false); + when(peerConfig.getTableCFsMap()).thenReturn(tableCfs); + when(peer.getPeerConfig()).thenReturn(peerConfig); filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); assertEquals(null, filter.filter(userEntry)); @@ -222,7 +234,9 @@ public class TestReplicationWALEntryFilters { userEntry = createEntry(null, a, b, c); tableCfs = new HashMap<>(); tableCfs.put(TableName.valueOf("bar"), null); - when(peer.getTableCFs()).thenReturn(tableCfs); + when(peerConfig.replicateAllUserTables()).thenReturn(false); + when(peerConfig.getTableCFsMap()).thenReturn(tableCfs); + when(peer.getPeerConfig()).thenReturn(peerConfig); filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); assertEquals(null, filter.filter(userEntry)); @@ -230,7 +244,9 @@ public class TestReplicationWALEntryFilters { userEntry = createEntry(null, a, b, c); tableCfs = new HashMap<>(); tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a")); - when(peer.getTableCFs()).thenReturn(tableCfs); + when(peerConfig.replicateAllUserTables()).thenReturn(false); + when(peerConfig.getTableCFsMap()).thenReturn(tableCfs); + when(peer.getPeerConfig()).thenReturn(peerConfig); filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); assertEquals(createEntry(null, a), filter.filter(userEntry)); @@ -238,7 +254,9 @@ public class TestReplicationWALEntryFilters { userEntry = createEntry(null, a, b, c, d); tableCfs = new HashMap<>(); tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c")); - when(peer.getTableCFs()).thenReturn(tableCfs); + when(peerConfig.replicateAllUserTables()).thenReturn(false); + when(peerConfig.getTableCFsMap()).thenReturn(tableCfs); + when(peer.getPeerConfig()).thenReturn(peerConfig); filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); assertEquals(createEntry(null, a,c), filter.filter(userEntry)); @@ -246,14 +264,19 @@ public class TestReplicationWALEntryFilters { when(peer.getTableCFs()).thenReturn(null); // empty set Set namespaces = new HashSet<>(); - when(peer.getNamespaces()).thenReturn(namespaces); + when(peerConfig.replicateAllUserTables()).thenReturn(false); + when(peerConfig.getNamespaces()).thenReturn(namespaces); + when(peerConfig.getTableCFsMap()).thenReturn(null); + when(peer.getPeerConfig()).thenReturn(peerConfig); userEntry = createEntry(null, a, b, c); filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); assertEquals(null, filter.filter(userEntry)); // namespace default namespaces.add("default"); - when(peer.getNamespaces()).thenReturn(namespaces); + when(peerConfig.replicateAllUserTables()).thenReturn(false); + when(peerConfig.getNamespaces()).thenReturn(namespaces); + when(peer.getPeerConfig()).thenReturn(peerConfig); userEntry = createEntry(null, a, b, c); filter = new ChainWALEntryFilter(new 
NamespaceTableCfWALEntryFilter(peer)); assertEquals(createEntry(null, a,b,c), filter.filter(userEntry)); @@ -261,7 +284,9 @@ public class TestReplicationWALEntryFilters { // namespace ns1 namespaces = new HashSet<>(); namespaces.add("ns1"); - when(peer.getNamespaces()).thenReturn(namespaces); + when(peerConfig.replicateAllUserTables()).thenReturn(false); + when(peerConfig.getNamespaces()).thenReturn(namespaces); + when(peer.getPeerConfig()).thenReturn(peerConfig); userEntry = createEntry(null, a, b, c); filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); assertEquals(null, filter.filter(userEntry)); @@ -271,9 +296,11 @@ public class TestReplicationWALEntryFilters { namespaces = new HashSet<>(); tableCfs = new HashMap<>(); namespaces.add("ns1"); - when(peer.getNamespaces()).thenReturn(namespaces); tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c")); - when(peer.getTableCFs()).thenReturn(tableCfs); + when(peerConfig.replicateAllUserTables()).thenReturn(false); + when(peerConfig.getNamespaces()).thenReturn(namespaces); + when(peerConfig.getTableCFsMap()).thenReturn(tableCfs); + when(peer.getPeerConfig()).thenReturn(peerConfig); userEntry = createEntry(null, a, b, c); filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); assertEquals(createEntry(null, a, c), filter.filter(userEntry)); @@ -281,9 +308,11 @@ public class TestReplicationWALEntryFilters { namespaces = new HashSet<>(); tableCfs = new HashMap<>(); namespaces.add("default"); - when(peer.getNamespaces()).thenReturn(namespaces); tableCfs.put(TableName.valueOf("ns1:foo"), Lists.newArrayList("a", "c")); - when(peer.getTableCFs()).thenReturn(tableCfs); + when(peerConfig.replicateAllUserTables()).thenReturn(false); + when(peerConfig.getNamespaces()).thenReturn(namespaces); + when(peerConfig.getTableCFsMap()).thenReturn(tableCfs); + when(peer.getPeerConfig()).thenReturn(peerConfig); userEntry = createEntry(null, a, b, c); filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); assertEquals(createEntry(null, a, b, c), filter.filter(userEntry)); @@ -291,9 +320,11 @@ public class TestReplicationWALEntryFilters { namespaces = new HashSet<>(); tableCfs = new HashMap<>(); namespaces.add("ns1"); - when(peer.getNamespaces()).thenReturn(namespaces); tableCfs.put(TableName.valueOf("bar"), null); - when(peer.getTableCFs()).thenReturn(tableCfs); + when(peerConfig.replicateAllUserTables()).thenReturn(false); + when(peerConfig.getNamespaces()).thenReturn(namespaces); + when(peerConfig.getTableCFsMap()).thenReturn(tableCfs); + when(peer.getPeerConfig()).thenReturn(peerConfig); userEntry = createEntry(null, a, b, c); filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); assertEquals(null, filter.filter(userEntry)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java index cb895caa193..e78abfb74b4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java @@ -50,7 +50,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @Category({ReplicationTests.class, SmallTests.class}) -public class TestTableCFsUpdater extends TableCFsUpdater { +public class TestTableCFsUpdater extends ReplicationPeerConfigUpgrader { private static 
final Log LOG = LogFactory.getLog(TestTableCFsUpdater.class); private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); @@ -164,7 +164,7 @@ public class TestTableCFsUpdater extends TableCFsUpdater { assertNull(actualRpc.getTableCFsMap()); assertNull(actualTableCfs); - update(); + copyTableCFs(); peerId = "1"; peerNode = getPeerNode(peerId); diff --git a/hbase-shell/src/main/ruby/hbase/replication_admin.rb b/hbase-shell/src/main/ruby/hbase/replication_admin.rb index 3f6435647ce..50c086afe30 100644 --- a/hbase-shell/src/main/ruby/hbase/replication_admin.rb +++ b/hbase-shell/src/main/ruby/hbase/replication_admin.rb @@ -92,6 +92,7 @@ module Hbase namespaces.each do |n| ns_set.add(n) end + replication_peer_config.setReplicateAllUserTables(false) replication_peer_config.set_namespaces(ns_set) end @@ -101,6 +102,7 @@ module Hbase table_cfs.each do |key, val| map.put(org.apache.hadoop.hbase.TableName.valueOf(key), val) end + replication_peer_config.setReplicateAllUserTables(false) replication_peer_config.set_table_cfs_map(map) end @@ -265,6 +267,13 @@ module Hbase end end + def set_peer_replicate_all(id, replicate_all) + rpc = @replication_admin.getPeerConfig(id) + return if rpc.nil? + rpc.setReplicateAllUserTables(replicate_all) + @replication_admin.updatePeerConfig(id, rpc) + end + #---------------------------------------------------------------------------------------------- # Enables a table's replication switch def enable_tablerep(table_name) diff --git a/hbase-shell/src/main/ruby/shell.rb b/hbase-shell/src/main/ruby/shell.rb index 687af12044f..60ca229b93e 100644 --- a/hbase-shell/src/main/ruby/shell.rb +++ b/hbase-shell/src/main/ruby/shell.rb @@ -377,6 +377,7 @@ Shell.load_command_group( list_peers enable_peer disable_peer + set_peer_replicate_all set_peer_namespaces append_peer_namespaces remove_peer_namespaces diff --git a/hbase-shell/src/main/ruby/shell/commands/list_peers.rb b/hbase-shell/src/main/ruby/shell/commands/list_peers.rb index 04453c24ed9..6812df4d5d2 100644 --- a/hbase-shell/src/main/ruby/shell/commands/list_peers.rb +++ b/hbase-shell/src/main/ruby/shell/commands/list_peers.rb @@ -33,7 +33,7 @@ EOF peers = replication_admin.list_peers formatter.header(%w[PEER_ID CLUSTER_KEY ENDPOINT_CLASSNAME - STATE NAMESPACES TABLE_CFS BANDWIDTH]) + STATE REPLICATE_ALL NAMESPACES TABLE_CFS BANDWIDTH]) peers.each do |peer| id = peer.getPeerId @@ -42,7 +42,8 @@ EOF namespaces = replication_admin.show_peer_namespaces(config) tableCFs = replication_admin.show_peer_tableCFs(id) formatter.row([id, config.getClusterKey, - config.getReplicationEndpointImpl, state, namespaces, tableCFs, + config.getReplicationEndpointImpl, state, + config.replicateAllUserTables, namespaces, tableCFs, config.getBandwidth]) end diff --git a/hbase-shell/src/main/ruby/shell/commands/set_peer_replicate_all.rb b/hbase-shell/src/main/ruby/shell/commands/set_peer_replicate_all.rb new file mode 100644 index 00000000000..f6de6150952 --- /dev/null +++ b/hbase-shell/src/main/ruby/shell/commands/set_peer_replicate_all.rb @@ -0,0 +1,54 @@ +# +# Copyright The Apache Software Foundation +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module Shell + module Commands + class SetPeerReplicateAll < Command + def help + <<-EOF + Set the replicate_all flag to true or false for the specified peer. + + If the replicate_all flag is true, then all user tables (REPLICATION_SCOPE != 0) + will be replicated to the peer cluster. + + If the replicate_all flag is false, then user tables are not replicated to the + peer cluster by default. Then you can use 'set_peer_namespaces' or 'append_peer_namespaces' + to set which namespaces will be replicated to the peer cluster. And you can use + 'set_peer_tableCFs' or 'append_peer_tableCFs' to set which tables will be + replicated to the peer cluster. + + Notice: When you want to change a peer's replicate_all flag from false to true, + you need to clean the peer's NAMESPACES and TABLECFS config first. + + Examples: + + # set replicate_all flag to true + hbase> set_peer_replicate_all '1', true + # set replicate_all flag to false + hbase> set_peer_replicate_all '1', false +EOF + end + + def command(id, replicate_all) + replication_admin.set_peer_replicate_all(id, replicate_all) + end + end + end +end diff --git a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb index 75f3c041a4a..4b74adaadea 100644 --- a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb +++ b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb @@ -73,8 +73,10 @@ module Hbase command(:add_peer, @peer_id, {CLUSTER_KEY => cluster_key}) assert_equal(1, command(:list_peers).length) - assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) - assert_equal(cluster_key, command(:list_peers).get(0).getPeerConfig.getClusterKey) + peer = command(:list_peers).get(0) + assert_equal(@peer_id, peer.getPeerId) + assert_equal(cluster_key, peer.getPeerConfig.getClusterKey) + assert_equal(true, peer.getPeerConfig.replicateAllUserTables) # cleanup for future tests command(:remove_peer, @peer_id) @@ -86,8 +88,10 @@ module Hbase command(:add_peer, @peer_id, {CLUSTER_KEY => cluster_key}) assert_equal(1, command(:list_peers).length) - assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) - assert_equal(cluster_key, command(:list_peers).get(0).getPeerConfig.getClusterKey) + peer = command(:list_peers).get(0) + assert_equal(@peer_id, peer.getPeerId) + assert_equal(cluster_key, peer.getPeerConfig.getClusterKey) + assert_equal(true, peer.getPeerConfig.replicateAllUserTables) # cleanup for future tests command(:remove_peer, @peer_id) @@ -131,8 +135,10 @@ module Hbase command(:add_peer, @peer_id, args) assert_equal(1, command(:list_peers).length) - assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) - assert_equal(cluster_key, command(:list_peers).get(0).getPeerConfig.getClusterKey) + peer = command(:list_peers).get(0) + assert_equal(@peer_id, peer.getPeerId) + assert_equal(cluster_key, peer.getPeerConfig.getClusterKey) + assert_equal(true, peer.getPeerConfig.replicateAllUserTables) # cleanup for future tests command(:remove_peer, @peer_id) @@ -147,11 +153,13 @@ module Hbase command(:add_peer, @peer_id, args) assert_equal(1, command(:list_peers).length) -
assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) - peer_config = command(:list_peers).get(0).getPeerConfig + peer = command(:list_peers).get(0) + assert_equal(@peer_id, peer.getPeerId) + peer_config = peer.getPeerConfig + assert_equal(false, peer_config.replicateAllUserTables) assert_equal(cluster_key, peer_config.get_cluster_key) assert_equal(namespaces_str, - replication_admin.show_peer_namespaces(peer_config)) + replication_admin.show_peer_namespaces(peer_config)) # cleanup for future tests command(:remove_peer, @peer_id) @@ -169,8 +177,10 @@ module Hbase command(:add_peer, @peer_id, args) assert_equal(1, command(:list_peers).length) - assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) - peer_config = command(:list_peers).get(0).getPeerConfig + peer = command(:list_peers).get(0) + assert_equal(@peer_id, peer.getPeerId) + peer_config = peer.getPeerConfig + assert_equal(false, peer_config.replicateAllUserTables) assert_equal(cluster_key, peer_config.get_cluster_key) assert_equal(namespaces_str, replication_admin.show_peer_namespaces(peer_config)) @@ -203,9 +213,11 @@ module Hbase command(:add_peer, @peer_id, args) assert_equal(1, command(:list_peers).length) - assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) - assert_equal(cluster_key, command(:list_peers).get(0).getPeerConfig.getClusterKey) - assert_tablecfs_equal(table_cfs, command(:get_peer_config, @peer_id).getTableCFsMap()) + peer = command(:list_peers).get(0) + assert_equal(@peer_id, peer.getPeerId) + assert_equal(cluster_key, peer.getPeerConfig.getClusterKey) + assert_tablecfs_equal(table_cfs, peer.getPeerConfig.getTableCFsMap) + assert_equal(false, peer.getPeerConfig.replicateAllUserTables) # cleanup for future tests command(:remove_peer, @peer_id) @@ -225,10 +237,12 @@ module Hbase cluster_key = "zk4,zk5,zk6:11000:/hbase-test" args = { CLUSTER_KEY => cluster_key} command(:add_peer, @peer_id, args) + command(:set_peer_replicate_all, @peer_id, false) assert_equal(1, command(:list_peers).length) - assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) - assert_equal(cluster_key, command(:list_peers).get(0).getPeerConfig.getClusterKey) + peer = command(:list_peers).get(0) + assert_equal(@peer_id, peer.getPeerId) + assert_equal(cluster_key, peer.getPeerConfig.getClusterKey) table_cfs = { "table1" => [], "table2" => ["cf1"], "ns3:table3" => ["cf1", "cf2"] } command(:set_peer_tableCFs, @peer_id, table_cfs) @@ -242,10 +256,12 @@ module Hbase cluster_key = "zk4,zk5,zk6:11000:/hbase-test" args = { CLUSTER_KEY => cluster_key} command(:add_peer, @peer_id, args) + command(:set_peer_replicate_all, @peer_id, false) assert_equal(1, command(:list_peers).length) - assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) - assert_equal(cluster_key, command(:list_peers).get(0).getPeerConfig.getClusterKey) + peer = command(:list_peers).get(0) + assert_equal(@peer_id, peer.getPeerId) + assert_equal(cluster_key, peer.getPeerConfig.getClusterKey) table_cfs = { "table1" => [], "ns2:table2" => ["cf1"] } command(:append_peer_tableCFs, @peer_id, table_cfs) @@ -266,8 +282,9 @@ module Hbase command(:add_peer, @peer_id, args) assert_equal(1, command(:list_peers).length) - assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) - assert_equal(cluster_key, command(:list_peers).get(0).getPeerConfig.getClusterKey) + peer = command(:list_peers).get(0) + assert_equal(@peer_id, peer.getPeerId) + assert_equal(cluster_key, peer.getPeerConfig.getClusterKey) table_cfs = { "table1" => [], "ns2:table2" => ["cf1"] } 
command(:remove_peer_tableCFs, @peer_id, { "ns3:table3" => ["cf1", "cf2"] }) @@ -284,6 +301,7 @@ module Hbase args = { CLUSTER_KEY => cluster_key } command(:add_peer, @peer_id, args) + command(:set_peer_replicate_all, @peer_id, false) command(:set_peer_namespaces, @peer_id, namespaces) @@ -291,7 +309,7 @@ module Hbase assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) peer_config = command(:list_peers).get(0).getPeerConfig assert_equal(namespaces_str, - replication_admin.show_peer_namespaces(peer_config)) + replication_admin.show_peer_namespaces(peer_config)) # cleanup for future tests command(:remove_peer, @peer_id) @@ -304,6 +322,7 @@ module Hbase args = { CLUSTER_KEY => cluster_key } command(:add_peer, @peer_id, args) + command(:set_peer_replicate_all, @peer_id, false) command(:append_peer_namespaces, @peer_id, namespaces) @@ -311,7 +330,7 @@ module Hbase assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) peer_config = command(:list_peers).get(0).getPeerConfig assert_equal(namespaces_str, - replication_admin.show_peer_namespaces(peer_config)) + replication_admin.show_peer_namespaces(peer_config)) namespaces = ["ns3"] namespaces_str = "ns1;ns2;ns3" @@ -321,7 +340,7 @@ module Hbase assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) peer_config = command(:list_peers).get(0).getPeerConfig assert_equal(namespaces_str, - replication_admin.show_peer_namespaces(peer_config)) + replication_admin.show_peer_namespaces(peer_config)) # append a namespace which is already in the peer config command(:append_peer_namespaces, @peer_id, namespaces) @@ -330,7 +349,7 @@ module Hbase assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) peer_config = command(:list_peers).get(0).getPeerConfig assert_equal(namespaces_str, - replication_admin.show_peer_namespaces(peer_config)) + replication_admin.show_peer_namespaces(peer_config)) # cleanup for future tests command(:remove_peer, @peer_id) @@ -351,7 +370,7 @@ module Hbase assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) peer_config = command(:list_peers).get(0).getPeerConfig assert_equal(namespaces_str, - replication_admin.show_peer_namespaces(peer_config)) + replication_admin.show_peer_namespaces(peer_config)) namespaces = ["ns3"] namespaces_str = nil @@ -361,7 +380,7 @@ module Hbase assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) peer_config = command(:list_peers).get(0).getPeerConfig assert_equal(namespaces_str, - replication_admin.show_peer_namespaces(peer_config)) + replication_admin.show_peer_namespaces(peer_config)) # remove a namespace which is not in peer config command(:remove_peer_namespaces, @peer_id, namespaces) @@ -370,12 +389,34 @@ module Hbase assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) peer_config = command(:list_peers).get(0).getPeerConfig assert_equal(namespaces_str, - replication_admin.show_peer_namespaces(peer_config)) + replication_admin.show_peer_namespaces(peer_config)) # cleanup for future tests command(:remove_peer, @peer_id) end + define_test 'set_peer_replicate_all' do + cluster_key = 'zk4,zk5,zk6:11000:/hbase-test' + + args = { CLUSTER_KEY => cluster_key } + command(:add_peer, @peer_id, args) + + assert_equal(1, command(:list_peers).length) + peer_config = command(:list_peers).get(0).getPeerConfig + assert_equal(true, peer_config.replicateAllUserTables) + + command(:set_peer_replicate_all, @peer_id, false) + peer_config = command(:list_peers).get(0).getPeerConfig + assert_equal(false, peer_config.replicateAllUserTables) + + 
command(:set_peer_replicate_all, @peer_id, true) + peer_config = command(:list_peers).get(0).getPeerConfig + assert_equal(true, peer_config.replicateAllUserTables) + + # cleanup for future tests + replication_admin.remove_peer(@peer_id) + end + define_test "set_peer_bandwidth: works with peer bandwidth upper limit" do cluster_key = "localhost:2181:/hbase-test" args = { CLUSTER_KEY => cluster_key }
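Illustrative sketch (not part of the patch above): the test changes in this diff exercise the new replicate_all flag in the same order a client would use it — add the peer (the flag defaults to true), flip it to false, and only then configure namespaces or table-cfs, since ReplicationManager#checkPeerConfig rejects namespace/table-cfs settings while the flag is true. A minimal Java sketch of that flow against the Admin API, assuming a hypothetical peer id "1", a placeholder cluster key, and namespace "ns1":

import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;

public class ReplicateAllFlagExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin()) {
      // Add a peer; the new replicate_all flag defaults to true, so the peer
      // config must not carry namespaces or table-cfs at this point.
      ReplicationPeerConfig rpc = new ReplicationPeerConfig();
      rpc.setClusterKey("zk1,zk2,zk3:2181:/hbase"); // placeholder cluster key
      admin.addReplicationPeer("1", rpc);

      // Switch to selective replication: clear the flag first, then configure
      // the namespaces (or a table-cfs map) that should be replicated.
      rpc = admin.getReplicationPeerConfig("1");
      rpc.setReplicateAllUserTables(false);
      admin.updateReplicationPeerConfig("1", rpc);

      rpc = admin.getReplicationPeerConfig("1");
      Set<String> namespaces = new HashSet<>();
      namespaces.add("ns1");
      rpc.setNamespaces(namespaces);
      admin.updateReplicationPeerConfig("1", rpc);
    }
  }
}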