diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java index 24a3dcb0831..d7f58a9b131 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java @@ -600,7 +600,17 @@ public class ReplicationAdmin implements Closeable { if (repPeers == null || repPeers.size() <= 0) { throw new IllegalArgumentException("Found no peer cluster for replication."); } + + final TableName onlyTableNameQualifier = TableName.valueOf(tableName.getQualifierAsString()); + for (ReplicationPeer repPeer : repPeers) { + Map> tableCFMap = repPeer.getTableCFs(); + // TODO Currently peer TableCFs will not include namespace so we need to check only for table + // name without namespace in it. Need to correct this logic once we fix HBASE-11386. + if (tableCFMap != null && !tableCFMap.containsKey(onlyTableNameQualifier)) { + continue; + } + Configuration peerConf = repPeer.getConfiguration(); HTableDescriptor htd = null; try (Connection conn = ConnectionFactory.createConnection(peerConf); @@ -639,7 +649,8 @@ public class ReplicationAdmin implements Closeable { try { Pair pair = this.replicationPeers.getPeerConf(peerId); Configuration peerConf = pair.getSecond(); - ReplicationPeer peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst()); + ReplicationPeer peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst(), + parseTableCFsFromConfig(this.getPeerTableCFs(peerId))); s = zkw.getRecoverableZooKeeper().exists(peerConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT), null); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java index e1ff0787a4d..a0d7b5fcb19 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java @@ -55,8 +55,8 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea private TableCFsTracker tableCFsTracker; /** - * Constructor that takes all the objects required to communicate with the - * specified peer, except for the region server addresses. + * Constructor that takes all the objects required to communicate with the specified peer, except + * for the region server addresses. * @param conf configuration object to this peer * @param id string representation of this peer's identifier * @param peerConfig configuration for the replication peer @@ -67,6 +67,22 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea this.peerConfig = peerConfig; this.id = id; } + + /** + * Constructor that takes all the objects required to communicate with the specified peer, except + * for the region server addresses. + * @param conf configuration object to this peer + * @param id string representation of this peer's identifier + * @param peerConfig configuration for the replication peer + * @param tableCFs table-cf configuration for this peer + */ + public ReplicationPeerZKImpl(Configuration conf, String id, ReplicationPeerConfig peerConfig, + Map> tableCFs) throws ReplicationException { + this.conf = conf; + this.peerConfig = peerConfig; + this.id = id; + this.tableCFs = tableCFs; + } /** * start a state tracker to check whether this peer is enabled or not diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java index 0d4e853a075..d628a7c0b2c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java @@ -15,6 +15,10 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; @@ -40,6 +44,7 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase { static Connection connection2; static Admin admin1; static Admin admin2; + static ReplicationAdmin adminExt; @BeforeClass public static void setUpBeforeClass() throws Exception { @@ -48,12 +53,14 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase { connection2 = ConnectionFactory.createConnection(conf2); admin1 = connection1.getAdmin(); admin2 = connection2.getAdmin(); + adminExt = new ReplicationAdmin(conf1); } @AfterClass public static void tearDownAfterClass() throws Exception { admin1.close(); admin2.close(); + adminExt.close(); connection1.close(); connection2.close(); TestReplicationBase.tearDownAfterClass(); @@ -64,7 +71,6 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase { admin2.disableTable(tableName); admin2.deleteTable(tableName); assertFalse(admin2.tableExists(tableName)); - ReplicationAdmin adminExt = new ReplicationAdmin(conf1); adminExt.enableTableRep(tableName); assertTrue(admin2.tableExists(tableName)); } @@ -83,7 +89,6 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase { admin2.modifyTable(tableName, table); admin2.enableTable(tableName); - ReplicationAdmin adminExt = new ReplicationAdmin(conf1); adminExt.enableTableRep(tableName); table = admin1.getTableDescriptor(tableName); for (HColumnDescriptor fam : table.getColumnFamilies()) { @@ -100,7 +105,6 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase { admin2.modifyTable(tableName, table); admin2.enableTable(tableName); - ReplicationAdmin adminExt = new ReplicationAdmin(conf1); try { adminExt.enableTableRep(tableName); fail("Exception should be thrown if table descriptors in the clusters are not same."); @@ -119,7 +123,6 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase { @Test(timeout = 300000) public void testDisableAndEnableReplication() throws Exception { - ReplicationAdmin adminExt = new ReplicationAdmin(conf1); adminExt.disableTableRep(tableName); HTableDescriptor table = admin1.getTableDescriptor(tableName); for (HColumnDescriptor fam : table.getColumnFamilies()) { @@ -138,25 +141,57 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase { @Test(timeout = 300000, expected = TableNotFoundException.class) public void testDisableReplicationForNonExistingTable() throws Exception { - ReplicationAdmin adminExt = new ReplicationAdmin(conf1); adminExt.disableTableRep(TableName.valueOf("nonExistingTable")); } @Test(timeout = 300000, expected = TableNotFoundException.class) public void testEnableReplicationForNonExistingTable() throws Exception { - ReplicationAdmin adminExt = new ReplicationAdmin(conf1); adminExt.enableTableRep(TableName.valueOf("nonExistingTable")); } @Test(timeout = 300000, expected = IllegalArgumentException.class) public void testDisableReplicationWhenTableNameAsNull() throws Exception { - ReplicationAdmin adminExt = new ReplicationAdmin(conf1); adminExt.disableTableRep(null); } @Test(timeout = 300000, expected = IllegalArgumentException.class) public void testEnableReplicationWhenTableNameAsNull() throws Exception { - ReplicationAdmin adminExt = new ReplicationAdmin(conf1); adminExt.enableTableRep(null); } + + /* + * Test enable table replication should create table only in user explicit specified table-cfs. + * HBASE-14717 + */ + @Test(timeout = 300000) + public void testEnableReplicationForExplicitSetTableCfs() throws Exception { + TableName tn = TableName.valueOf("testEnableReplicationForSetTableCfs"); + String peerId = "2"; + if (admin2.isTableAvailable(tableName)) { + admin2.disableTable(tableName); + admin2.deleteTable(tableName); + } + assertFalse("Table should not exists in the peer cluster", admin2.isTableAvailable(tableName)); + + Map> tableCfs = + new HashMap>(); + tableCfs.put(tn, null); + try { + adminExt.setPeerTableCFs(peerId, tableCfs); + adminExt.enableTableRep(tableName); + assertFalse("Table should not be created if user has set table cfs explicitly for the " + + "peer and this is not part of that collection", + admin2.isTableAvailable(tableName)); + + tableCfs.put(tableName, null); + adminExt.setPeerTableCFs(peerId, tableCfs); + adminExt.enableTableRep(tableName); + assertTrue( + "Table should be created if user has explicitly added table into table cfs collection", + admin2.isTableAvailable(tableName)); + } finally { + adminExt.removePeerTableCFs(peerId, adminExt.getPeerTableCFs(peerId)); + adminExt.disableTableRep(tableName); + } + } }