HBASE-16868 Add a replicate_all flag to avoid misuse of the namespaces and table-cfs config of replication peer

This commit is contained in:
Guanghao Zhang 2017-11-23 14:54:19 +08:00
parent 2442cbb6ab
commit 3e2941a49e
22 changed files with 471 additions and 154 deletions

View File

@ -286,6 +286,7 @@ public final class ReplicationPeerConfigUtil {
if (tableCFsMap != null) { if (tableCFsMap != null) {
peerConfig.setTableCFsMap(tableCFsMap); peerConfig.setTableCFsMap(tableCFsMap);
} }
List<ByteString> namespacesList = peer.getNamespacesList(); List<ByteString> namespacesList = peer.getNamespacesList();
if (namespacesList != null && namespacesList.size() != 0) { if (namespacesList != null && namespacesList.size() != 0) {
Set<String> namespaces = new HashSet<>(); Set<String> namespaces = new HashSet<>();
@ -294,9 +295,15 @@ public final class ReplicationPeerConfigUtil {
} }
peerConfig.setNamespaces(namespaces); peerConfig.setNamespaces(namespaces);
} }
if (peer.hasBandwidth()) { if (peer.hasBandwidth()) {
peerConfig.setBandwidth(peer.getBandwidth()); peerConfig.setBandwidth(peer.getBandwidth());
} }
if (peer.hasReplicateAll()) {
peerConfig.setReplicateAllUserTables(peer.getReplicateAll());
}
return peerConfig; return peerConfig;
} }
@ -338,6 +345,7 @@ public final class ReplicationPeerConfigUtil {
} }
builder.setBandwidth(peerConfig.getBandwidth()); builder.setBandwidth(peerConfig.getBandwidth());
builder.setReplicateAll(peerConfig.replicateAllUserTables());
return builder.build(); return builder.build();
} }
@ -465,4 +473,4 @@ public final class ReplicationPeerConfigUtil {
return otherConf; return otherConf;
} }
} }

View File

