diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java index 4028d87f636..39c802f8df3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java @@ -22,10 +22,14 @@ import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; + import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -47,6 +51,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; /** *

@@ -169,6 +174,55 @@ public class ReplicationAdmin implements Closeable { this.replicationPeers.addPeer(id, peerConfig, getTableCfsStr(tableCfs)); } + public static Map> parseTableCFsFromConfig(String tableCFsConfig) { + if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) { + return null; + } + + Map> tableCFsMap = null; + // TODO: This should be a PB object rather than a String to be parsed!! See HBASE-11393 + // parse out (table, cf-list) pairs from tableCFsConfig + // format: "table1:cf1,cf2;table2:cfA,cfB" + String[] tables = tableCFsConfig.split(";"); + for (String tab : tables) { + // 1 ignore empty table config + tab = tab.trim(); + if (tab.length() == 0) { + continue; + } + // 2 split to "table" and "cf1,cf2" + // for each table: "table:cf1,cf2" or "table" + String[] pair = tab.split(":"); + String tabName = pair[0].trim(); + if (pair.length > 2 || tabName.length() == 0) { + LOG.error("ignore invalid tableCFs setting: " + tab); + continue; + } + + // 3 parse "cf1,cf2" part to List + List cfs = null; + if (pair.length == 2) { + String[] cfsList = pair[1].split(","); + for (String cf : cfsList) { + String cfName = cf.trim(); + if (cfName.length() > 0) { + if (cfs == null) { + cfs = new ArrayList(); + } + cfs.add(cfName); + } + } + } + + // 4 put > to map + if (tableCFsMap == null) { + tableCFsMap = new HashMap>(); + } + tableCFsMap.put(TableName.valueOf(tabName), cfs); + } + return tableCFsMap; + } + @VisibleForTesting static String getTableCfsStr(Map> tableCfs) { String tableCfsStr = null; @@ -264,6 +318,111 @@ public class ReplicationAdmin implements Closeable { this.replicationPeers.setPeerTableCFsConfig(id, tableCFs); } + /** + * Append the replicable table-cf config of the specified peer + * @param id a short that identifies the cluster + * @param tableCfs table-cfs config str + * @throws KeeperException + */ + public void appendPeerTableCFs(String id, String tableCfs) throws ReplicationException { + appendPeerTableCFs(id, parseTableCFsFromConfig(tableCfs)); + } + + /** + * Append the replicable table-cf config of the specified peer + * @param id a short that identifies the cluster + * @param tableCfs A map from tableName to column family names + * @throws KeeperException + */ + public void appendPeerTableCFs(String id, Map> tableCfs) + throws ReplicationException { + if (tableCfs == null) { + throw new ReplicationException("tableCfs is null"); + } + Map> preTableCfs = parseTableCFsFromConfig(getPeerTableCFs(id)); + if (preTableCfs == null) { + setPeerTableCFs(id, tableCfs); + return; + } + + for (Map.Entry> entry : tableCfs.entrySet()) { + TableName table = entry.getKey(); + Collection appendCfs = entry.getValue(); + if (preTableCfs.containsKey(table)) { + List cfs = preTableCfs.get(table); + if (cfs == null || appendCfs == null) { + preTableCfs.put(table, null); + } else { + Set cfSet = new HashSet(cfs); + cfSet.addAll(appendCfs); + preTableCfs.put(table, Lists.newArrayList(cfSet)); + } + } else { + if (appendCfs == null || appendCfs.isEmpty()) { + preTableCfs.put(table, null); + } else { + preTableCfs.put(table, Lists.newArrayList(appendCfs)); + } + } + } + setPeerTableCFs(id, preTableCfs); + } + + /** + * Remove some table-cfs from table-cfs config of the specified peer + * @param id a short name that identifies the cluster + * @param tableCf table-cfs config str + * @throws ReplicationException + */ + public void removePeerTableCFs(String id, String tableCf) throws ReplicationException { + removePeerTableCFs(id, parseTableCFsFromConfig(tableCf)); + } + + /** + * Remove some table-cfs from config of the specified peer + * @param id a short name that identifies the cluster + * @param tableCfs A map from tableName to column family names + * @throws ReplicationException + */ + public void removePeerTableCFs(String id, Map> tableCfs) + throws ReplicationException { + if (tableCfs == null) { + throw new ReplicationException("tableCfs is null"); + } + + Map> preTableCfs = parseTableCFsFromConfig(getPeerTableCFs(id)); + if (preTableCfs == null) { + throw new ReplicationException("Table-Cfs for peer" + id + " is null"); + } + for (Map.Entry> entry: tableCfs.entrySet()) { + TableName table = entry.getKey(); + Collection removeCfs = entry.getValue(); + if (preTableCfs.containsKey(table)) { + List cfs = preTableCfs.get(table); + if (cfs == null && removeCfs == null) { + preTableCfs.remove(table); + } else if (cfs != null && removeCfs != null) { + Set cfSet = new HashSet(cfs); + cfSet.removeAll(removeCfs); + if (cfSet.isEmpty()) { + preTableCfs.remove(table); + } else { + preTableCfs.put(table, Lists.newArrayList(cfSet)); + } + } else if (cfs == null && removeCfs != null) { + throw new ReplicationException("Cannot remove cf of table: " + table + + " which doesn't specify cfs from table-cfs config in peer: " + id); + } else if (cfs != null && removeCfs == null) { + throw new ReplicationException("Cannot remove table: " + table + + " which has specified cfs from table-cfs config in peer: " + id); + } + } else { + throw new ReplicationException("No table: " + table + " in table-cfs config of peer: " + id); + } + } + setPeerTableCFs(id, preTableCfs); + } + /** * Set the replicable table-cf config of the specified peer * @param id a short name that identifies the cluster diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index 2a05f973713..f0d8e868828 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -1484,8 +1484,8 @@ public final class ProtobufUtil { /** * Convert a protocol buffer DeleteType to delete KeyValue type. * - * @param protocol buffer DeleteType - * @return type + * @param type The DeleteType + * @return The type. * @throws IOException */ public static KeyValue.Type fromDeleteType( diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java index c116674788f..729d2c73e49 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java @@ -23,6 +23,7 @@ import java.util.Map; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.TableName; /** * ReplicationPeer manages enabled / disabled state for the peer. @@ -67,6 +68,6 @@ public interface ReplicationPeer { * Get replicable (table, cf-list) map of this peer * @return the replicable (table, cf-list) map */ - public Map> getTableCFs(); + public Map> getTableCFs(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java index a39392ce997..0eaa744c72d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.replication; import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -30,6 +31,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; @@ -49,7 +52,7 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea private final ReplicationPeerConfig peerConfig; private final String id; private volatile PeerState peerState; - private volatile Map> tableCFs = new HashMap>(); + private volatile Map> tableCFs = new HashMap>(); private final Configuration conf; private PeerStateTracker peerStateTracker; @@ -110,59 +113,9 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea this.readTableCFsZnode(); } - static Map> parseTableCFsFromConfig(String tableCFsConfig) { - if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) { - return null; - } - - Map> tableCFsMap = null; - // TODO: This should be a PB object rather than a String to be parsed!! See HBASE-11393 - // parse out (table, cf-list) pairs from tableCFsConfig - // format: "table1:cf1,cf2;table2:cfA,cfB" - String[] tables = tableCFsConfig.split(";"); - for (String tab : tables) { - // 1 ignore empty table config - tab = tab.trim(); - if (tab.length() == 0) { - continue; - } - // 2 split to "table" and "cf1,cf2" - // for each table: "table:cf1,cf2" or "table" - String[] pair = tab.split(":"); - String tabName = pair[0].trim(); - if (pair.length > 2 || tabName.length() == 0) { - LOG.error("ignore invalid tableCFs setting: " + tab); - continue; - } - - // 3 parse "cf1,cf2" part to List - List cfs = null; - if (pair.length == 2) { - String[] cfsList = pair[1].split(","); - for (String cf : cfsList) { - String cfName = cf.trim(); - if (cfName.length() > 0) { - if (cfs == null) { - cfs = new ArrayList(); - } - cfs.add(cfName); - } - } - } - - // 4 put > to map - if (tableCFsMap == null) { - tableCFsMap = new HashMap>(); - } - tableCFsMap.put(tabName, cfs); - } - - return tableCFsMap; - } - private void readTableCFsZnode() { String currentTableCFs = Bytes.toString(tableCFsTracker.getData(false)); - this.tableCFs = parseTableCFsFromConfig(currentTableCFs); + this.tableCFs = ReplicationAdmin.parseTableCFsFromConfig(currentTableCFs); } @Override @@ -202,7 +155,7 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea * @return the replicable (table, cf-list) map */ @Override - public Map> getTableCFs() { + public Map> getTableCFs() { return this.tableCFs; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java index b1c3b4967b7..da54c5441c7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java @@ -24,6 +24,7 @@ import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.util.Pair; /** @@ -94,7 +95,7 @@ public interface ReplicationPeers { * @param peerId a short that identifies the cluster * @return the table and column-family list which will be replicated for this peer */ - public Map> getTableCFs(String peerId); + public Map> getTableCFs(String peerId); /** * Returns the ReplicationPeer diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java index af028fbaad4..a18d8e8478d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java @@ -32,6 +32,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.CompoundConfiguration; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair; @@ -43,6 +44,7 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; + import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; @@ -188,7 +190,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re } @Override - public Map> getTableCFs(String id) throws IllegalArgumentException { + public Map> getTableCFs(String id) throws IllegalArgumentException { ReplicationPeer replicationPeer = this.peerClusters.get(id); if (replicationPeer == null) { throw new IllegalArgumentException("Peer with id= " + id + " is not connected"); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java index 44e3c1c3144..0ea267dc7e3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java @@ -25,6 +25,7 @@ import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry; import org.apache.hadoop.hbase.util.Bytes; @@ -39,9 +40,9 @@ public class TableCfWALEntryFilter implements WALEntryFilter { @Override public Entry filter(Entry entry) { - String tabName = entry.getKey().getTablename().getNameAsString(); + TableName tabName = entry.getKey().getTablename(); ArrayList cells = entry.getEdit().getCells(); - Map> tableCFs = null; + Map> tableCFs = null; try { tableCFs = this.peer.getTableCFs(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java index 77bc64e10c5..fcebfc58bd7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java @@ -25,6 +25,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.replication.ReplicationException; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -154,5 +155,58 @@ public class TestReplicationAdmin { assertEquals("tab1;tab2:cf1;tab3:cf1,cf3", ReplicationAdmin.getTableCfsStr(tabCFsMap)); } -} + @Test + public void testAppendPeerTableCFs() throws Exception { + // Add a valid peer + admin.addPeer(ID_ONE, KEY_ONE); + admin.appendPeerTableCFs(ID_ONE, "t1"); + assertEquals("t1", admin.getPeerTableCFs(ID_ONE)); + + // append table t2 to replication + admin.appendPeerTableCFs(ID_ONE, "t2"); + assertEquals("t2;t1", admin.getPeerTableCFs(ID_ONE)); + + // append table column family: f1 of t3 to replication + admin.appendPeerTableCFs(ID_ONE, "t3:f1"); + assertEquals("t3:f1;t2;t1", admin.getPeerTableCFs(ID_ONE)); + admin.removePeer(ID_ONE); + } + + @Test + public void testRemovePeerTableCFs() throws Exception { + // Add a valid peer + admin.addPeer(ID_ONE, KEY_ONE); + try { + admin.removePeerTableCFs(ID_ONE, "t3"); + assertTrue(false); + } catch (ReplicationException e) { + } + assertEquals("", admin.getPeerTableCFs(ID_ONE)); + + admin.setPeerTableCFs(ID_ONE, "t1;t2:cf1"); + try { + admin.removePeerTableCFs(ID_ONE, "t3"); + assertTrue(false); + } catch (ReplicationException e) { + } + assertEquals("t1;t2:cf1", admin.getPeerTableCFs(ID_ONE)); + + try { + admin.removePeerTableCFs(ID_ONE, "t1:f1"); + assertTrue(false); + } catch (ReplicationException e) { + } + admin.removePeerTableCFs(ID_ONE, "t1"); + assertEquals("t2:cf1", admin.getPeerTableCFs(ID_ONE)); + + try { + admin.removePeerTableCFs(ID_ONE, "t2"); + assertTrue(false); + } catch (ReplicationException e) { + } + admin.removePeerTableCFs(ID_ONE, "t2:cf1"); + assertEquals("", admin.getPeerTableCFs(ID_ONE)); + admin.removePeer(ID_ONE); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java index bc3c38fd53c..6d8b68cea9d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java @@ -173,89 +173,93 @@ public class TestPerTableCFReplication { @Test public void testParseTableCFsFromConfig() { - Map> tabCFsMap = null; + Map> tabCFsMap = null; // 1. null or empty string, result should be null - tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig(null); + tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig(null); assertEquals(null, tabCFsMap); - tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig(""); + tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig(""); assertEquals(null, tabCFsMap); - tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig(" "); + tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig(" "); assertEquals(null, tabCFsMap); + TableName tab1 = TableName.valueOf("tab1"); + TableName tab2 = TableName.valueOf("tab2"); + TableName tab3 = TableName.valueOf("tab3"); + // 2. single table: "tab1" / "tab2:cf1" / "tab3:cf1,cf3" - tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig("tab1"); + tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig("tab1"); assertEquals(1, tabCFsMap.size()); // only one table - assertTrue(tabCFsMap.containsKey("tab1")); // its table name is "tab1" - assertFalse(tabCFsMap.containsKey("tab2")); // not other table - assertEquals(null, tabCFsMap.get("tab1")); // null cf-list, + assertTrue(tabCFsMap.containsKey(tab1)); // its table name is "tab1" + assertFalse(tabCFsMap.containsKey(tab2)); // not other table + assertEquals(null, tabCFsMap.get(tab1)); // null cf-list, - tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig("tab2:cf1"); + tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig("tab2:cf1"); assertEquals(1, tabCFsMap.size()); // only one table - assertTrue(tabCFsMap.containsKey("tab2")); // its table name is "tab2" - assertFalse(tabCFsMap.containsKey("tab1")); // not other table - assertEquals(1, tabCFsMap.get("tab2").size()); // cf-list contains only 1 cf - assertEquals("cf1", tabCFsMap.get("tab2").get(0));// the only cf is "cf1" + assertTrue(tabCFsMap.containsKey(tab2)); // its table name is "tab2" + assertFalse(tabCFsMap.containsKey(tab1)); // not other table + assertEquals(1, tabCFsMap.get(tab2).size()); // cf-list contains only 1 cf + assertEquals("cf1", tabCFsMap.get(tab2).get(0));// the only cf is "cf1" - tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig("tab3 : cf1 , cf3"); + tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig("tab3 : cf1 , cf3"); assertEquals(1, tabCFsMap.size()); // only one table - assertTrue(tabCFsMap.containsKey("tab3")); // its table name is "tab2" - assertFalse(tabCFsMap.containsKey("tab1")); // not other table - assertEquals(2, tabCFsMap.get("tab3").size()); // cf-list contains 2 cf - assertTrue(tabCFsMap.get("tab3").contains("cf1"));// contains "cf1" - assertTrue(tabCFsMap.get("tab3").contains("cf3"));// contains "cf3" + assertTrue(tabCFsMap.containsKey(tab3)); // its table name is "tab2" + assertFalse(tabCFsMap.containsKey(tab1)); // not other table + assertEquals(2, tabCFsMap.get(tab3).size()); // cf-list contains 2 cf + assertTrue(tabCFsMap.get(tab3).contains("cf1"));// contains "cf1" + assertTrue(tabCFsMap.get(tab3).contains("cf3"));// contains "cf3" // 3. multiple tables: "tab1 ; tab2:cf1 ; tab3:cf1,cf3" - tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig("tab1 ; tab2:cf1 ; tab3:cf1,cf3"); + tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig("tab1 ; tab2:cf1 ; tab3:cf1,cf3"); // 3.1 contains 3 tables : "tab1", "tab2" and "tab3" assertEquals(3, tabCFsMap.size()); - assertTrue(tabCFsMap.containsKey("tab1")); - assertTrue(tabCFsMap.containsKey("tab2")); - assertTrue(tabCFsMap.containsKey("tab3")); + assertTrue(tabCFsMap.containsKey(tab1)); + assertTrue(tabCFsMap.containsKey(tab2)); + assertTrue(tabCFsMap.containsKey(tab3)); // 3.2 table "tab1" : null cf-list - assertEquals(null, tabCFsMap.get("tab1")); + assertEquals(null, tabCFsMap.get(tab1)); // 3.3 table "tab2" : cf-list contains a single cf "cf1" - assertEquals(1, tabCFsMap.get("tab2").size()); - assertEquals("cf1", tabCFsMap.get("tab2").get(0)); + assertEquals(1, tabCFsMap.get(tab2).size()); + assertEquals("cf1", tabCFsMap.get(tab2).get(0)); // 3.4 table "tab3" : cf-list contains "cf1" and "cf3" - assertEquals(2, tabCFsMap.get("tab3").size()); - assertTrue(tabCFsMap.get("tab3").contains("cf1")); - assertTrue(tabCFsMap.get("tab3").contains("cf3")); + assertEquals(2, tabCFsMap.get(tab3).size()); + assertTrue(tabCFsMap.get(tab3).contains("cf1")); + assertTrue(tabCFsMap.get(tab3).contains("cf3")); // 4. contiguous or additional ";"(table delimiter) or ","(cf delimiter) can be tolerated // still use the example of multiple tables: "tab1 ; tab2:cf1 ; tab3:cf1,cf3" - tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig( + tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig( "tab1 ; ; tab2:cf1 ; tab3:cf1,,cf3 ;"); // 4.1 contains 3 tables : "tab1", "tab2" and "tab3" assertEquals(3, tabCFsMap.size()); - assertTrue(tabCFsMap.containsKey("tab1")); - assertTrue(tabCFsMap.containsKey("tab2")); - assertTrue(tabCFsMap.containsKey("tab3")); + assertTrue(tabCFsMap.containsKey(tab1)); + assertTrue(tabCFsMap.containsKey(tab2)); + assertTrue(tabCFsMap.containsKey(tab3)); // 4.2 table "tab1" : null cf-list - assertEquals(null, tabCFsMap.get("tab1")); + assertEquals(null, tabCFsMap.get(tab1)); // 4.3 table "tab2" : cf-list contains a single cf "cf1" - assertEquals(1, tabCFsMap.get("tab2").size()); - assertEquals("cf1", tabCFsMap.get("tab2").get(0)); + assertEquals(1, tabCFsMap.get(tab2).size()); + assertEquals("cf1", tabCFsMap.get(tab2).get(0)); // 4.4 table "tab3" : cf-list contains "cf1" and "cf3" - assertEquals(2, tabCFsMap.get("tab3").size()); - assertTrue(tabCFsMap.get("tab3").contains("cf1")); - assertTrue(tabCFsMap.get("tab3").contains("cf3")); + assertEquals(2, tabCFsMap.get(tab3).size()); + assertTrue(tabCFsMap.get(tab3).contains("cf1")); + assertTrue(tabCFsMap.get(tab3).contains("cf3")); // 5. invalid format "tab1:tt:cf1 ; tab2::cf1 ; tab3:cf1,cf3" // "tab1:tt:cf1" and "tab2::cf1" are invalid and will be ignored totally - tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig( + tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig( "tab1:tt:cf1 ; tab2::cf1 ; tab3:cf1,cf3"); // 5.1 no "tab1" and "tab2", only "tab3" assertEquals(1, tabCFsMap.size()); // only one table - assertFalse(tabCFsMap.containsKey("tab1")); - assertFalse(tabCFsMap.containsKey("tab2")); - assertTrue(tabCFsMap.containsKey("tab3")); + assertFalse(tabCFsMap.containsKey(tab1)); + assertFalse(tabCFsMap.containsKey(tab2)); + assertTrue(tabCFsMap.containsKey(tab3)); // 5.2 table "tab3" : cf-list contains "cf1" and "cf3" - assertEquals(2, tabCFsMap.get("tab3").size()); - assertTrue(tabCFsMap.get("tab3").contains("cf1")); - assertTrue(tabCFsMap.get("tab3").contains("cf3")); + assertEquals(2, tabCFsMap.get(tab3).size()); + assertTrue(tabCFsMap.get(tab3).contains("cf1")); + assertTrue(tabCFsMap.get(tab3).contains("cf3")); } @Test(timeout=300000) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java index 41a4c1469db..dfe043fab0b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java @@ -212,31 +212,31 @@ public class TestReplicationWALEntryFilters { // empty map userEntry = createEntry(a, b, c); - Map> tableCfs = new HashMap>(); + Map> tableCfs = new HashMap>(); when(peer.getTableCFs()).thenReturn(tableCfs); filter = new TableCfWALEntryFilter(peer); assertEquals(null, filter.filter(userEntry)); // table bar userEntry = createEntry(a, b, c); - tableCfs = new HashMap>(); - tableCfs.put("bar", null); + tableCfs = new HashMap>(); + tableCfs.put(TableName.valueOf("bar"), null); when(peer.getTableCFs()).thenReturn(tableCfs); filter = new TableCfWALEntryFilter(peer); assertEquals(null, filter.filter(userEntry)); // table foo:a userEntry = createEntry(a, b, c); - tableCfs = new HashMap>(); - tableCfs.put("foo", Lists.newArrayList("a")); + tableCfs = new HashMap>(); + tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a")); when(peer.getTableCFs()).thenReturn(tableCfs); filter = new TableCfWALEntryFilter(peer); assertEquals(createEntry(a), filter.filter(userEntry)); // table foo:a,c userEntry = createEntry(a, b, c, d); - tableCfs = new HashMap>(); - tableCfs.put("foo", Lists.newArrayList("a", "c")); + tableCfs = new HashMap>(); + tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c")); when(peer.getTableCFs()).thenReturn(tableCfs); filter = new TableCfWALEntryFilter(peer); assertEquals(createEntry(a,c), filter.filter(userEntry)); @@ -273,6 +273,4 @@ public class TestReplicationWALEntryFilters { KeyValue.COMPARATOR.compare(cells1.get(i), cells2.get(i)); } } - - } diff --git a/hbase-shell/src/main/ruby/hbase/replication_admin.rb b/hbase-shell/src/main/ruby/hbase/replication_admin.rb index cc9e41f5b86..6dedb2effc7 100644 --- a/hbase-shell/src/main/ruby/hbase/replication_admin.rb +++ b/hbase-shell/src/main/ruby/hbase/replication_admin.rb @@ -86,5 +86,17 @@ module Hbase def set_peer_tableCFs(id, tableCFs) @replication_admin.setPeerTableCFs(id, tableCFs) end + + #---------------------------------------------------------------------------------------------- + # Append a tableCFs config for the specified peer + def append_peer_tableCFs(id, tableCFs) + @replication_admin.appendPeerTableCFs(id, tableCFs) + end + + #---------------------------------------------------------------------------------------------- + # Remove some tableCFs from the tableCFs config of the specified peer + def remove_peer_tableCFs(id, tableCFs) + @replication_admin.removePeerTableCFs(id, tableCFs) + end end end diff --git a/hbase-shell/src/main/ruby/shell.rb b/hbase-shell/src/main/ruby/shell.rb index 5b59254b458..ea6c04b877d 100644 --- a/hbase-shell/src/main/ruby/shell.rb +++ b/hbase-shell/src/main/ruby/shell.rb @@ -338,6 +338,8 @@ Shell.load_command_group( show_peer_tableCFs set_peer_tableCFs list_replicated_tables + append_peer_tableCFs + remove_peer_tableCFs ] ) diff --git a/hbase-shell/src/main/ruby/shell/commands/append_peer_tableCFs.rb b/hbase-shell/src/main/ruby/shell/commands/append_peer_tableCFs.rb new file mode 100644 index 00000000000..3919b203170 --- /dev/null +++ b/hbase-shell/src/main/ruby/shell/commands/append_peer_tableCFs.rb @@ -0,0 +1,41 @@ +# +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module Shell + module Commands + class AppendPeerTableCFs< Command + def help + return <<-EOF +Append a replicable table-cf config for the specified peer +Examples: + + # append a table / table-cf to be replicable for a peer + hbase> append_peer_tableCFs '2', "table4:cfA,cfB" + +EOF + end + + def command(id, table_cfs) + format_simple_command do + replication_admin.append_peer_tableCFs(id, table_cfs) + end + end + end + end +end diff --git a/hbase-shell/src/main/ruby/shell/commands/remove_peer_tableCFs.rb b/hbase-shell/src/main/ruby/shell/commands/remove_peer_tableCFs.rb new file mode 100644 index 00000000000..5b15b529651 --- /dev/null +++ b/hbase-shell/src/main/ruby/shell/commands/remove_peer_tableCFs.rb @@ -0,0 +1,42 @@ +# +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module Shell + module Commands + class RemovePeerTableCFs < Command + def help + return <<-EOF +Remove a table / table-cf from the table-cfs config for the specified peer +Examples: + + # Remove a table / table-cf from the replicable table-cfs for a peer + hbase> remove_peer_tableCFs '2', "table1" + hbase> remove_peer_tableCFs '2', "table1:cf1" + +EOF + end + + def command(id, table_cfs) + format_simple_command do + replication_admin.remove_peer_tableCFs(id, table_cfs) + end + end + end + end +end