HBASE-11897 Add append and remove peer table-cfs cmds for replication (Liu Shaohui)
parent 612f4c1c21
commit c0d4b26872
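
This change adds ReplicationAdmin#appendPeerTableCFs and ReplicationAdmin#removePeerTableCFs (plus matching append_peer_tableCFs / remove_peer_tableCFs shell commands), so a peer's table-cfs config can be edited incrementally instead of being rewritten wholesale with setPeerTableCFs. The following is a minimal, illustrative usage sketch of the new client API; it is not part of the patch, and it assumes a client-side Configuration for the source cluster and an already-added replication peer with id "1":

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;

public class PeerTableCfsExample {
  public static void main(String[] args) throws Exception {
    // Assumes hbase-site.xml for the source cluster is on the classpath
    // and that replication peer "1" already exists.
    ReplicationAdmin admin = new ReplicationAdmin(HBaseConfiguration.create());
    try {
      admin.appendPeerTableCFs("1", "t1");            // replicate every CF of table t1
      admin.appendPeerTableCFs("1", "t3:f1");         // also replicate only CF f1 of table t3
      System.out.println(admin.getPeerTableCFs("1")); // e.g. "t3:f1;t1"
      admin.removePeerTableCFs("1", "t3:f1");         // stop replicating t3:f1 again
    } finally {
      admin.close();
    }
  }
}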
@@ -22,10 +22,14 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

@@ -47,6 +51,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;

/**
 * <p>

@@ -169,6 +174,55 @@ public class ReplicationAdmin implements Closeable {
    this.replicationPeers.addPeer(id, peerConfig, getTableCfsStr(tableCfs));
  }

  public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
    if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
      return null;
    }

    Map<TableName, List<String>> tableCFsMap = null;
    // TODO: This should be a PB object rather than a String to be parsed!! See HBASE-11393
    // parse out (table, cf-list) pairs from tableCFsConfig
    // format: "table1:cf1,cf2;table2:cfA,cfB"
    String[] tables = tableCFsConfig.split(";");
    for (String tab : tables) {
      // 1 ignore empty table config
      tab = tab.trim();
      if (tab.length() == 0) {
        continue;
      }
      // 2 split to "table" and "cf1,cf2"
      // for each table: "table:cf1,cf2" or "table"
      String[] pair = tab.split(":");
      String tabName = pair[0].trim();
      if (pair.length > 2 || tabName.length() == 0) {
        LOG.error("ignore invalid tableCFs setting: " + tab);
        continue;
      }

      // 3 parse "cf1,cf2" part to List<cf>
      List<String> cfs = null;
      if (pair.length == 2) {
        String[] cfsList = pair[1].split(",");
        for (String cf : cfsList) {
          String cfName = cf.trim();
          if (cfName.length() > 0) {
            if (cfs == null) {
              cfs = new ArrayList<String>();
            }
            cfs.add(cfName);
          }
        }
      }

      // 4 put <table, List<cf>> to map
      if (tableCFsMap == null) {
        tableCFsMap = new HashMap<TableName, List<String>>();
      }
      tableCFsMap.put(TableName.valueOf(tabName), cfs);
    }
    return tableCFsMap;
  }
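  // Illustrative note (not part of the patch): for the config string
  // "tab1;tab2:cf1;tab3:cf1,cf3" the method above returns a map with three entries:
  //   tab1 -> null            (a null cf-list means all column families are replicated)
  //   tab2 -> [cf1]
  //   tab3 -> [cf1, cf3]
  // Extra delimiters and surrounding whitespace are tolerated; an invalid fragment
  // such as "tab1:tt:cf1" is logged and skipped (see TestPerTableCFReplication below).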

  @VisibleForTesting
  static String getTableCfsStr(Map<TableName, ? extends Collection<String>> tableCfs) {
    String tableCfsStr = null;

@@ -264,6 +318,111 @@ public class ReplicationAdmin implements Closeable {
    this.replicationPeers.setPeerTableCFsConfig(id, tableCFs);
  }

  /**
   * Append to the replicable table-cf config of the specified peer
   * @param id a short name that identifies the cluster
   * @param tableCfs table-cfs config string
   * @throws ReplicationException
   */
  public void appendPeerTableCFs(String id, String tableCfs) throws ReplicationException {
    appendPeerTableCFs(id, parseTableCFsFromConfig(tableCfs));
  }

  /**
   * Append to the replicable table-cf config of the specified peer
   * @param id a short name that identifies the cluster
   * @param tableCfs A map from tableName to column family names
   * @throws ReplicationException
   */
  public void appendPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
      throws ReplicationException {
    if (tableCfs == null) {
      throw new ReplicationException("tableCfs is null");
    }
    Map<TableName, List<String>> preTableCfs = parseTableCFsFromConfig(getPeerTableCFs(id));
    if (preTableCfs == null) {
      setPeerTableCFs(id, tableCfs);
      return;
    }

    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
      TableName table = entry.getKey();
      Collection<String> appendCfs = entry.getValue();
      if (preTableCfs.containsKey(table)) {
        List<String> cfs = preTableCfs.get(table);
        if (cfs == null || appendCfs == null) {
          preTableCfs.put(table, null);
        } else {
          Set<String> cfSet = new HashSet<String>(cfs);
          cfSet.addAll(appendCfs);
          preTableCfs.put(table, Lists.newArrayList(cfSet));
        }
      } else {
        if (appendCfs == null || appendCfs.isEmpty()) {
          preTableCfs.put(table, null);
        } else {
          preTableCfs.put(table, Lists.newArrayList(appendCfs));
        }
      }
    }
    setPeerTableCFs(id, preTableCfs);
  }
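  // Illustrative note (not part of the patch): appending merges with the existing config.
  // For example, starting from "t1", appendPeerTableCFs(id, "t2") yields "t2;t1", and a
  // further appendPeerTableCFs(id, "t3:f1") yields "t3:f1;t2;t1" (see TestReplicationAdmin).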

  /**
   * Remove some table-cfs from the table-cfs config of the specified peer
   * @param id a short name that identifies the cluster
   * @param tableCf table-cfs config string
   * @throws ReplicationException
   */
  public void removePeerTableCFs(String id, String tableCf) throws ReplicationException {
    removePeerTableCFs(id, parseTableCFsFromConfig(tableCf));
  }

  /**
   * Remove some table-cfs from the config of the specified peer
   * @param id a short name that identifies the cluster
   * @param tableCfs A map from tableName to column family names
   * @throws ReplicationException
   */
  public void removePeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
      throws ReplicationException {
    if (tableCfs == null) {
      throw new ReplicationException("tableCfs is null");
    }

    Map<TableName, List<String>> preTableCfs = parseTableCFsFromConfig(getPeerTableCFs(id));
    if (preTableCfs == null) {
      throw new ReplicationException("Table-Cfs for peer " + id + " is null");
    }
    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
      TableName table = entry.getKey();
      Collection<String> removeCfs = entry.getValue();
      if (preTableCfs.containsKey(table)) {
        List<String> cfs = preTableCfs.get(table);
        if (cfs == null && removeCfs == null) {
          preTableCfs.remove(table);
        } else if (cfs != null && removeCfs != null) {
          Set<String> cfSet = new HashSet<String>(cfs);
          cfSet.removeAll(removeCfs);
          if (cfSet.isEmpty()) {
            preTableCfs.remove(table);
          } else {
            preTableCfs.put(table, Lists.newArrayList(cfSet));
          }
        } else if (cfs == null && removeCfs != null) {
          throw new ReplicationException("Cannot remove cf of table: " + table
              + " which doesn't specify cfs from table-cfs config in peer: " + id);
        } else if (cfs != null && removeCfs == null) {
          throw new ReplicationException("Cannot remove table: " + table
              + " which has specified cfs from table-cfs config in peer: " + id);
        }
      } else {
        throw new ReplicationException("No table: " + table + " in table-cfs config of peer: " + id);
      }
    }
    setPeerTableCFs(id, preTableCfs);
  }
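  // Illustrative note (not part of the patch): removal must match the existing config.
  // For example, from "t1;t2:cf1", removePeerTableCFs(id, "t1") leaves "t2:cf1" and
  // removePeerTableCFs(id, "t2:cf1") leaves ""; removing a table or cf that is not
  // configured for the peer throws ReplicationException (see TestReplicationAdmin).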

  /**
   * Set the replicable table-cf config of the specified peer
   * @param id a short name that identifies the cluster

@@ -1484,8 +1484,8 @@ public final class ProtobufUtil {
  /**
   * Convert a protocol buffer DeleteType to delete KeyValue type.
   *
   * @param protocol buffer DeleteType
   * @return type
   * @param type The DeleteType
   * @return The type.
   * @throws IOException
   */
  public static KeyValue.Type fromDeleteType(

@@ -23,6 +23,7 @@ import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.TableName;

/**
 * ReplicationPeer manages enabled / disabled state for the peer.

@@ -67,6 +68,6 @@ public interface ReplicationPeer {
   * Get replicable (table, cf-list) map of this peer
   * @return the replicable (table, cf-list) map
   */
  public Map<String, List<String>> getTableCFs();
  public Map<TableName, List<String>> getTableCFs();

}

@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.replication;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@@ -30,6 +31,8 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;

@@ -49,7 +52,7 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea
  private final ReplicationPeerConfig peerConfig;
  private final String id;
  private volatile PeerState peerState;
  private volatile Map<String, List<String>> tableCFs = new HashMap<String, List<String>>();
  private volatile Map<TableName, List<String>> tableCFs = new HashMap<TableName, List<String>>();
  private final Configuration conf;

  private PeerStateTracker peerStateTracker;

@@ -110,59 +113,9 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea
    this.readTableCFsZnode();
  }

  static Map<String, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
    if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
      return null;
    }

    Map<String, List<String>> tableCFsMap = null;
    // TODO: This should be a PB object rather than a String to be parsed!! See HBASE-11393
    // parse out (table, cf-list) pairs from tableCFsConfig
    // format: "table1:cf1,cf2;table2:cfA,cfB"
    String[] tables = tableCFsConfig.split(";");
    for (String tab : tables) {
      // 1 ignore empty table config
      tab = tab.trim();
      if (tab.length() == 0) {
        continue;
      }
      // 2 split to "table" and "cf1,cf2"
      // for each table: "table:cf1,cf2" or "table"
      String[] pair = tab.split(":");
      String tabName = pair[0].trim();
      if (pair.length > 2 || tabName.length() == 0) {
        LOG.error("ignore invalid tableCFs setting: " + tab);
        continue;
      }

      // 3 parse "cf1,cf2" part to List<cf>
      List<String> cfs = null;
      if (pair.length == 2) {
        String[] cfsList = pair[1].split(",");
        for (String cf : cfsList) {
          String cfName = cf.trim();
          if (cfName.length() > 0) {
            if (cfs == null) {
              cfs = new ArrayList<String>();
            }
            cfs.add(cfName);
          }
        }
      }

      // 4 put <table, List<cf>> to map
      if (tableCFsMap == null) {
        tableCFsMap = new HashMap<String, List<String>>();
      }
      tableCFsMap.put(tabName, cfs);
    }

    return tableCFsMap;
  }

  private void readTableCFsZnode() {
    String currentTableCFs = Bytes.toString(tableCFsTracker.getData(false));
    this.tableCFs = parseTableCFsFromConfig(currentTableCFs);
    this.tableCFs = ReplicationAdmin.parseTableCFsFromConfig(currentTableCFs);
  }

  @Override

@@ -202,7 +155,7 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea
   * @return the replicable (table, cf-list) map
   */
  @Override
  public Map<String, List<String>> getTableCFs() {
  public Map<TableName, List<String>> getTableCFs() {
    return this.tableCFs;
  }

@@ -24,6 +24,7 @@ import java.util.Set;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Pair;

/**

@@ -94,7 +95,7 @@ public interface ReplicationPeers {
   * @param peerId a short that identifies the cluster
   * @return the table and column-family list which will be replicated for this peer
   */
  public Map<String, List<String>> getTableCFs(String peerId);
  public Map<TableName, List<String>> getTableCFs(String peerId);

  /**
   * Returns the ReplicationPeer

@@ -32,6 +32,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair;

@@ -43,6 +44,7 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;

import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

@@ -188,7 +190,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
  }

  @Override
  public Map<String, List<String>> getTableCFs(String id) throws IllegalArgumentException {
  public Map<TableName, List<String>> getTableCFs(String id) throws IllegalArgumentException {
    ReplicationPeer replicationPeer = this.peerClusters.get(id);
    if (replicationPeer == null) {
      throw new IllegalArgumentException("Peer with id= " + id + " is not connected");

@@ -25,6 +25,7 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
import org.apache.hadoop.hbase.util.Bytes;

@@ -39,9 +40,9 @@ public class TableCfWALEntryFilter implements WALEntryFilter {

  @Override
  public Entry filter(Entry entry) {
    String tabName = entry.getKey().getTablename().getNameAsString();
    TableName tabName = entry.getKey().getTablename();
    ArrayList<Cell> cells = entry.getEdit().getCells();
    Map<String, List<String>> tableCFs = null;
    Map<TableName, List<String>> tableCFs = null;

    try {
      tableCFs = this.peer.getTableCFs();

@@ -25,6 +25,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@@ -154,5 +155,58 @@ public class TestReplicationAdmin {
    assertEquals("tab1;tab2:cf1;tab3:cf1,cf3", ReplicationAdmin.getTableCfsStr(tabCFsMap));
  }

}
  @Test
  public void testAppendPeerTableCFs() throws Exception {
    // Add a valid peer
    admin.addPeer(ID_ONE, KEY_ONE);

    admin.appendPeerTableCFs(ID_ONE, "t1");
    assertEquals("t1", admin.getPeerTableCFs(ID_ONE));

    // append table t2 to replication
    admin.appendPeerTableCFs(ID_ONE, "t2");
    assertEquals("t2;t1", admin.getPeerTableCFs(ID_ONE));

    // append table column family: f1 of t3 to replication
    admin.appendPeerTableCFs(ID_ONE, "t3:f1");
    assertEquals("t3:f1;t2;t1", admin.getPeerTableCFs(ID_ONE));
    admin.removePeer(ID_ONE);
  }

  @Test
  public void testRemovePeerTableCFs() throws Exception {
    // Add a valid peer
    admin.addPeer(ID_ONE, KEY_ONE);
    try {
      admin.removePeerTableCFs(ID_ONE, "t3");
      assertTrue(false);
    } catch (ReplicationException e) {
    }
    assertEquals("", admin.getPeerTableCFs(ID_ONE));

    admin.setPeerTableCFs(ID_ONE, "t1;t2:cf1");
    try {
      admin.removePeerTableCFs(ID_ONE, "t3");
      assertTrue(false);
    } catch (ReplicationException e) {
    }
    assertEquals("t1;t2:cf1", admin.getPeerTableCFs(ID_ONE));

    try {
      admin.removePeerTableCFs(ID_ONE, "t1:f1");
      assertTrue(false);
    } catch (ReplicationException e) {
    }
    admin.removePeerTableCFs(ID_ONE, "t1");
    assertEquals("t2:cf1", admin.getPeerTableCFs(ID_ONE));

    try {
      admin.removePeerTableCFs(ID_ONE, "t2");
      assertTrue(false);
    } catch (ReplicationException e) {
    }
    admin.removePeerTableCFs(ID_ONE, "t2:cf1");
    assertEquals("", admin.getPeerTableCFs(ID_ONE));
    admin.removePeer(ID_ONE);
  }
}

@@ -173,89 +173,93 @@ public class TestPerTableCFReplication {

  @Test
  public void testParseTableCFsFromConfig() {
    Map<String, List<String>> tabCFsMap = null;
    Map<TableName, List<String>> tabCFsMap = null;

    // 1. null or empty string, result should be null
    tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig(null);
    tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig(null);
    assertEquals(null, tabCFsMap);

    tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig("");
    tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig("");
    assertEquals(null, tabCFsMap);

    tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig(" ");
    tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig(" ");
    assertEquals(null, tabCFsMap);

    TableName tab1 = TableName.valueOf("tab1");
    TableName tab2 = TableName.valueOf("tab2");
    TableName tab3 = TableName.valueOf("tab3");

    // 2. single table: "tab1" / "tab2:cf1" / "tab3:cf1,cf3"
    tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig("tab1");
    tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig("tab1");
    assertEquals(1, tabCFsMap.size()); // only one table
    assertTrue(tabCFsMap.containsKey("tab1"));   // its table name is "tab1"
    assertFalse(tabCFsMap.containsKey("tab2"));  // not other table
    assertEquals(null, tabCFsMap.get("tab1"));   // null cf-list,
    assertTrue(tabCFsMap.containsKey(tab1));   // its table name is "tab1"
    assertFalse(tabCFsMap.containsKey(tab2));  // not other table
    assertEquals(null, tabCFsMap.get(tab1));   // null cf-list,

    tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig("tab2:cf1");
    tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig("tab2:cf1");
    assertEquals(1, tabCFsMap.size()); // only one table
    assertTrue(tabCFsMap.containsKey("tab2"));   // its table name is "tab2"
    assertFalse(tabCFsMap.containsKey("tab1"));  // not other table
    assertEquals(1, tabCFsMap.get("tab2").size());   // cf-list contains only 1 cf
    assertEquals("cf1", tabCFsMap.get("tab2").get(0));// the only cf is "cf1"
    assertTrue(tabCFsMap.containsKey(tab2));   // its table name is "tab2"
    assertFalse(tabCFsMap.containsKey(tab1));  // not other table
    assertEquals(1, tabCFsMap.get(tab2).size());   // cf-list contains only 1 cf
    assertEquals("cf1", tabCFsMap.get(tab2).get(0));// the only cf is "cf1"

    tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig("tab3 : cf1 , cf3");
    tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig("tab3 : cf1 , cf3");
    assertEquals(1, tabCFsMap.size()); // only one table
    assertTrue(tabCFsMap.containsKey("tab3"));   // its table name is "tab3"
    assertFalse(tabCFsMap.containsKey("tab1"));  // not other table
    assertEquals(2, tabCFsMap.get("tab3").size()); // cf-list contains 2 cf
    assertTrue(tabCFsMap.get("tab3").contains("cf1"));// contains "cf1"
    assertTrue(tabCFsMap.get("tab3").contains("cf3"));// contains "cf3"
    assertTrue(tabCFsMap.containsKey(tab3));   // its table name is "tab3"
    assertFalse(tabCFsMap.containsKey(tab1));  // not other table
    assertEquals(2, tabCFsMap.get(tab3).size()); // cf-list contains 2 cf
    assertTrue(tabCFsMap.get(tab3).contains("cf1"));// contains "cf1"
    assertTrue(tabCFsMap.get(tab3).contains("cf3"));// contains "cf3"

    // 3. multiple tables: "tab1 ; tab2:cf1 ; tab3:cf1,cf3"
    tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig("tab1 ; tab2:cf1 ; tab3:cf1,cf3");
    tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig("tab1 ; tab2:cf1 ; tab3:cf1,cf3");
    // 3.1 contains 3 tables : "tab1", "tab2" and "tab3"
    assertEquals(3, tabCFsMap.size());
    assertTrue(tabCFsMap.containsKey("tab1"));
    assertTrue(tabCFsMap.containsKey("tab2"));
    assertTrue(tabCFsMap.containsKey("tab3"));
    assertTrue(tabCFsMap.containsKey(tab1));
    assertTrue(tabCFsMap.containsKey(tab2));
    assertTrue(tabCFsMap.containsKey(tab3));
    // 3.2 table "tab1" : null cf-list
    assertEquals(null, tabCFsMap.get("tab1"));
    assertEquals(null, tabCFsMap.get(tab1));
    // 3.3 table "tab2" : cf-list contains a single cf "cf1"
    assertEquals(1, tabCFsMap.get("tab2").size());
    assertEquals("cf1", tabCFsMap.get("tab2").get(0));
    assertEquals(1, tabCFsMap.get(tab2).size());
    assertEquals("cf1", tabCFsMap.get(tab2).get(0));
    // 3.4 table "tab3" : cf-list contains "cf1" and "cf3"
    assertEquals(2, tabCFsMap.get("tab3").size());
    assertTrue(tabCFsMap.get("tab3").contains("cf1"));
    assertTrue(tabCFsMap.get("tab3").contains("cf3"));
    assertEquals(2, tabCFsMap.get(tab3).size());
    assertTrue(tabCFsMap.get(tab3).contains("cf1"));
    assertTrue(tabCFsMap.get(tab3).contains("cf3"));

    // 4. contiguous or additional ";"(table delimiter) or ","(cf delimiter) can be tolerated
    // still use the example of multiple tables: "tab1 ; tab2:cf1 ; tab3:cf1,cf3"
    tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig(
    tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig(
      "tab1 ; ; tab2:cf1 ; tab3:cf1,,cf3 ;");
    // 4.1 contains 3 tables : "tab1", "tab2" and "tab3"
    assertEquals(3, tabCFsMap.size());
    assertTrue(tabCFsMap.containsKey("tab1"));
    assertTrue(tabCFsMap.containsKey("tab2"));
    assertTrue(tabCFsMap.containsKey("tab3"));
    assertTrue(tabCFsMap.containsKey(tab1));
    assertTrue(tabCFsMap.containsKey(tab2));
    assertTrue(tabCFsMap.containsKey(tab3));
    // 4.2 table "tab1" : null cf-list
    assertEquals(null, tabCFsMap.get("tab1"));
    assertEquals(null, tabCFsMap.get(tab1));
    // 4.3 table "tab2" : cf-list contains a single cf "cf1"
    assertEquals(1, tabCFsMap.get("tab2").size());
    assertEquals("cf1", tabCFsMap.get("tab2").get(0));
    assertEquals(1, tabCFsMap.get(tab2).size());
    assertEquals("cf1", tabCFsMap.get(tab2).get(0));
    // 4.4 table "tab3" : cf-list contains "cf1" and "cf3"
    assertEquals(2, tabCFsMap.get("tab3").size());
    assertTrue(tabCFsMap.get("tab3").contains("cf1"));
    assertTrue(tabCFsMap.get("tab3").contains("cf3"));
    assertEquals(2, tabCFsMap.get(tab3).size());
    assertTrue(tabCFsMap.get(tab3).contains("cf1"));
    assertTrue(tabCFsMap.get(tab3).contains("cf3"));

    // 5. invalid format "tab1:tt:cf1 ; tab2::cf1 ; tab3:cf1,cf3"
    // "tab1:tt:cf1" and "tab2::cf1" are invalid and will be ignored totally
    tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig(
    tabCFsMap = ReplicationAdmin.parseTableCFsFromConfig(
      "tab1:tt:cf1 ; tab2::cf1 ; tab3:cf1,cf3");
    // 5.1 no "tab1" and "tab2", only "tab3"
    assertEquals(1, tabCFsMap.size()); // only one table
    assertFalse(tabCFsMap.containsKey("tab1"));
    assertFalse(tabCFsMap.containsKey("tab2"));
    assertTrue(tabCFsMap.containsKey("tab3"));
    assertFalse(tabCFsMap.containsKey(tab1));
    assertFalse(tabCFsMap.containsKey(tab2));
    assertTrue(tabCFsMap.containsKey(tab3));
    // 5.2 table "tab3" : cf-list contains "cf1" and "cf3"
    assertEquals(2, tabCFsMap.get("tab3").size());
    assertTrue(tabCFsMap.get("tab3").contains("cf1"));
    assertTrue(tabCFsMap.get("tab3").contains("cf3"));
    assertEquals(2, tabCFsMap.get(tab3).size());
    assertTrue(tabCFsMap.get(tab3).contains("cf1"));
    assertTrue(tabCFsMap.get(tab3).contains("cf3"));
  }

  @Test(timeout=300000)

@@ -212,31 +212,31 @@ public class TestReplicationWALEntryFilters {

    // empty map
    userEntry = createEntry(a, b, c);
    Map<String, List<String>> tableCfs = new HashMap<String, List<String>>();
    Map<TableName, List<String>> tableCfs = new HashMap<TableName, List<String>>();
    when(peer.getTableCFs()).thenReturn(tableCfs);
    filter = new TableCfWALEntryFilter(peer);
    assertEquals(null, filter.filter(userEntry));

    // table bar
    userEntry = createEntry(a, b, c);
    tableCfs = new HashMap<String, List<String>>();
    tableCfs.put("bar", null);
    tableCfs = new HashMap<TableName, List<String>>();
    tableCfs.put(TableName.valueOf("bar"), null);
    when(peer.getTableCFs()).thenReturn(tableCfs);
    filter = new TableCfWALEntryFilter(peer);
    assertEquals(null, filter.filter(userEntry));

    // table foo:a
    userEntry = createEntry(a, b, c);
    tableCfs = new HashMap<String, List<String>>();
    tableCfs.put("foo", Lists.newArrayList("a"));
    tableCfs = new HashMap<TableName, List<String>>();
    tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a"));
    when(peer.getTableCFs()).thenReturn(tableCfs);
    filter = new TableCfWALEntryFilter(peer);
    assertEquals(createEntry(a), filter.filter(userEntry));

    // table foo:a,c
    userEntry = createEntry(a, b, c, d);
    tableCfs = new HashMap<String, List<String>>();
    tableCfs.put("foo", Lists.newArrayList("a", "c"));
    tableCfs = new HashMap<TableName, List<String>>();
    tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
    when(peer.getTableCFs()).thenReturn(tableCfs);
    filter = new TableCfWALEntryFilter(peer);
    assertEquals(createEntry(a,c), filter.filter(userEntry));

@@ -273,6 +273,4 @@ public class TestReplicationWALEntryFilters {
      KeyValue.COMPARATOR.compare(cells1.get(i), cells2.get(i));
    }
  }

}

@@ -86,5 +86,17 @@ module Hbase
    def set_peer_tableCFs(id, tableCFs)
      @replication_admin.setPeerTableCFs(id, tableCFs)
    end

    #----------------------------------------------------------------------------------------------
    # Append a tableCFs config for the specified peer
    def append_peer_tableCFs(id, tableCFs)
      @replication_admin.appendPeerTableCFs(id, tableCFs)
    end

    #----------------------------------------------------------------------------------------------
    # Remove some tableCFs from the tableCFs config of the specified peer
    def remove_peer_tableCFs(id, tableCFs)
      @replication_admin.removePeerTableCFs(id, tableCFs)
    end
  end
end

@@ -338,6 +338,8 @@ Shell.load_command_group(
    show_peer_tableCFs
    set_peer_tableCFs
    list_replicated_tables
    append_peer_tableCFs
    remove_peer_tableCFs
  ]
)

@@ -0,0 +1,41 @@
#
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

module Shell
  module Commands
    class AppendPeerTableCFs < Command
      def help
        return <<-EOF
Append a replicable table-cf config for the specified peer
Examples:

  # append a table / table-cf to be replicable for a peer
  hbase> append_peer_tableCFs '2', "table4:cfA,cfB"

EOF
      end

      def command(id, table_cfs)
        format_simple_command do
          replication_admin.append_peer_tableCFs(id, table_cfs)
        end
      end
    end
  end
end

@@ -0,0 +1,42 @@
#
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

module Shell
  module Commands
    class RemovePeerTableCFs < Command
      def help
        return <<-EOF
Remove a table / table-cf from the table-cfs config for the specified peer
Examples:

  # Remove a table / table-cf from the replicable table-cfs for a peer
  hbase> remove_peer_tableCFs '2', "table1"
  hbase> remove_peer_tableCFs '2', "table1:cf1"

EOF
      end

      def command(id, table_cfs)
        format_simple_command do
          replication_admin.remove_peer_tableCFs(id, table_cfs)
        end
      end
    end
  end
end