@ -42,6 +42,8 @@ public class ReplicationPeerConfig {
private Map<TableName, ? extends Collection<String>> tableCFsMap = null; private Map<TableName, ? extends Collection<String>> tableCFsMap = null;
private Set<String> namespaces = null; private Set<String> namespaces = null;
private long bandwidth = 0; private long bandwidth = 0;
// Default value is true, meaning all user tables are replicated to the peer cluster.
private boolean replicateAllUserTables = true;
public ReplicationPeerConfig() { public ReplicationPeerConfig() {
this.peerData = new TreeMap<>(Bytes.BYTES_COMPARATOR); this.peerData = new TreeMap<>(Bytes.BYTES_COMPARATOR);
@ -110,10 +112,20 @@ public class ReplicationPeerConfig {
return this; return this;
} }
public boolean replicateAllUserTables() {
return this.replicateAllUserTables;
}
public ReplicationPeerConfig setReplicateAllUserTables(boolean replicateAllUserTables) {
this.replicateAllUserTables = replicateAllUserTables;
return this;
}
@Override @Override
public String toString() { public String toString() {
StringBuilder builder = new StringBuilder("clusterKey=").append(clusterKey).append(","); StringBuilder builder = new StringBuilder("clusterKey=").append(clusterKey).append(",");
builder.append("replicationEndpointImpl=").append(replicationEndpointImpl).append(","); builder.append("replicationEndpointImpl=").append(replicationEndpointImpl).append(",");
builder.append("replicateAllUserTables=").append(replicateAllUserTables).append(",");
if (namespaces != null) { if (namespaces != null) {
builder.append("namespaces=").append(namespaces.toString()).append(","); builder.append("namespaces=").append(namespaces.toString()).append(",");
} }

View File

@ -45,6 +45,7 @@ message ReplicationPeer {
repeated TableCF table_cfs = 5; repeated TableCF table_cfs = 5;
repeated bytes namespaces = 6; repeated bytes namespaces = 6;
optional int64 bandwidth = 7; optional int64 bandwidth = 7;
optional bool replicate_all = 8;
} }
/** /**

View File

@ -368,6 +368,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
existingConfig.setTableCFsMap(newConfig.getTableCFsMap()); existingConfig.setTableCFsMap(newConfig.getTableCFsMap());
existingConfig.setNamespaces(newConfig.getNamespaces()); existingConfig.setNamespaces(newConfig.getNamespaces());
existingConfig.setBandwidth(newConfig.getBandwidth()); existingConfig.setBandwidth(newConfig.getBandwidth());
existingConfig.setReplicateAllUserTables(newConfig.replicateAllUserTables());
try { try {
ZKUtil.setData(this.zookeeper, getPeerNode(id), ZKUtil.setData(this.zookeeper, getPeerNode(id),

View File

@ -156,7 +156,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl; import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
import org.apache.hadoop.hbase.replication.master.TableCFsUpdater; import org.apache.hadoop.hbase.replication.master.ReplicationPeerConfigUpgrader;
import org.apache.hadoop.hbase.replication.regionserver.Replication; import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.security.AccessDeniedException; import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.UserProvider;
@ -798,9 +798,9 @@ public class HMaster extends HRegionServer implements MasterServices {
// This is for backwards compatibility // This is for backwards compatibility
// See HBASE-11393 // See HBASE-11393
status.setStatus("Update TableCFs node in ZNode"); status.setStatus("Update TableCFs node in ZNode");
TableCFsUpdater tableCFsUpdater = new TableCFsUpdater(zooKeeper, ReplicationPeerConfigUpgrader tableCFsUpdater = new ReplicationPeerConfigUpgrader(zooKeeper,
conf, this.clusterConnection); conf, this.clusterConnection);
tableCFsUpdater.update(); tableCFsUpdater.copyTableCFs();
// Add the Observer to delete space quotas on table deletion before starting all CPs by // Add the Observer to delete space quotas on table deletion before starting all CPs by
// default with quota support, avoiding if user specifically asks to not load this Observer. // default with quota support, avoiding if user specifically asks to not load this Observer.

View File

@ -71,9 +71,7 @@ public class ReplicationManager {
public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
throws ReplicationException, IOException { throws ReplicationException, IOException {
checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(), checkPeerConfig(peerConfig);
peerConfig.getTableCFsMap());
checkConfiguredWALEntryFilters(peerConfig);
replicationPeers.registerPeer(peerId, peerConfig, enabled); replicationPeers.registerPeer(peerId, peerConfig, enabled);
replicationPeers.peerConnected(peerId); replicationPeers.peerConnected(peerId);
} }
@ -102,9 +100,7 @@ public class ReplicationManager {
public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig) public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
throws ReplicationException, IOException { throws ReplicationException, IOException {
checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(), checkPeerConfig(peerConfig);
peerConfig.getTableCFsMap());
checkConfiguredWALEntryFilters(peerConfig);
this.replicationPeers.updatePeerConfig(peerId, peerConfig); this.replicationPeers.updatePeerConfig(peerId, peerConfig);
} }
@ -122,6 +118,21 @@ public class ReplicationManager {
return peers; return peers;
} }
private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws ReplicationException,
IOException {
if (peerConfig.replicateAllUserTables()) {
if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty())
|| (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) {
throw new ReplicationException(
"Need to clean namespaces or table-cfs config first when you want to replicate all tables in the cluster");
}
} else {
checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
peerConfig.getTableCFsMap());
}
checkConfiguredWALEntryFilters(peerConfig);
}
/** /**
* Set a namespace in the peer config means that all tables in this namespace * Set a namespace in the peer config means that all tables in this namespace
* will be replicated to the peer cluster. * will be replicated to the peer cluster.
@ -150,8 +161,6 @@ public class ReplicationManager {
"Table-cfs config conflict with namespaces config in peer"); "Table-cfs config conflict with namespaces config in peer");
} }
} }
} }
private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig) private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig)

View File

@ -58,69 +58,74 @@ public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFi
public Entry filter(Entry entry) { public Entry filter(Entry entry) {
TableName tabName = entry.getKey().getTablename(); TableName tabName = entry.getKey().getTablename();
String namespace = tabName.getNamespaceAsString(); String namespace = tabName.getNamespaceAsString();
Set<String> namespaces = this.peer.getNamespaces(); ReplicationPeerConfig peerConfig = this.peer.getPeerConfig();
Map<TableName, List<String>> tableCFs = getTableCfs();
if (peerConfig.replicateAllUserTables()) {
// replicate all user tables, so return entry directly
return entry;
} else {
// Not replicate all user tables, so filter by namespaces and table-cfs config
Set<String> namespaces = peerConfig.getNamespaces();
Map<TableName, List<String>> tableCFs = peerConfig.getTableCFsMap();
if (namespaces == null && tableCFs == null) {
return null;
}
// First filter by namespaces config
// If table's namespace in peer config, all the tables data are applicable for replication
if (namespaces != null && namespaces.contains(namespace)) {
return entry;
}
// Then filter by table-cfs config
// return null(prevent replicating) if logKey's table isn't in this peer's
// replicable namespace list and table list
if (tableCFs == null || !tableCFs.containsKey(tabName)) {
return null;
}
// If null means user has explicitly not configured any namespaces and table CFs
// so all the tables data are applicable for replication
if (namespaces == null && tableCFs == null) {
return entry; return entry;
} }
// First filter by namespaces config
// If table's namespace in peer config, all the tables data are applicable for replication
if (namespaces != null && namespaces.contains(namespace)) {
return entry;
}
// Then filter by table-cfs config
// return null(prevent replicating) if logKey's table isn't in this peer's
// replicable namespace list and table list
if (tableCFs == null || !tableCFs.containsKey(tabName)) {
return null;
}
return entry;
} }
@Override @Override
public Cell filterCell(final Entry entry, Cell cell) { public Cell filterCell(final Entry entry, Cell cell) {
final Map<TableName, List<String>> tableCfs = getTableCfs(); ReplicationPeerConfig peerConfig = this.peer.getPeerConfig();
if (tableCfs == null) return cell; if (peerConfig.replicateAllUserTables()) {
TableName tabName = entry.getKey().getTablename(); // replicate all user tables, so return cell directly
List<String> cfs = tableCfs.get(tabName); return cell;
// ignore(remove) kv if its cf isn't in the replicable cf list
// (empty cfs means all cfs of this table are replicable)
if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
cell = bulkLoadFilter.filterCell(cell, new Predicate<byte[]>() {
@Override
public boolean apply(byte[] fam) {
if (tableCfs != null) {
List<String> cfs = tableCfs.get(entry.getKey().getTablename());
if (cfs != null && !cfs.contains(Bytes.toString(fam))) {
return true;
}
}
return false;
}
});
} else { } else {
if ((cfs != null) && !cfs.contains( final Map<TableName, List<String>> tableCfs = peerConfig.getTableCFsMap();
Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()))) { if (tableCfs == null) {
return null; return cell;
}
TableName tabName = entry.getKey().getTablename();
List<String> cfs = tableCfs.get(tabName);
// ignore(remove) kv if its cf isn't in the replicable cf list
// (empty cfs means all cfs of this table are replicable)
if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
cell = bulkLoadFilter.filterCell(cell, new Predicate<byte[]>() {
@Override
public boolean apply(byte[] fam) {
if (tableCfs != null) {
List<String> cfs = tableCfs.get(entry.getKey().getTablename());
if (cfs != null && !cfs.contains(Bytes.toString(fam))) {
return true;
}
}
return false;
}
});
} else {
if ((cfs != null)
&& !cfs.contains(Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(),
cell.getFamilyLength()))) {
return null;
}
} }
}
return cell;
}
Map<TableName, List<String>> getTableCfs() { return cell;
Map<TableName, List<String>> tableCFs = null;
try {
tableCFs = this.peer.getTableCFs();
} catch (IllegalArgumentException e) {
LOG.error("should not happen: can't get tableCFs for peer " + peer.getId() +
", degenerate as if it's not configured by keeping tableCFs==null");
} }
return tableCFs;
} }
} }

View File

@ -18,14 +18,19 @@
*/ */
package org.apache.hadoop.hbase.replication.master; package org.apache.hadoop.hbase.replication.master;
import java.io.IOException;
import java.util.List;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationStateZKBase; import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@ -34,8 +39,7 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability; import org.apache.yetus.audience.InterfaceStability;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import java.io.IOException; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
import java.util.List;
/** /**
* This class is used to upgrade TableCFs from HBase 1.0, 1.1, 1.2, 1.3 to HBase 1.4 or 2.x. * This class is used to upgrade TableCFs from HBase 1.0, 1.1, 1.2, 1.3 to HBase 1.4 or 2.x.
@ -43,16 +47,36 @@ import java.util.List;
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Unstable @InterfaceStability.Unstable
public class TableCFsUpdater extends ReplicationStateZKBase { public class ReplicationPeerConfigUpgrader extends ReplicationStateZKBase {
private static final Log LOG = LogFactory.getLog(TableCFsUpdater.class); private static final Log LOG = LogFactory.getLog(ReplicationPeerConfigUpgrader.class);
public TableCFsUpdater(ZKWatcher zookeeper, public ReplicationPeerConfigUpgrader(ZKWatcher zookeeper,
Configuration conf, Abortable abortable) { Configuration conf, Abortable abortable) {
super(zookeeper, conf, abortable); super(zookeeper, conf, abortable);
} }
public void update() { public void upgrade() throws Exception {
try (Connection conn = ConnectionFactory.createConnection(conf)) {
Admin admin = conn.getAdmin();
admin.listReplicationPeers().forEach(
(peerDesc) -> {
String peerId = peerDesc.getPeerId();
ReplicationPeerConfig peerConfig = peerDesc.getPeerConfig();
if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty())
|| (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) {
peerConfig.setReplicateAllUserTables(false);
try {
admin.updateReplicationPeerConfig(peerId, peerConfig);
} catch (Exception e) {
LOG.error("Failed to upgrade replication peer config for peerId=" + peerId, e);
}
}
});
}
}
public void copyTableCFs() {
List<String> znodes = null; List<String> znodes = null;
try { try {
znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode); znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
@ -61,14 +85,14 @@ public class TableCFsUpdater extends ReplicationStateZKBase {
} }
if (znodes != null) { if (znodes != null) {
for (String peerId : znodes) { for (String peerId : znodes) {
if (!update(peerId)) { if (!copyTableCFs(peerId)) {
LOG.error("upgrade tableCFs failed for peerId=" + peerId); LOG.error("upgrade tableCFs failed for peerId=" + peerId);
} }
} }
} }
} }
public boolean update(String peerId) { public boolean copyTableCFs(String peerId) {
String tableCFsNode = getTableCFsNode(peerId); String tableCFsNode = getTableCFsNode(peerId);
try { try {
if (ZKUtil.checkExists(zookeeper, tableCFsNode) != -1) { if (ZKUtil.checkExists(zookeeper, tableCFsNode) != -1) {
@ -121,10 +145,13 @@ public class TableCFsUpdater extends ReplicationStateZKBase {
} }
private static void printUsageAndExit() { private static void printUsageAndExit() {
System.err.printf("Usage: hbase org.apache.hadoop.hbase.replication.master.TableCFsUpdater [options]"); System.err.printf(
"Usage: hbase org.apache.hadoop.hbase.replication.master.ReplicationPeerConfigUpgrader"
+ " [options]");
System.err.println(" where [options] are:"); System.err.println(" where [options] are:");
System.err.println(" -h|-help Show this help and exit."); System.err.println(" -h|-help Show this help and exit.");
System.err.println(" update Copy table-cfs to replication peer config"); System.err.println(" copyTableCFs Copy table-cfs to replication peer config");
System.err.println(" upgrade Upgrade replication peer config to new format");
System.err.println(); System.err.println();
System.exit(1); System.exit(1);
} }
@ -135,15 +162,21 @@ public class TableCFsUpdater extends ReplicationStateZKBase {
} }
if (args[0].equals("-help") || args[0].equals("-h")) { if (args[0].equals("-help") || args[0].equals("-h")) {
printUsageAndExit(); printUsageAndExit();
} else if (args[0].equals("update")) { } else if (args[0].equals("copyTableCFs")) {
Configuration conf = HBaseConfiguration.create(); Configuration conf = HBaseConfiguration.create();
ZKWatcher zkw = new ZKWatcher(conf, "TableCFsUpdater", null); ZKWatcher zkw = new ZKWatcher(conf, "ReplicationPeerConfigUpgrader", null);
try { try {
TableCFsUpdater tableCFsUpdater = new TableCFsUpdater(zkw, conf, null); ReplicationPeerConfigUpgrader tableCFsUpdater = new ReplicationPeerConfigUpgrader(zkw,
tableCFsUpdater.update(); conf, null);
tableCFsUpdater.copyTableCFs();
} finally { } finally {
zkw.close(); zkw.close();
} }
} else if (args[0].equals("upgrade")) {
Configuration conf = HBaseConfiguration.create();
ZKWatcher zkw = new ZKWatcher(conf, "ReplicationPeerConfigUpgrader", null);
ReplicationPeerConfigUpgrader upgrader = new ReplicationPeerConfigUpgrader(zkw, conf, null);
upgrader.upgrade();
} else { } else {
printUsageAndExit(); printUsageAndExit();
} }

View File

@ -149,6 +149,8 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
// Add a valid peer // Add a valid peer
admin.addReplicationPeer(ID_ONE, rpc1).join(); admin.addReplicationPeer(ID_ONE, rpc1).join();
rpc1.setReplicateAllUserTables(false);
admin.updateReplicationPeerConfig(ID_ONE, rpc1).join();
Map<TableName, List<String>> tableCFs = new HashMap<>(); Map<TableName, List<String>> tableCFs = new HashMap<>();
@ -248,6 +250,9 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4"); final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4");
// Add a valid peer // Add a valid peer
admin.addReplicationPeer(ID_ONE, rpc1).join(); admin.addReplicationPeer(ID_ONE, rpc1).join();
rpc1.setReplicateAllUserTables(false);
admin.updateReplicationPeerConfig(ID_ONE, rpc1).join();
Map<TableName, List<String>> tableCFs = new HashMap<>(); Map<TableName, List<String>> tableCFs = new HashMap<>();
try { try {
tableCFs.put(tableName3, null); tableCFs.put(tableName3, null);
@ -328,6 +333,8 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
ReplicationPeerConfig rpc = new ReplicationPeerConfig(); ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc.setClusterKey(KEY_ONE); rpc.setClusterKey(KEY_ONE);
admin.addReplicationPeer(ID_ONE, rpc).join(); admin.addReplicationPeer(ID_ONE, rpc).join();
rpc.setReplicateAllUserTables(false);
admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
// add ns1 and ns2 to peer config // add ns1 and ns2 to peer config
rpc = admin.getReplicationPeerConfig(ID_ONE).get(); rpc = admin.getReplicationPeerConfig(ID_ONE).get();
@ -364,6 +371,8 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
ReplicationPeerConfig rpc = new ReplicationPeerConfig(); ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc.setClusterKey(KEY_ONE); rpc.setClusterKey(KEY_ONE);
admin.addReplicationPeer(ID_ONE, rpc).join(); admin.addReplicationPeer(ID_ONE, rpc).join();
rpc.setReplicateAllUserTables(false);
admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
rpc = admin.getReplicationPeerConfig(ID_ONE).get(); rpc = admin.getReplicationPeerConfig(ID_ONE).get();
Set<String> namespaces = new HashSet<String>(); Set<String> namespaces = new HashSet<String>();

View File

@ -218,6 +218,7 @@ public class TestAsyncReplicationAdminApiWithClusters extends TestAsyncAdminBase
Map<TableName, ? extends Collection<String>> tableCfs = new HashMap<>(); Map<TableName, ? extends Collection<String>> tableCfs = new HashMap<>();
tableCfs.put(tableName, null); tableCfs.put(tableName, null);
ReplicationPeerConfig rpc = admin.getReplicationPeerConfig(ID_SECOND).get(); ReplicationPeerConfig rpc = admin.getReplicationPeerConfig(ID_SECOND).get();
rpc.setReplicateAllUserTables(false);
rpc.setTableCFsMap(tableCfs); rpc.setTableCFsMap(tableCfs);
try { try {
// Only add tableName to replication peer config // Only add tableName to replication peer config
@ -236,6 +237,7 @@ public class TestAsyncReplicationAdminApiWithClusters extends TestAsyncAdminBase
admin2.tableExists(tableName2).get()); admin2.tableExists(tableName2).get());
} finally { } finally {
rpc.setTableCFsMap(null); rpc.setTableCFsMap(null);
rpc.setReplicateAllUserTables(true);
admin.updateReplicationPeerConfig(ID_SECOND, rpc).join(); admin.updateReplicationPeerConfig(ID_SECOND, rpc).join();
} }
} }

View File

@ -85,10 +85,9 @@ public class TestReplicationAdmin {
*/ */
@BeforeClass @BeforeClass
public static void setUpBeforeClass() throws Exception { public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
TEST_UTIL.startMiniCluster(); TEST_UTIL.startMiniCluster();
Configuration conf = TEST_UTIL.getConfiguration(); admin = new ReplicationAdmin(TEST_UTIL.getConfiguration());
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
admin = new ReplicationAdmin(conf);
hbaseAdmin = TEST_UTIL.getAdmin(); hbaseAdmin = TEST_UTIL.getAdmin();
} }
@ -238,8 +237,8 @@ public class TestReplicationAdmin {
@Test @Test
public void testAppendPeerTableCFs() throws Exception { public void testAppendPeerTableCFs() throws Exception {
ReplicationPeerConfig rpc1 = new ReplicationPeerConfig(); ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc1.setClusterKey(KEY_ONE); rpc.setClusterKey(KEY_ONE);
final TableName tableName1 = TableName.valueOf(name.getMethodName() + "t1"); final TableName tableName1 = TableName.valueOf(name.getMethodName() + "t1");
final TableName tableName2 = TableName.valueOf(name.getMethodName() + "t2"); final TableName tableName2 = TableName.valueOf(name.getMethodName() + "t2");
final TableName tableName3 = TableName.valueOf(name.getMethodName() + "t3"); final TableName tableName3 = TableName.valueOf(name.getMethodName() + "t3");
@ -248,10 +247,14 @@ public class TestReplicationAdmin {
final TableName tableName6 = TableName.valueOf(name.getMethodName() + "t6"); final TableName tableName6 = TableName.valueOf(name.getMethodName() + "t6");
// Add a valid peer // Add a valid peer
admin.addPeer(ID_ONE, rpc1, null); hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
// Update peer config, not replicate all user tables
rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
rpc.setReplicateAllUserTables(false);
hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
Map<TableName, List<String>> tableCFs = new HashMap<>(); Map<TableName, List<String>> tableCFs = new HashMap<>();
tableCFs.put(tableName1, null); tableCFs.put(tableName1, null);
admin.appendPeerTableCFs(ID_ONE, tableCFs); admin.appendPeerTableCFs(ID_ONE, tableCFs);
Map<TableName, List<String>> result = Map<TableName, List<String>> result =
@ -338,14 +341,21 @@ public class TestReplicationAdmin {
@Test @Test
public void testRemovePeerTableCFs() throws Exception { public void testRemovePeerTableCFs() throws Exception {
ReplicationPeerConfig rpc1 = new ReplicationPeerConfig(); ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc1.setClusterKey(KEY_ONE); rpc.setClusterKey(KEY_ONE);
final TableName tableName1 = TableName.valueOf(name.getMethodName() + "t1"); final TableName tableName1 = TableName.valueOf(name.getMethodName() + "t1");
final TableName tableName2 = TableName.valueOf(name.getMethodName() + "t2"); final TableName tableName2 = TableName.valueOf(name.getMethodName() + "t2");
final TableName tableName3 = TableName.valueOf(name.getMethodName() + "t3"); final TableName tableName3 = TableName.valueOf(name.getMethodName() + "t3");
final TableName tableName4 = TableName.valueOf(name.getMethodName() + "t4"); final TableName tableName4 = TableName.valueOf(name.getMethodName() + "t4");
// Add a valid peer // Add a valid peer
admin.addPeer(ID_ONE, rpc1, null); hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
// Update peer config, not replicate all user tables
rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
rpc.setReplicateAllUserTables(false);
hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
Map<TableName, List<String>> tableCFs = new HashMap<>(); Map<TableName, List<String>> tableCFs = new HashMap<>();
try { try {
tableCFs.put(tableName3, null); tableCFs.put(tableName3, null);
@ -423,27 +433,98 @@ public class TestReplicationAdmin {
rpc.setClusterKey(KEY_ONE); rpc.setClusterKey(KEY_ONE);
hbaseAdmin.addReplicationPeer(ID_ONE, rpc); hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
rpc = admin.getPeerConfig(ID_ONE); rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
rpc.setReplicateAllUserTables(false);
hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
Set<String> namespaces = new HashSet<>(); Set<String> namespaces = new HashSet<>();
namespaces.add(ns1); namespaces.add(ns1);
namespaces.add(ns2); namespaces.add(ns2);
rpc.setNamespaces(namespaces); rpc.setNamespaces(namespaces);
admin.updatePeerConfig(ID_ONE, rpc); hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
namespaces = admin.getPeerConfig(ID_ONE).getNamespaces(); namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getNamespaces();
assertEquals(2, namespaces.size()); assertEquals(2, namespaces.size());
assertTrue(namespaces.contains(ns1)); assertTrue(namespaces.contains(ns1));
assertTrue(namespaces.contains(ns2)); assertTrue(namespaces.contains(ns2));
rpc = admin.getPeerConfig(ID_ONE); rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
namespaces.clear(); namespaces.clear();
namespaces.add(ns1); namespaces.add(ns1);
rpc.setNamespaces(namespaces); rpc.setNamespaces(namespaces);
admin.updatePeerConfig(ID_ONE, rpc); hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
namespaces = admin.getPeerConfig(ID_ONE).getNamespaces(); namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getNamespaces();
assertEquals(1, namespaces.size()); assertEquals(1, namespaces.size());
assertTrue(namespaces.contains(ns1)); assertTrue(namespaces.contains(ns1));
admin.removePeer(ID_ONE); hbaseAdmin.removeReplicationPeer(ID_ONE);
}
@Test
public void testSetReplicateAllUserTables() throws Exception {
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc.setClusterKey(KEY_ONE);
hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
assertTrue(rpc.replicateAllUserTables());
rpc.setReplicateAllUserTables(false);
hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
assertFalse(rpc.replicateAllUserTables());
rpc.setReplicateAllUserTables(true);
hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
assertTrue(rpc.replicateAllUserTables());
hbaseAdmin.removeReplicationPeer(ID_ONE);
}
@Test
public void testPeerConfigConflict() throws Exception {
// Default replicate all flag is true
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc.setClusterKey(KEY_ONE);
String ns1 = "ns1";
Set<String> namespaces = new HashSet<String>();
namespaces.add(ns1);
TableName tab1 = TableName.valueOf("ns1:tabl");
Map<TableName, List<String>> tableCfs = new HashMap<TableName, List<String>>();
tableCfs.put(tab1, new ArrayList<String>());
try {
rpc.setNamespaces(namespaces);
hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
fail("Should throw Exception. When replicate all flag is true, no need to config namespaces");
} catch (IOException e) {
// OK
rpc.setNamespaces(null);
}
try {
rpc.setTableCFsMap(tableCfs);
hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
fail("Should throw Exception. When replicate all flag is true, no need to config table-cfs");
} catch (IOException e) {
// OK
rpc.setTableCFsMap(null);
}
try {
rpc.setNamespaces(namespaces);
rpc.setTableCFsMap(tableCfs);
hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
fail("Should throw Exception."
+ " When replicate all flag is true, no need to config namespaces or table-cfs");
} catch (IOException e) {
// OK
rpc.setNamespaces(null);
rpc.setTableCFsMap(null);
}
} }
@Test @Test
@ -455,6 +536,7 @@ public class TestReplicationAdmin {
ReplicationPeerConfig rpc = new ReplicationPeerConfig(); ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc.setClusterKey(KEY_ONE); rpc.setClusterKey(KEY_ONE);
rpc.setReplicateAllUserTables(false);
hbaseAdmin.addReplicationPeer(ID_ONE, rpc); hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
rpc = admin.getPeerConfig(ID_ONE); rpc = admin.getPeerConfig(ID_ONE);

View File

@ -194,7 +194,13 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase {
admin2.disableTable(TestReplicationBase.tableName); admin2.disableTable(TestReplicationBase.tableName);
admin2.deleteTable(TestReplicationBase.tableName); admin2.deleteTable(TestReplicationBase.tableName);
} }
assertFalse("Table should not exists in the peer cluster", admin2.isTableAvailable(TestReplicationBase.tableName)); assertFalse("Table should not exists in the peer cluster",
admin2.isTableAvailable(TestReplicationBase.tableName));
// update peer config
ReplicationPeerConfig rpc = admin1.getReplicationPeerConfig(peerId);
rpc.setReplicateAllUserTables(false);
admin1.updateReplicationPeerConfig(peerId, rpc);
Map<TableName, ? extends Collection<String>> tableCfs = new HashMap<>(); Map<TableName, ? extends Collection<String>> tableCfs = new HashMap<>();
tableCfs.put(tableName, null); tableCfs.put(tableName, null);
@ -214,6 +220,10 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase {
} finally { } finally {
adminExt.removePeerTableCFs(peerId, adminExt.getPeerTableCFs(peerId)); adminExt.removePeerTableCFs(peerId, adminExt.getPeerTableCFs(peerId));
admin1.disableTableReplication(TestReplicationBase.tableName); admin1.disableTableReplication(TestReplicationBase.tableName);
rpc = admin1.getReplicationPeerConfig(peerId);
rpc.setReplicateAllUserTables(true);
admin1.updateReplicationPeerConfig(peerId, rpc);
} }
} }

View File

@ -524,10 +524,12 @@ public class TestMasterReplication {
private void addPeer(String id, int masterClusterNumber, int slaveClusterNumber, String tableCfs) private void addPeer(String id, int masterClusterNumber, int slaveClusterNumber, String tableCfs)
throws Exception { throws Exception {
try (Admin admin = ConnectionFactory.createConnection(configurations[masterClusterNumber]) try (Admin admin =
.getAdmin()) { ConnectionFactory.createConnection(configurations[masterClusterNumber]).getAdmin()) {
admin.addReplicationPeer(id, admin.addReplicationPeer(
id,
new ReplicationPeerConfig().setClusterKey(utilities[slaveClusterNumber].getClusterKey()) new ReplicationPeerConfig().setClusterKey(utilities[slaveClusterNumber].getClusterKey())
.setReplicateAllUserTables(false)
.setTableCFsMap(ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableCfs))); .setTableCFsMap(ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableCfs)));
} }
} }

View File

@ -140,8 +140,12 @@ public class TestNamespaceReplication extends TestReplicationBase {
Table htab1B = connection1.getTable(tabBName); Table htab1B = connection1.getTable(tabBName);
Table htab2B = connection2.getTable(tabBName); Table htab2B = connection2.getTable(tabBName);
// add ns1 to peer config which replicate to cluster2
ReplicationPeerConfig rpc = admin.getPeerConfig("2"); ReplicationPeerConfig rpc = admin.getPeerConfig("2");
rpc.setReplicateAllUserTables(false);
admin.updatePeerConfig("2", rpc);
// add ns1 to peer config which replicate to cluster2
rpc = admin.getPeerConfig("2");
Set<String> namespaces = new HashSet<>(); Set<String> namespaces = new HashSet<>();
namespaces.add(ns1); namespaces.add(ns1);
rpc.setNamespaces(namespaces); rpc.setNamespaces(namespaces);

View File

@ -404,6 +404,7 @@ public class TestPerTableCFReplication {
// A. add cluster2/cluster3 as peers to cluster1 // A. add cluster2/cluster3 as peers to cluster1
ReplicationPeerConfig rpc2 = new ReplicationPeerConfig(); ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
rpc2.setClusterKey(utility2.getClusterKey()); rpc2.setClusterKey(utility2.getClusterKey());
rpc2.setReplicateAllUserTables(false);
Map<TableName, List<String>> tableCFs = new HashMap<>(); Map<TableName, List<String>> tableCFs = new HashMap<>();
tableCFs.put(tabCName, null); tableCFs.put(tabCName, null);
tableCFs.put(tabBName, new ArrayList<>()); tableCFs.put(tabBName, new ArrayList<>());
@ -413,6 +414,7 @@ public class TestPerTableCFReplication {
ReplicationPeerConfig rpc3 = new ReplicationPeerConfig(); ReplicationPeerConfig rpc3 = new ReplicationPeerConfig();
rpc3.setClusterKey(utility3.getClusterKey()); rpc3.setClusterKey(utility3.getClusterKey());
rpc3.setReplicateAllUserTables(false);
tableCFs.clear(); tableCFs.clear();
tableCFs.put(tabAName, null); tableCFs.put(tabAName, null);
tableCFs.put(tabBName, new ArrayList<>()); tableCFs.put(tabBName, new ArrayList<>());
@ -518,7 +520,7 @@ public class TestPerTableCFReplication {
connection2.close(); connection2.close();
connection3.close(); connection3.close();
} }
} }
private void ensureRowNotReplicated(byte[] row, byte[] fam, Table... tables) throws IOException { private void ensureRowNotReplicated(byte[] row, byte[] fam, Table... tables) throws IOException {
Get get = new Get(row); Get get = new Get(row);

View File

@ -202,19 +202,31 @@ public class TestReplicationWALEntryFilters {
@Test @Test
public void testNamespaceTableCfWALEntryFilter() { public void testNamespaceTableCfWALEntryFilter() {
ReplicationPeer peer = mock(ReplicationPeer.class); ReplicationPeer peer = mock(ReplicationPeer.class);
ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class);
// 1. no namespaces config and table-cfs config in peer // 1. replicate all user tables
when(peer.getNamespaces()).thenReturn(null); when(peerConfig.replicateAllUserTables()).thenReturn(true);
when(peer.getTableCFs()).thenReturn(null); when(peer.getPeerConfig()).thenReturn(peerConfig);
Entry userEntry = createEntry(null, a, b, c); Entry userEntry = createEntry(null, a, b, c);
WALEntryFilter filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); ChainWALEntryFilter filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a,b,c), filter.filter(userEntry)); assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
// 2. Only config table-cfs in peer // 2. not replicate all user tables, no namespaces and table-cfs config
when(peerConfig.replicateAllUserTables()).thenReturn(false);
when(peerConfig.getNamespaces()).thenReturn(null);
when(peerConfig.getTableCFsMap()).thenReturn(null);
when(peer.getPeerConfig()).thenReturn(peerConfig);
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(null, filter.filter(userEntry));
// 3. Only config table-cfs in peer
// empty map // empty map
userEntry = createEntry(null, a, b, c); userEntry = createEntry(null, a, b, c);
Map<TableName, List<String>> tableCfs = new HashMap<>(); Map<TableName, List<String>> tableCfs = new HashMap<>();
when(peer.getTableCFs()).thenReturn(tableCfs); when(peerConfig.replicateAllUserTables()).thenReturn(false);
when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
when(peer.getPeerConfig()).thenReturn(peerConfig);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(null, filter.filter(userEntry)); assertEquals(null, filter.filter(userEntry));
@ -222,7 +234,9 @@ public class TestReplicationWALEntryFilters {
userEntry = createEntry(null, a, b, c); userEntry = createEntry(null, a, b, c);
tableCfs = new HashMap<>(); tableCfs = new HashMap<>();
tableCfs.put(TableName.valueOf("bar"), null); tableCfs.put(TableName.valueOf("bar"), null);
when(peer.getTableCFs()).thenReturn(tableCfs); when(peerConfig.replicateAllUserTables()).thenReturn(false);
when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
when(peer.getPeerConfig()).thenReturn(peerConfig);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(null, filter.filter(userEntry)); assertEquals(null, filter.filter(userEntry));
@ -230,7 +244,9 @@ public class TestReplicationWALEntryFilters {
userEntry = createEntry(null, a, b, c); userEntry = createEntry(null, a, b, c);
tableCfs = new HashMap<>(); tableCfs = new HashMap<>();
tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a")); tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a"));
when(peer.getTableCFs()).thenReturn(tableCfs); when(peerConfig.replicateAllUserTables()).thenReturn(false);
when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
when(peer.getPeerConfig()).thenReturn(peerConfig);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a), filter.filter(userEntry)); assertEquals(createEntry(null, a), filter.filter(userEntry));
@ -238,7 +254,9 @@ public class TestReplicationWALEntryFilters {
userEntry = createEntry(null, a, b, c, d); userEntry = createEntry(null, a, b, c, d);
tableCfs = new HashMap<>(); tableCfs = new HashMap<>();
tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c")); tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
when(peer.getTableCFs()).thenReturn(tableCfs); when(peerConfig.replicateAllUserTables()).thenReturn(false);
when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
when(peer.getPeerConfig()).thenReturn(peerConfig);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a,c), filter.filter(userEntry)); assertEquals(createEntry(null, a,c), filter.filter(userEntry));
@ -246,14 +264,19 @@ public class TestReplicationWALEntryFilters {
when(peer.getTableCFs()).thenReturn(null); when(peer.getTableCFs()).thenReturn(null);
// empty set // empty set
Set<String> namespaces = new HashSet<>(); Set<String> namespaces = new HashSet<>();
when(peer.getNamespaces()).thenReturn(namespaces); when(peerConfig.replicateAllUserTables()).thenReturn(false);
when(peerConfig.getNamespaces()).thenReturn(namespaces);
when(peerConfig.getTableCFsMap()).thenReturn(null);
when(peer.getPeerConfig()).thenReturn(peerConfig);
userEntry = createEntry(null, a, b, c); userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(null, filter.filter(userEntry)); assertEquals(null, filter.filter(userEntry));
// namespace default // namespace default
namespaces.add("default"); namespaces.add("default");
when(peer.getNamespaces()).thenReturn(namespaces); when(peerConfig.replicateAllUserTables()).thenReturn(false);
when(peerConfig.getNamespaces()).thenReturn(namespaces);
when(peer.getPeerConfig()).thenReturn(peerConfig);
userEntry = createEntry(null, a, b, c); userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a,b,c), filter.filter(userEntry)); assertEquals(createEntry(null, a,b,c), filter.filter(userEntry));
@ -261,7 +284,9 @@ public class TestReplicationWALEntryFilters {
// namespace ns1 // namespace ns1
namespaces = new HashSet<>(); namespaces = new HashSet<>();
namespaces.add("ns1"); namespaces.add("ns1");
when(peer.getNamespaces()).thenReturn(namespaces); when(peerConfig.replicateAllUserTables()).thenReturn(false);
when(peerConfig.getNamespaces()).thenReturn(namespaces);
when(peer.getPeerConfig()).thenReturn(peerConfig);
userEntry = createEntry(null, a, b, c); userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(null, filter.filter(userEntry)); assertEquals(null, filter.filter(userEntry));
@ -271,9 +296,11 @@ public class TestReplicationWALEntryFilters {
namespaces = new HashSet<>(); namespaces = new HashSet<>();
tableCfs = new HashMap<>(); tableCfs = new HashMap<>();
namespaces.add("ns1"); namespaces.add("ns1");
when(peer.getNamespaces()).thenReturn(namespaces);
tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c")); tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
when(peer.getTableCFs()).thenReturn(tableCfs); when(peerConfig.replicateAllUserTables()).thenReturn(false);
when(peerConfig.getNamespaces()).thenReturn(namespaces);
when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
when(peer.getPeerConfig()).thenReturn(peerConfig);
userEntry = createEntry(null, a, b, c); userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a, c), filter.filter(userEntry)); assertEquals(createEntry(null, a, c), filter.filter(userEntry));
@ -281,9 +308,11 @@ public class TestReplicationWALEntryFilters {
namespaces = new HashSet<>(); namespaces = new HashSet<>();
tableCfs = new HashMap<>(); tableCfs = new HashMap<>();
namespaces.add("default"); namespaces.add("default");
when(peer.getNamespaces()).thenReturn(namespaces);
tableCfs.put(TableName.valueOf("ns1:foo"), Lists.newArrayList("a", "c")); tableCfs.put(TableName.valueOf("ns1:foo"), Lists.newArrayList("a", "c"));
when(peer.getTableCFs()).thenReturn(tableCfs); when(peerConfig.replicateAllUserTables()).thenReturn(false);
when(peerConfig.getNamespaces()).thenReturn(namespaces);
when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
when(peer.getPeerConfig()).thenReturn(peerConfig);
userEntry = createEntry(null, a, b, c); userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a, b, c), filter.filter(userEntry)); assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
@ -291,9 +320,11 @@ public class TestReplicationWALEntryFilters {
namespaces = new HashSet<>(); namespaces = new HashSet<>();
tableCfs = new HashMap<>(); tableCfs = new HashMap<>();
namespaces.add("ns1"); namespaces.add("ns1");
when(peer.getNamespaces()).thenReturn(namespaces);
tableCfs.put(TableName.valueOf("bar"), null); tableCfs.put(TableName.valueOf("bar"), null);
when(peer.getTableCFs()).thenReturn(tableCfs); when(peerConfig.replicateAllUserTables()).thenReturn(false);
when(peerConfig.getNamespaces()).thenReturn(namespaces);
when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
when(peer.getPeerConfig()).thenReturn(peerConfig);
userEntry = createEntry(null, a, b, c); userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer)); filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(null, filter.filter(userEntry)); assertEquals(null, filter.filter(userEntry));

View File

@ -50,7 +50,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@Category({ReplicationTests.class, SmallTests.class}) @Category({ReplicationTests.class, SmallTests.class})
public class TestTableCFsUpdater extends TableCFsUpdater { public class TestTableCFsUpdater extends ReplicationPeerConfigUpgrader {
private static final Log LOG = LogFactory.getLog(TestTableCFsUpdater.class); private static final Log LOG = LogFactory.getLog(TestTableCFsUpdater.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@ -164,7 +164,7 @@ public class TestTableCFsUpdater extends TableCFsUpdater {
assertNull(actualRpc.getTableCFsMap()); assertNull(actualRpc.getTableCFsMap());
assertNull(actualTableCfs); assertNull(actualTableCfs);
update(); copyTableCFs();
peerId = "1"; peerId = "1";
peerNode = getPeerNode(peerId); peerNode = getPeerNode(peerId);

View File

@ -92,6 +92,7 @@ module Hbase
namespaces.each do |n| namespaces.each do |n|
ns_set.add(n) ns_set.add(n)
end end
replication_peer_config.setReplicateAllUserTables(false)
replication_peer_config.set_namespaces(ns_set) replication_peer_config.set_namespaces(ns_set)
end end
@ -101,6 +102,7 @@ module Hbase
table_cfs.each do |key, val| table_cfs.each do |key, val|
map.put(org.apache.hadoop.hbase.TableName.valueOf(key), val) map.put(org.apache.hadoop.hbase.TableName.valueOf(key), val)
end end
replication_peer_config.setReplicateAllUserTables(false)
replication_peer_config.set_table_cfs_map(map) replication_peer_config.set_table_cfs_map(map)
end end
@ -265,6 +267,13 @@ module Hbase
end end
end end
# Set the replicate_all flag of the given replication peer.
# No-op when the peer id does not resolve to a peer config.
def set_peer_replicate_all(id, replicate_all)
  peer_config = @replication_admin.getPeerConfig(id)
  return if peer_config.nil?
  peer_config.setReplicateAllUserTables(replicate_all)
  @replication_admin.updatePeerConfig(id, peer_config)
end
#---------------------------------------------------------------------------------------------- #----------------------------------------------------------------------------------------------
# Enables a table's replication switch # Enables a table's replication switch
def enable_tablerep(table_name) def enable_tablerep(table_name)

View File

@ -377,6 +377,7 @@ Shell.load_command_group(
list_peers list_peers
enable_peer enable_peer
disable_peer disable_peer
set_peer_replicate_all
set_peer_namespaces set_peer_namespaces
append_peer_namespaces append_peer_namespaces
remove_peer_namespaces remove_peer_namespaces

View File

@ -33,7 +33,7 @@ EOF
peers = replication_admin.list_peers peers = replication_admin.list_peers
formatter.header(%w[PEER_ID CLUSTER_KEY ENDPOINT_CLASSNAME formatter.header(%w[PEER_ID CLUSTER_KEY ENDPOINT_CLASSNAME
STATE NAMESPACES TABLE_CFS BANDWIDTH]) STATE REPLICATE_ALL NAMESPACES TABLE_CFS BANDWIDTH])
peers.each do |peer| peers.each do |peer|
id = peer.getPeerId id = peer.getPeerId
@ -42,7 +42,8 @@ EOF
namespaces = replication_admin.show_peer_namespaces(config) namespaces = replication_admin.show_peer_namespaces(config)
tableCFs = replication_admin.show_peer_tableCFs(id) tableCFs = replication_admin.show_peer_tableCFs(id)
formatter.row([id, config.getClusterKey, formatter.row([id, config.getClusterKey,
config.getReplicationEndpointImpl, state, namespaces, tableCFs, config.getReplicationEndpointImpl, state,
config.replicateAllUserTables, namespaces, tableCFs,
config.getBandwidth]) config.getBandwidth])
end end

View File

@ -0,0 +1,54 @@
#
# Copyright The Apache Software Foundation
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
module Shell
  module Commands
    # Shell command that toggles a replication peer's replicate_all flag.
    # Delegates to replication_admin.set_peer_replicate_all(id, flag).
    class SetPeerReplicateAll < Command
      def help
        <<-EOF
Set the replicate_all flag to true or false for the specified peer.

If replicate_all flag is true, then all user tables (REPLICATION_SCOPE != 0)
will be replicated to the peer cluster.

If replicate_all flag is false, then no user tables will be replicated to the
peer cluster by default. Then you can use 'set_peer_namespaces' or
'append_peer_namespaces' to set which namespaces will be replicated to the
peer cluster. And you can use 'set_peer_tableCFs' or 'append_peer_tableCFs'
to set which tables will be replicated to the peer cluster.

Notice: When you want to change a peer's replicate_all flag from false to true,
you need to clean the peer's NAMESPACES and TABLECFS config first.

Examples:

  # set replicate_all flag to true
  hbase> set_peer_replicate_all '1', true
  # set replicate_all flag to false
  hbase> set_peer_replicate_all '1', false

EOF
      end

      def command(id, replicate_all)
        replication_admin.set_peer_replicate_all(id, replicate_all)
      end
    end
  end
end

View File

@ -73,8 +73,10 @@ module Hbase
command(:add_peer, @peer_id, {CLUSTER_KEY => cluster_key}) command(:add_peer, @peer_id, {CLUSTER_KEY => cluster_key})
assert_equal(1, command(:list_peers).length) assert_equal(1, command(:list_peers).length)
assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) peer = command(:list_peers).get(0)
assert_equal(cluster_key, command(:list_peers).get(0).getPeerConfig.getClusterKey) assert_equal(@peer_id, peer.getPeerId)
assert_equal(cluster_key, peer.getPeerConfig.getClusterKey)
assert_equal(true, peer.getPeerConfig.replicateAllUserTables)
# cleanup for future tests # cleanup for future tests
command(:remove_peer, @peer_id) command(:remove_peer, @peer_id)
@ -86,8 +88,10 @@ module Hbase
command(:add_peer, @peer_id, {CLUSTER_KEY => cluster_key}) command(:add_peer, @peer_id, {CLUSTER_KEY => cluster_key})
assert_equal(1, command(:list_peers).length) assert_equal(1, command(:list_peers).length)
assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) peer = command(:list_peers).get(0)
assert_equal(cluster_key, command(:list_peers).get(0).getPeerConfig.getClusterKey) assert_equal(@peer_id, peer.getPeerId)
assert_equal(cluster_key, peer.getPeerConfig.getClusterKey)
assert_equal(true, peer.getPeerConfig.replicateAllUserTables)
# cleanup for future tests # cleanup for future tests
command(:remove_peer, @peer_id) command(:remove_peer, @peer_id)
@ -131,8 +135,10 @@ module Hbase
command(:add_peer, @peer_id, args) command(:add_peer, @peer_id, args)
assert_equal(1, command(:list_peers).length) assert_equal(1, command(:list_peers).length)
assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) peer = command(:list_peers).get(0)
assert_equal(cluster_key, command(:list_peers).get(0).getPeerConfig.getClusterKey) assert_equal(@peer_id, peer.getPeerId)
assert_equal(cluster_key, peer.getPeerConfig.getClusterKey)
assert_equal(true, peer.getPeerConfig.replicateAllUserTables)
# cleanup for future tests # cleanup for future tests
command(:remove_peer, @peer_id) command(:remove_peer, @peer_id)
@ -147,11 +153,13 @@ module Hbase
command(:add_peer, @peer_id, args) command(:add_peer, @peer_id, args)
assert_equal(1, command(:list_peers).length) assert_equal(1, command(:list_peers).length)
assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) peer = command(:list_peers).get(0)
peer_config = command(:list_peers).get(0).getPeerConfig assert_equal(@peer_id, peer.getPeerId)
peer_config = peer.getPeerConfig
assert_equal(false, peer_config.replicateAllUserTables)
assert_equal(cluster_key, peer_config.get_cluster_key) assert_equal(cluster_key, peer_config.get_cluster_key)
assert_equal(namespaces_str, assert_equal(namespaces_str,
replication_admin.show_peer_namespaces(peer_config)) replication_admin.show_peer_namespaces(peer_config))
# cleanup for future tests # cleanup for future tests
command(:remove_peer, @peer_id) command(:remove_peer, @peer_id)
@ -169,8 +177,10 @@ module Hbase
command(:add_peer, @peer_id, args) command(:add_peer, @peer_id, args)
assert_equal(1, command(:list_peers).length) assert_equal(1, command(:list_peers).length)
assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) peer = command(:list_peers).get(0)
peer_config = command(:list_peers).get(0).getPeerConfig assert_equal(@peer_id, peer.getPeerId)
peer_config = peer.getPeerConfig
assert_equal(false, peer_config.replicateAllUserTables)
assert_equal(cluster_key, peer_config.get_cluster_key) assert_equal(cluster_key, peer_config.get_cluster_key)
assert_equal(namespaces_str, assert_equal(namespaces_str,
replication_admin.show_peer_namespaces(peer_config)) replication_admin.show_peer_namespaces(peer_config))
@ -203,9 +213,11 @@ module Hbase
command(:add_peer, @peer_id, args) command(:add_peer, @peer_id, args)
assert_equal(1, command(:list_peers).length) assert_equal(1, command(:list_peers).length)
assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) peer = command(:list_peers).get(0)
assert_equal(cluster_key, command(:list_peers).get(0).getPeerConfig.getClusterKey) assert_equal(@peer_id, peer.getPeerId)
assert_tablecfs_equal(table_cfs, command(:get_peer_config, @peer_id).getTableCFsMap()) assert_equal(cluster_key, peer.getPeerConfig.getClusterKey)
assert_tablecfs_equal(table_cfs, peer.getPeerConfig.getTableCFsMap)
assert_equal(false, peer.getPeerConfig.replicateAllUserTables)
# cleanup for future tests # cleanup for future tests
command(:remove_peer, @peer_id) command(:remove_peer, @peer_id)
@ -225,10 +237,12 @@ module Hbase
cluster_key = "zk4,zk5,zk6:11000:/hbase-test" cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
args = { CLUSTER_KEY => cluster_key} args = { CLUSTER_KEY => cluster_key}
command(:add_peer, @peer_id, args) command(:add_peer, @peer_id, args)
command(:set_peer_replicate_all, @peer_id, false)
assert_equal(1, command(:list_peers).length) assert_equal(1, command(:list_peers).length)
assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) peer = command(:list_peers).get(0)
assert_equal(cluster_key, command(:list_peers).get(0).getPeerConfig.getClusterKey) assert_equal(@peer_id, peer.getPeerId)
assert_equal(cluster_key, peer.getPeerConfig.getClusterKey)
table_cfs = { "table1" => [], "table2" => ["cf1"], "ns3:table3" => ["cf1", "cf2"] } table_cfs = { "table1" => [], "table2" => ["cf1"], "ns3:table3" => ["cf1", "cf2"] }
command(:set_peer_tableCFs, @peer_id, table_cfs) command(:set_peer_tableCFs, @peer_id, table_cfs)
@ -242,10 +256,12 @@ module Hbase
cluster_key = "zk4,zk5,zk6:11000:/hbase-test" cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
args = { CLUSTER_KEY => cluster_key} args = { CLUSTER_KEY => cluster_key}
command(:add_peer, @peer_id, args) command(:add_peer, @peer_id, args)
command(:set_peer_replicate_all, @peer_id, false)
assert_equal(1, command(:list_peers).length) assert_equal(1, command(:list_peers).length)
assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) peer = command(:list_peers).get(0)
assert_equal(cluster_key, command(:list_peers).get(0).getPeerConfig.getClusterKey) assert_equal(@peer_id, peer.getPeerId)
assert_equal(cluster_key, peer.getPeerConfig.getClusterKey)
table_cfs = { "table1" => [], "ns2:table2" => ["cf1"] } table_cfs = { "table1" => [], "ns2:table2" => ["cf1"] }
command(:append_peer_tableCFs, @peer_id, table_cfs) command(:append_peer_tableCFs, @peer_id, table_cfs)
@ -266,8 +282,9 @@ module Hbase
command(:add_peer, @peer_id, args) command(:add_peer, @peer_id, args)
assert_equal(1, command(:list_peers).length) assert_equal(1, command(:list_peers).length)
assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) peer = command(:list_peers).get(0)
assert_equal(cluster_key, command(:list_peers).get(0).getPeerConfig.getClusterKey) assert_equal(@peer_id, peer.getPeerId)
assert_equal(cluster_key, peer.getPeerConfig.getClusterKey)
table_cfs = { "table1" => [], "ns2:table2" => ["cf1"] } table_cfs = { "table1" => [], "ns2:table2" => ["cf1"] }
command(:remove_peer_tableCFs, @peer_id, { "ns3:table3" => ["cf1", "cf2"] }) command(:remove_peer_tableCFs, @peer_id, { "ns3:table3" => ["cf1", "cf2"] })
@ -284,6 +301,7 @@ module Hbase
args = { CLUSTER_KEY => cluster_key } args = { CLUSTER_KEY => cluster_key }
command(:add_peer, @peer_id, args) command(:add_peer, @peer_id, args)
command(:set_peer_replicate_all, @peer_id, false)
command(:set_peer_namespaces, @peer_id, namespaces) command(:set_peer_namespaces, @peer_id, namespaces)
@ -291,7 +309,7 @@ module Hbase
assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
peer_config = command(:list_peers).get(0).getPeerConfig peer_config = command(:list_peers).get(0).getPeerConfig
assert_equal(namespaces_str, assert_equal(namespaces_str,
replication_admin.show_peer_namespaces(peer_config)) replication_admin.show_peer_namespaces(peer_config))
# cleanup for future tests # cleanup for future tests
command(:remove_peer, @peer_id) command(:remove_peer, @peer_id)
@ -304,6 +322,7 @@ module Hbase
args = { CLUSTER_KEY => cluster_key } args = { CLUSTER_KEY => cluster_key }
command(:add_peer, @peer_id, args) command(:add_peer, @peer_id, args)
command(:set_peer_replicate_all, @peer_id, false)
command(:append_peer_namespaces, @peer_id, namespaces) command(:append_peer_namespaces, @peer_id, namespaces)
@ -311,7 +330,7 @@ module Hbase
assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
peer_config = command(:list_peers).get(0).getPeerConfig peer_config = command(:list_peers).get(0).getPeerConfig
assert_equal(namespaces_str, assert_equal(namespaces_str,
replication_admin.show_peer_namespaces(peer_config)) replication_admin.show_peer_namespaces(peer_config))
namespaces = ["ns3"] namespaces = ["ns3"]
namespaces_str = "ns1;ns2;ns3" namespaces_str = "ns1;ns2;ns3"
@ -321,7 +340,7 @@ module Hbase
assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
peer_config = command(:list_peers).get(0).getPeerConfig peer_config = command(:list_peers).get(0).getPeerConfig
assert_equal(namespaces_str, assert_equal(namespaces_str,
replication_admin.show_peer_namespaces(peer_config)) replication_admin.show_peer_namespaces(peer_config))
# append a namespace which is already in the peer config # append a namespace which is already in the peer config
command(:append_peer_namespaces, @peer_id, namespaces) command(:append_peer_namespaces, @peer_id, namespaces)
@ -330,7 +349,7 @@ module Hbase
assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
peer_config = command(:list_peers).get(0).getPeerConfig peer_config = command(:list_peers).get(0).getPeerConfig
assert_equal(namespaces_str, assert_equal(namespaces_str,
replication_admin.show_peer_namespaces(peer_config)) replication_admin.show_peer_namespaces(peer_config))
# cleanup for future tests # cleanup for future tests
command(:remove_peer, @peer_id) command(:remove_peer, @peer_id)
@ -351,7 +370,7 @@ module Hbase
assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
peer_config = command(:list_peers).get(0).getPeerConfig peer_config = command(:list_peers).get(0).getPeerConfig
assert_equal(namespaces_str, assert_equal(namespaces_str,
replication_admin.show_peer_namespaces(peer_config)) replication_admin.show_peer_namespaces(peer_config))
namespaces = ["ns3"] namespaces = ["ns3"]
namespaces_str = nil namespaces_str = nil
@ -361,7 +380,7 @@ module Hbase
assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
peer_config = command(:list_peers).get(0).getPeerConfig peer_config = command(:list_peers).get(0).getPeerConfig
assert_equal(namespaces_str, assert_equal(namespaces_str,
replication_admin.show_peer_namespaces(peer_config)) replication_admin.show_peer_namespaces(peer_config))
# remove a namespace which is not in peer config # remove a namespace which is not in peer config
command(:remove_peer_namespaces, @peer_id, namespaces) command(:remove_peer_namespaces, @peer_id, namespaces)
@ -370,12 +389,34 @@ module Hbase
assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
peer_config = command(:list_peers).get(0).getPeerConfig peer_config = command(:list_peers).get(0).getPeerConfig
assert_equal(namespaces_str, assert_equal(namespaces_str,
replication_admin.show_peer_namespaces(peer_config)) replication_admin.show_peer_namespaces(peer_config))
# cleanup for future tests # cleanup for future tests
command(:remove_peer, @peer_id) command(:remove_peer, @peer_id)
end end
# Verifies the default value of replicate_all and that the shell command
# can flip it in both directions.
define_test 'set_peer_replicate_all' do
  cluster_key = 'zk4,zk5,zk6:11000:/hbase-test'
  args = { CLUSTER_KEY => cluster_key }
  command(:add_peer, @peer_id, args)
  assert_equal(1, command(:list_peers).length)

  # A freshly added peer defaults to replicate_all = true.
  peer_config = command(:list_peers).get(0).getPeerConfig
  assert_equal(true, peer_config.replicateAllUserTables)

  # The flag can be turned off ...
  command(:set_peer_replicate_all, @peer_id, false)
  peer_config = command(:list_peers).get(0).getPeerConfig
  assert_equal(false, peer_config.replicateAllUserTables)

  # ... and turned back on.
  command(:set_peer_replicate_all, @peer_id, true)
  peer_config = command(:list_peers).get(0).getPeerConfig
  assert_equal(true, peer_config.replicateAllUserTables)

  # cleanup for future tests (use the shell command, consistent with the
  # other tests in this file)
  command(:remove_peer, @peer_id)
end
define_test "set_peer_bandwidth: works with peer bandwidth upper limit" do define_test "set_peer_bandwidth: works with peer bandwidth upper limit" do
cluster_key = "localhost:2181:/hbase-test" cluster_key = "localhost:2181:/hbase-test"
args = { CLUSTER_KEY => cluster_key } args = { CLUSTER_KEY => cluster_key }