HBASE-14717 enable_table_replication command should only create specified table for a peer cluster (Ashish)
This commit is contained in:
parent
e15c48ed2c
commit
a1a19d9405
|
@ -599,7 +599,17 @@ public class ReplicationAdmin implements Closeable {
|
||||||
if (repPeers == null || repPeers.size() <= 0) {
|
if (repPeers == null || repPeers.size() <= 0) {
|
||||||
throw new IllegalArgumentException("Found no peer cluster for replication.");
|
throw new IllegalArgumentException("Found no peer cluster for replication.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final TableName onlyTableNameQualifier = TableName.valueOf(tableName.getQualifierAsString());
|
||||||
|
|
||||||
for (ReplicationPeer repPeer : repPeers) {
|
for (ReplicationPeer repPeer : repPeers) {
|
||||||
|
Map<TableName, List<String>> tableCFMap = repPeer.getTableCFs();
|
||||||
|
// TODO Currently peer TableCFs will not include namespace so we need to check only for table
|
||||||
|
// name without namespace in it. Need to correct this logic once we fix HBASE-11386.
|
||||||
|
if (tableCFMap != null && !tableCFMap.containsKey(onlyTableNameQualifier)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
Configuration peerConf = repPeer.getConfiguration();
|
Configuration peerConf = repPeer.getConfiguration();
|
||||||
HTableDescriptor htd = null;
|
HTableDescriptor htd = null;
|
||||||
try (Connection conn = ConnectionFactory.createConnection(peerConf);
|
try (Connection conn = ConnectionFactory.createConnection(peerConf);
|
||||||
|
@ -638,7 +648,8 @@ public class ReplicationAdmin implements Closeable {
|
||||||
try {
|
try {
|
||||||
Pair<ReplicationPeerConfig, Configuration> pair = this.replicationPeers.getPeerConf(peerId);
|
Pair<ReplicationPeerConfig, Configuration> pair = this.replicationPeers.getPeerConf(peerId);
|
||||||
Configuration peerConf = pair.getSecond();
|
Configuration peerConf = pair.getSecond();
|
||||||
ReplicationPeer peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst());
|
ReplicationPeer peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst(),
|
||||||
|
parseTableCFsFromConfig(this.getPeerTableCFs(peerId)));
|
||||||
s =
|
s =
|
||||||
zkw.getRecoverableZooKeeper().exists(peerConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
|
zkw.getRecoverableZooKeeper().exists(peerConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
|
||||||
null);
|
null);
|
||||||
|
|
|
@ -55,8 +55,8 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea
|
||||||
private TableCFsTracker tableCFsTracker;
|
private TableCFsTracker tableCFsTracker;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor that takes all the objects required to communicate with the
|
* Constructor that takes all the objects required to communicate with the specified peer, except
|
||||||
* specified peer, except for the region server addresses.
|
* for the region server addresses.
|
||||||
* @param conf configuration object to this peer
|
* @param conf configuration object to this peer
|
||||||
* @param id string representation of this peer's identifier
|
* @param id string representation of this peer's identifier
|
||||||
* @param peerConfig configuration for the replication peer
|
* @param peerConfig configuration for the replication peer
|
||||||
|
@ -68,6 +68,22 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea
|
||||||
this.id = id;
|
this.id = id;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor that takes all the objects required to communicate with the specified peer, except
|
||||||
|
* for the region server addresses.
|
||||||
|
* @param conf configuration object to this peer
|
||||||
|
* @param id string representation of this peer's identifier
|
||||||
|
* @param peerConfig configuration for the replication peer
|
||||||
|
* @param tableCFs table-cf configuration for this peer
|
||||||
|
*/
|
||||||
|
public ReplicationPeerZKImpl(Configuration conf, String id, ReplicationPeerConfig peerConfig,
|
||||||
|
Map<TableName, List<String>> tableCFs) throws ReplicationException {
|
||||||
|
this.conf = conf;
|
||||||
|
this.peerConfig = peerConfig;
|
||||||
|
this.id = id;
|
||||||
|
this.tableCFs = tableCFs;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* start a state tracker to check whether this peer is enabled or not
|
* start a state tracker to check whether this peer is enabled or not
|
||||||
*
|
*
|
||||||
|
|
|
@ -15,6 +15,10 @@ import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
|
@ -41,6 +45,7 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase {
|
||||||
static Connection connection2;
|
static Connection connection2;
|
||||||
static Admin admin1;
|
static Admin admin1;
|
||||||
static Admin admin2;
|
static Admin admin2;
|
||||||
|
static ReplicationAdmin adminExt;
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void setUpBeforeClass() throws Exception {
|
public static void setUpBeforeClass() throws Exception {
|
||||||
|
@ -49,12 +54,14 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase {
|
||||||
connection2 = ConnectionFactory.createConnection(conf2);
|
connection2 = ConnectionFactory.createConnection(conf2);
|
||||||
admin1 = connection1.getAdmin();
|
admin1 = connection1.getAdmin();
|
||||||
admin2 = connection2.getAdmin();
|
admin2 = connection2.getAdmin();
|
||||||
|
adminExt = new ReplicationAdmin(conf1);
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
public static void tearDownAfterClass() throws Exception {
|
public static void tearDownAfterClass() throws Exception {
|
||||||
admin1.close();
|
admin1.close();
|
||||||
admin2.close();
|
admin2.close();
|
||||||
|
adminExt.close();
|
||||||
connection1.close();
|
connection1.close();
|
||||||
connection2.close();
|
connection2.close();
|
||||||
TestReplicationBase.tearDownAfterClass();
|
TestReplicationBase.tearDownAfterClass();
|
||||||
|
@ -65,7 +72,6 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase {
|
||||||
admin2.disableTable(tableName);
|
admin2.disableTable(tableName);
|
||||||
admin2.deleteTable(tableName);
|
admin2.deleteTable(tableName);
|
||||||
assertFalse(admin2.tableExists(tableName));
|
assertFalse(admin2.tableExists(tableName));
|
||||||
ReplicationAdmin adminExt = new ReplicationAdmin(conf1);
|
|
||||||
adminExt.enableTableRep(tableName);
|
adminExt.enableTableRep(tableName);
|
||||||
assertTrue(admin2.tableExists(tableName));
|
assertTrue(admin2.tableExists(tableName));
|
||||||
}
|
}
|
||||||
|
@ -84,7 +90,6 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase {
|
||||||
admin2.modifyTable(tableName, table);
|
admin2.modifyTable(tableName, table);
|
||||||
admin2.enableTable(tableName);
|
admin2.enableTable(tableName);
|
||||||
|
|
||||||
ReplicationAdmin adminExt = new ReplicationAdmin(conf1);
|
|
||||||
adminExt.enableTableRep(tableName);
|
adminExt.enableTableRep(tableName);
|
||||||
table = admin1.getTableDescriptor(tableName);
|
table = admin1.getTableDescriptor(tableName);
|
||||||
for (HColumnDescriptor fam : table.getColumnFamilies()) {
|
for (HColumnDescriptor fam : table.getColumnFamilies()) {
|
||||||
|
@ -101,7 +106,6 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase {
|
||||||
admin2.modifyTable(tableName, table);
|
admin2.modifyTable(tableName, table);
|
||||||
admin2.enableTable(tableName);
|
admin2.enableTable(tableName);
|
||||||
|
|
||||||
ReplicationAdmin adminExt = new ReplicationAdmin(conf1);
|
|
||||||
try {
|
try {
|
||||||
adminExt.enableTableRep(tableName);
|
adminExt.enableTableRep(tableName);
|
||||||
fail("Exception should be thrown if table descriptors in the clusters are not same.");
|
fail("Exception should be thrown if table descriptors in the clusters are not same.");
|
||||||
|
@ -120,7 +124,6 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase {
|
||||||
|
|
||||||
@Test(timeout = 300000)
|
@Test(timeout = 300000)
|
||||||
public void testDisableAndEnableReplication() throws Exception {
|
public void testDisableAndEnableReplication() throws Exception {
|
||||||
ReplicationAdmin adminExt = new ReplicationAdmin(conf1);
|
|
||||||
adminExt.disableTableRep(tableName);
|
adminExt.disableTableRep(tableName);
|
||||||
HTableDescriptor table = admin1.getTableDescriptor(tableName);
|
HTableDescriptor table = admin1.getTableDescriptor(tableName);
|
||||||
for (HColumnDescriptor fam : table.getColumnFamilies()) {
|
for (HColumnDescriptor fam : table.getColumnFamilies()) {
|
||||||
|
@ -139,25 +142,57 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase {
|
||||||
|
|
||||||
@Test(timeout = 300000, expected = TableNotFoundException.class)
|
@Test(timeout = 300000, expected = TableNotFoundException.class)
|
||||||
public void testDisableReplicationForNonExistingTable() throws Exception {
|
public void testDisableReplicationForNonExistingTable() throws Exception {
|
||||||
ReplicationAdmin adminExt = new ReplicationAdmin(conf1);
|
|
||||||
adminExt.disableTableRep(TableName.valueOf("nonExistingTable"));
|
adminExt.disableTableRep(TableName.valueOf("nonExistingTable"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 300000, expected = TableNotFoundException.class)
|
@Test(timeout = 300000, expected = TableNotFoundException.class)
|
||||||
public void testEnableReplicationForNonExistingTable() throws Exception {
|
public void testEnableReplicationForNonExistingTable() throws Exception {
|
||||||
ReplicationAdmin adminExt = new ReplicationAdmin(conf1);
|
|
||||||
adminExt.enableTableRep(TableName.valueOf("nonExistingTable"));
|
adminExt.enableTableRep(TableName.valueOf("nonExistingTable"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 300000, expected = IllegalArgumentException.class)
|
@Test(timeout = 300000, expected = IllegalArgumentException.class)
|
||||||
public void testDisableReplicationWhenTableNameAsNull() throws Exception {
|
public void testDisableReplicationWhenTableNameAsNull() throws Exception {
|
||||||
ReplicationAdmin adminExt = new ReplicationAdmin(conf1);
|
|
||||||
adminExt.disableTableRep(null);
|
adminExt.disableTableRep(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 300000, expected = IllegalArgumentException.class)
|
@Test(timeout = 300000, expected = IllegalArgumentException.class)
|
||||||
public void testEnableReplicationWhenTableNameAsNull() throws Exception {
|
public void testEnableReplicationWhenTableNameAsNull() throws Exception {
|
||||||
ReplicationAdmin adminExt = new ReplicationAdmin(conf1);
|
|
||||||
adminExt.enableTableRep(null);
|
adminExt.enableTableRep(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Test enable table replication should create table only in user explicit specified table-cfs.
|
||||||
|
* HBASE-14717
|
||||||
|
*/
|
||||||
|
@Test(timeout = 300000)
|
||||||
|
public void testEnableReplicationForExplicitSetTableCfs() throws Exception {
|
||||||
|
TableName tn = TableName.valueOf("testEnableReplicationForSetTableCfs");
|
||||||
|
String peerId = "2";
|
||||||
|
if (admin2.isTableAvailable(tableName)) {
|
||||||
|
admin2.disableTable(tableName);
|
||||||
|
admin2.deleteTable(tableName);
|
||||||
|
}
|
||||||
|
assertFalse("Table should not exists in the peer cluster", admin2.isTableAvailable(tableName));
|
||||||
|
|
||||||
|
Map<TableName, ? extends Collection<String>> tableCfs =
|
||||||
|
new HashMap<TableName, Collection<String>>();
|
||||||
|
tableCfs.put(tn, null);
|
||||||
|
try {
|
||||||
|
adminExt.setPeerTableCFs(peerId, tableCfs);
|
||||||
|
adminExt.enableTableRep(tableName);
|
||||||
|
assertFalse("Table should not be created if user has set table cfs explicitly for the "
|
||||||
|
+ "peer and this is not part of that collection",
|
||||||
|
admin2.isTableAvailable(tableName));
|
||||||
|
|
||||||
|
tableCfs.put(tableName, null);
|
||||||
|
adminExt.setPeerTableCFs(peerId, tableCfs);
|
||||||
|
adminExt.enableTableRep(tableName);
|
||||||
|
assertTrue(
|
||||||
|
"Table should be created if user has explicitly added table into table cfs collection",
|
||||||
|
admin2.isTableAvailable(tableName));
|
||||||
|
} finally {
|
||||||
|
adminExt.removePeerTableCFs(peerId, adminExt.getPeerTableCFs(peerId));
|
||||||
|
adminExt.disableTableRep(tableName);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue