HBASE-16447 Replication by namespaces config in peer (Guanghao Zhang)

This commit is contained in:
Enis Soztutar 2016-09-16 11:47:42 -07:00
parent 2cf8907db5
commit 1a1003a482
25 changed files with 900 additions and 51 deletions

View File

@ -189,6 +189,8 @@ public class ReplicationAdmin implements Closeable {
* @param peerConfig configuration for the replication slave cluster
*/
public void addPeer(String id, ReplicationPeerConfig peerConfig) throws ReplicationException {
checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
peerConfig.getTableCFsMap());
this.replicationPeers.registerPeer(id, peerConfig);
}
@ -202,8 +204,11 @@ public class ReplicationAdmin implements Closeable {
public void updatePeerConfig(String id, ReplicationPeerConfig peerConfig)
throws ReplicationException {
checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
peerConfig.getTableCFsMap());
this.replicationPeers.updatePeerConfig(id, peerConfig);
}
/**
* Removes a peer cluster and stops the replication to it.
* @param id a short name that identifies the cluster
@ -360,7 +365,6 @@ public class ReplicationAdmin implements Closeable {
}
} else {
throw new ReplicationException("No table: " + table + " in table-cfs config of peer: " + id);
}
}
setPeerTableCFs(id, preTableCfs);
@ -376,6 +380,8 @@ public class ReplicationAdmin implements Closeable {
*/
public void setPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
throws ReplicationException {
checkNamespacesAndTableCfsConfigConflict(
this.replicationPeers.getReplicationPeerConfig(id).getNamespaces(), tableCfs);
this.replicationPeers.setPeerTableCFsConfig(id, tableCfs);
}
@ -627,4 +633,34 @@ public class ReplicationAdmin implements Closeable {
}
return true;
}
/**
* Set a namespace in the peer config means that all tables in this namespace
* will be replicated to the peer cluster.
*
* 1. If you already have set a namespace in the peer config, then you can't set any table
* of this namespace to the peer config.
* 2. If you already have set a table in the peer config, then you can't set this table's
* namespace to the peer config.
*
* @param namespaces
* @param tableCfs
* @throws ReplicationException
*/
private void checkNamespacesAndTableCfsConfigConflict(Set<String> namespaces,
Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException {
if (namespaces == null || namespaces.isEmpty()) {
return;
}
if (tableCfs == null || tableCfs.isEmpty()) {
return;
}
for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
TableName table = entry.getKey();
if (namespaces.contains(table.getNamespaceAsString())) {
throw new ReplicationException(
"Table-cfs config conflict with namespaces config in peer");
}
}
}
}

View File

@ -19,6 +19,8 @@
package org.apache.hadoop.hbase.client.replication;
import com.google.protobuf.ByteString;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.TableName;
@ -34,10 +36,12 @@ import org.apache.hadoop.hbase.util.Strings;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.ArrayList;
import java.util.Set;
/**
* Helper for TableCFs Operations.
@ -50,6 +54,13 @@ public final class ReplicationSerDeHelper {
private ReplicationSerDeHelper() {}
public static String convertToString(Set<String> namespaces) {
if (namespaces == null) {
return null;
}
return StringUtils.join(namespaces, ';');
}
/** convert map to TableCFs Object */
public static ZooKeeperProtos.TableCF[] convert(
Map<TableName, ? extends Collection<String>> tableCfs) {
@ -262,11 +273,21 @@ public final class ReplicationSerDeHelper {
for (HBaseProtos.NameStringPair pair : peer.getConfigurationList()) {
peerConfig.getConfiguration().put(pair.getName(), pair.getValue());
}
Map<TableName, ? extends Collection<String>> tableCFsMap = convert2Map(
peer.getTableCfsList().toArray(new ZooKeeperProtos.TableCF[peer.getTableCfsCount()]));
if (tableCFsMap != null) {
peerConfig.setTableCFsMap(tableCFsMap);
}
List<ByteString> namespacesList = peer.getNamespacesList();
if (namespacesList != null && namespacesList.size() != 0) {
Set<String> namespaces = new HashSet<String>();
for (ByteString namespace : namespacesList) {
namespaces.add(namespace.toStringUtf8());
}
peerConfig.setNamespaces(namespaces);
}
return peerConfig;
}
@ -292,12 +313,20 @@ public final class ReplicationSerDeHelper {
.setValue(entry.getValue())
.build());
}
ZooKeeperProtos.TableCF[] tableCFs = convert(peerConfig.getTableCFsMap());
if (tableCFs != null) {
for (int i = 0; i < tableCFs.length; i++) {
builder.addTableCfs(tableCFs[i]);
}
}
Set<String> namespaces = peerConfig.getNamespaces();
if (namespaces != null) {
for (String namespace : namespaces) {
builder.addNamespaces(ByteString.copyFromUtf8(namespace));
}
}
return builder.build();
}
@ -311,5 +340,4 @@ public final class ReplicationSerDeHelper {
byte[] bytes = convert(peerConfig).toByteArray();
return ProtobufUtil.prependPBMagic(bytes);
}
}

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@ -71,6 +72,12 @@ public interface ReplicationPeer {
*/
public Map<TableName, List<String>> getTableCFs();
/**
* Get replicable namespace set of this peer
* @return the replicable namespaces set
*/
public Set<String> getNamespaces();
void trackPeerConfigChanges(ReplicationPeerConfigListener listener);
}

View File

@ -22,6 +22,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.hadoop.hbase.TableName;
@ -42,7 +43,7 @@ public class ReplicationPeerConfig {
private final Map<byte[], byte[]> peerData;
private final Map<String, String> configuration;
private Map<TableName, ? extends Collection<String>> tableCFsMap = null;
private Set<String> namespaces = null;
public ReplicationPeerConfig() {
this.peerData = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
@ -93,10 +94,22 @@ public class ReplicationPeerConfig {
return this;
}
public Set<String> getNamespaces() {
return this.namespaces;
}
public ReplicationPeerConfig setNamespaces(Set<String> namespaces) {
this.namespaces = namespaces;
return this;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder("clusterKey=").append(clusterKey).append(",");
builder.append("replicationEndpointImpl=").append(replicationEndpointImpl).append(",");
if (namespaces != null) {
builder.append("namespaces=").append(namespaces.toString()).append(",");
}
if (tableCFsMap != null) {
builder.append("tableCFs=").append(tableCFsMap.toString());
}

View File

@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -163,6 +164,15 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase
return this.tableCFs;
}
/**
* Get replicable namespace set of this peer
* @return the replicable namespaces set
*/
@Override
public Set<String> getNamespaces() {
return this.peerConfig.getNamespaces();
}
@Override
public void trackPeerConfigChanges(ReplicationPeerConfigListener listener) {
if (this.peerConfigTracker != null){

View File

@ -343,7 +343,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
throws ReplicationException {
ReplicationPeer peer = getConnectedPeer(id);
if (peer == null){
throw new ReplicationException("Could not find peer Id " + id);
throw new ReplicationException("Could not find peer Id " + id + " in connected peers");
}
ReplicationPeerConfig existingConfig = peer.getPeerConfig();
if (newConfig.getClusterKey() != null && !newConfig.getClusterKey().isEmpty() &&
@ -366,6 +366,8 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
// or data that weren't explicitly changed
existingConfig.getConfiguration().putAll(newConfig.getConfiguration());
existingConfig.getPeerData().putAll(newConfig.getPeerData());
existingConfig.setTableCFsMap(newConfig.getTableCFsMap());
existingConfig.setNamespaces(newConfig.getNamespaces());
try {
ZKUtil.setData(this.zookeeper, getPeerNode(id),

View File

@ -122,7 +122,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
// znode containing the state of recovering regions
public String recoveringRegionsZNode;
// znode containing namespace descriptors
public static String namespaceZNode = "namespace";
public String namespaceZNode = "namespace";
// znode of indicating master maintenance mode
public static String masterMaintZNode = "masterMaintenance";

View File

@ -4782,6 +4782,20 @@ public final class ZooKeeperProtos {
*/
org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCFOrBuilder getTableCfsOrBuilder(
int index);
// repeated bytes namespaces = 6;
/**
* <code>repeated bytes namespaces = 6;</code>
*/
java.util.List<com.google.protobuf.ByteString> getNamespacesList();
/**
* <code>repeated bytes namespaces = 6;</code>
*/
int getNamespacesCount();
/**
* <code>repeated bytes namespaces = 6;</code>
*/
com.google.protobuf.ByteString getNamespaces(int index);
}
/**
* Protobuf type {@code hbase.pb.ReplicationPeer}
@ -4873,6 +4887,14 @@ public final class ZooKeeperProtos {
tableCfs_.add(input.readMessage(org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF.PARSER, extensionRegistry));
break;
}
case 50: {
if (!((mutable_bitField0_ & 0x00000020) == 0x00000020)) {
namespaces_ = new java.util.ArrayList<com.google.protobuf.ByteString>();
mutable_bitField0_ |= 0x00000020;
}
namespaces_.add(input.readBytes());
break;
}
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@ -4890,6 +4912,9 @@ public final class ZooKeeperProtos {
if (((mutable_bitField0_ & 0x00000010) == 0x00000010)) {
tableCfs_ = java.util.Collections.unmodifiableList(tableCfs_);
}
if (((mutable_bitField0_ & 0x00000020) == 0x00000020)) {
namespaces_ = java.util.Collections.unmodifiableList(namespaces_);
}
this.unknownFields = unknownFields.build();
makeExtensionsImmutable();
}
@ -5131,12 +5156,36 @@ public final class ZooKeeperProtos {
return tableCfs_.get(index);
}
// repeated bytes namespaces = 6;
public static final int NAMESPACES_FIELD_NUMBER = 6;
private java.util.List<com.google.protobuf.ByteString> namespaces_;
/**
* <code>repeated bytes namespaces = 6;</code>
*/
public java.util.List<com.google.protobuf.ByteString>
getNamespacesList() {
return namespaces_;
}
/**
* <code>repeated bytes namespaces = 6;</code>
*/
public int getNamespacesCount() {
return namespaces_.size();
}
/**
* <code>repeated bytes namespaces = 6;</code>
*/
public com.google.protobuf.ByteString getNamespaces(int index) {
return namespaces_.get(index);
}
private void initFields() {
clusterkey_ = "";
replicationEndpointImpl_ = "";
data_ = java.util.Collections.emptyList();
configuration_ = java.util.Collections.emptyList();
tableCfs_ = java.util.Collections.emptyList();
namespaces_ = java.util.Collections.emptyList();
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@ -5187,6 +5236,9 @@ public final class ZooKeeperProtos {
for (int i = 0; i < tableCfs_.size(); i++) {
output.writeMessage(5, tableCfs_.get(i));
}
for (int i = 0; i < namespaces_.size(); i++) {
output.writeBytes(6, namespaces_.get(i));
}
getUnknownFields().writeTo(output);
}
@ -5216,6 +5268,15 @@ public final class ZooKeeperProtos {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(5, tableCfs_.get(i));
}
{
int dataSize = 0;
for (int i = 0; i < namespaces_.size(); i++) {
dataSize += com.google.protobuf.CodedOutputStream
.computeBytesSizeNoTag(namespaces_.get(i));
}
size += dataSize;
size += 1 * getNamespacesList().size();
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@ -5255,6 +5316,8 @@ public final class ZooKeeperProtos {
.equals(other.getConfigurationList());
result = result && getTableCfsList()
.equals(other.getTableCfsList());
result = result && getNamespacesList()
.equals(other.getNamespacesList());
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@ -5288,6 +5351,10 @@ public final class ZooKeeperProtos {
hash = (37 * hash) + TABLE_CFS_FIELD_NUMBER;
hash = (53 * hash) + getTableCfsList().hashCode();
}
if (getNamespacesCount() > 0) {
hash = (37 * hash) + NAMESPACES_FIELD_NUMBER;
hash = (53 * hash) + getNamespacesList().hashCode();
}
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@ -5427,6 +5494,8 @@ public final class ZooKeeperProtos {
} else {
tableCfsBuilder_.clear();
}
namespaces_ = java.util.Collections.emptyList();
bitField0_ = (bitField0_ & ~0x00000020);
return this;
}
@ -5490,6 +5559,11 @@ public final class ZooKeeperProtos {
} else {
result.tableCfs_ = tableCfsBuilder_.build();
}
if (((bitField0_ & 0x00000020) == 0x00000020)) {
namespaces_ = java.util.Collections.unmodifiableList(namespaces_);
bitField0_ = (bitField0_ & ~0x00000020);
}
result.namespaces_ = namespaces_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@ -5594,6 +5668,16 @@ public final class ZooKeeperProtos {
}
}
}
if (!other.namespaces_.isEmpty()) {
if (namespaces_.isEmpty()) {
namespaces_ = other.namespaces_;
bitField0_ = (bitField0_ & ~0x00000020);
} else {
ensureNamespacesIsMutable();
namespaces_.addAll(other.namespaces_);
}
onChanged();
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@ -6541,6 +6625,78 @@ public final class ZooKeeperProtos {
return tableCfsBuilder_;
}
// repeated bytes namespaces = 6;
private java.util.List<com.google.protobuf.ByteString> namespaces_ = java.util.Collections.emptyList();
private void ensureNamespacesIsMutable() {
if (!((bitField0_ & 0x00000020) == 0x00000020)) {
namespaces_ = new java.util.ArrayList<com.google.protobuf.ByteString>(namespaces_);
bitField0_ |= 0x00000020;
}
}
/**
* <code>repeated bytes namespaces = 6;</code>
*/
public java.util.List<com.google.protobuf.ByteString>
getNamespacesList() {
return java.util.Collections.unmodifiableList(namespaces_);
}
/**
* <code>repeated bytes namespaces = 6;</code>
*/
public int getNamespacesCount() {
return namespaces_.size();
}
/**
* <code>repeated bytes namespaces = 6;</code>
*/
public com.google.protobuf.ByteString getNamespaces(int index) {
return namespaces_.get(index);
}
/**
* <code>repeated bytes namespaces = 6;</code>
*/
public Builder setNamespaces(
int index, com.google.protobuf.ByteString value) {
if (value == null) {
throw new NullPointerException();
}
ensureNamespacesIsMutable();
namespaces_.set(index, value);
onChanged();
return this;
}
/**
* <code>repeated bytes namespaces = 6;</code>
*/
public Builder addNamespaces(com.google.protobuf.ByteString value) {
if (value == null) {
throw new NullPointerException();
}
ensureNamespacesIsMutable();
namespaces_.add(value);
onChanged();
return this;
}
/**
* <code>repeated bytes namespaces = 6;</code>
*/
public Builder addAllNamespaces(
java.lang.Iterable<? extends com.google.protobuf.ByteString> values) {
ensureNamespacesIsMutable();
super.addAll(values, namespaces_);
onChanged();
return this;
}
/**
* <code>repeated bytes namespaces = 6;</code>
*/
public Builder clearNamespaces() {
namespaces_ = java.util.Collections.emptyList();
bitField0_ = (bitField0_ & ~0x00000020);
onChanged();
return this;
}
// @@protoc_insertion_point(builder_scope:hbase.pb.ReplicationPeer)
}
@ -9822,24 +9978,24 @@ public final class ZooKeeperProtos {
"e:\007ENABLED\"?\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010DISA" +
"BLED\020\001\022\r\n\tDISABLING\020\002\022\014\n\010ENABLING\020\003\"D\n\007T" +
"ableCF\022\'\n\ntable_name\030\001 \001(\0132\023.hbase.pb.Ta",
"bleName\022\020\n\010families\030\002 \003(\014\"\305\001\n\017Replicatio" +
"bleName\022\020\n\010families\030\002 \003(\014\"\331\001\n\017Replicatio" +
"nPeer\022\022\n\nclusterkey\030\001 \002(\t\022\037\n\027replication" +
"EndpointImpl\030\002 \001(\t\022&\n\004data\030\003 \003(\0132\030.hbase" +
".pb.BytesBytesPair\022/\n\rconfiguration\030\004 \003(" +
"\0132\030.hbase.pb.NameStringPair\022$\n\ttable_cfs" +
"\030\005 \003(\0132\021.hbase.pb.TableCF\"g\n\020Replication" +
"State\022/\n\005state\030\001 \002(\0162 .hbase.pb.Replicat" +
"ionState.State\"\"\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010" +
"DISABLED\020\001\"+\n\027ReplicationHLogPosition\022\020\n" +
"\010position\030\001 \002(\003\"%\n\017ReplicationLock\022\022\n\nlo",
"ck_owner\030\001 \002(\t\"\252\001\n\tTableLock\022\'\n\ntable_na" +
"me\030\001 \001(\0132\023.hbase.pb.TableName\022(\n\nlock_ow" +
"ner\030\002 \001(\0132\024.hbase.pb.ServerName\022\021\n\tthrea" +
"d_id\030\003 \001(\003\022\021\n\tis_shared\030\004 \001(\010\022\017\n\007purpose" +
"\030\005 \001(\t\022\023\n\013create_time\030\006 \001(\003\"\036\n\013SwitchSta" +
"te\022\017\n\007enabled\030\001 \001(\010BE\n*org.apache.hadoop" +
".hbase.protobuf.generatedB\017ZooKeeperProt" +
"osH\001\210\001\001\240\001\001"
"\030\005 \003(\0132\021.hbase.pb.TableCF\022\022\n\nnamespaces\030" +
"\006 \003(\014\"g\n\020ReplicationState\022/\n\005state\030\001 \002(\016" +
"2 .hbase.pb.ReplicationState.State\"\"\n\005St" +
"ate\022\013\n\007ENABLED\020\000\022\014\n\010DISABLED\020\001\"+\n\027Replic" +
"ationHLogPosition\022\020\n\010position\030\001 \002(\003\"%\n\017R",
"eplicationLock\022\022\n\nlock_owner\030\001 \002(\t\"\252\001\n\tT" +
"ableLock\022\'\n\ntable_name\030\001 \001(\0132\023.hbase.pb." +
"TableName\022(\n\nlock_owner\030\002 \001(\0132\024.hbase.pb" +
".ServerName\022\021\n\tthread_id\030\003 \001(\003\022\021\n\tis_sha" +
"red\030\004 \001(\010\022\017\n\007purpose\030\005 \001(\t\022\023\n\013create_tim" +
"e\030\006 \001(\003\"\036\n\013SwitchState\022\017\n\007enabled\030\001 \001(\010B" +
"E\n*org.apache.hadoop.hbase.protobuf.gene" +
"ratedB\017ZooKeeperProtosH\001\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -9887,7 +10043,7 @@ public final class ZooKeeperProtos {
internal_static_hbase_pb_ReplicationPeer_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_hbase_pb_ReplicationPeer_descriptor,
new java.lang.String[] { "Clusterkey", "ReplicationEndpointImpl", "Data", "Configuration", "TableCfs", });
new java.lang.String[] { "Clusterkey", "ReplicationEndpointImpl", "Data", "Configuration", "TableCfs", "Namespaces", });
internal_static_hbase_pb_ReplicationState_descriptor =
getDescriptor().getMessageTypes().get(7);
internal_static_hbase_pb_ReplicationState_fieldAccessorTable = new

View File

@ -121,6 +121,7 @@ message ReplicationPeer {
repeated BytesBytesPair data = 3;
repeated NameStringPair configuration = 4;
repeated TableCF table_cfs = 5;
repeated bytes namespaces = 6;
}
/**

View File

@ -54,7 +54,7 @@ public class ZKNamespaceManager extends ZooKeeperListener {
public ZKNamespaceManager(ZooKeeperWatcher zkw) throws IOException {
super(zkw);
nsZNode = ZooKeeperWatcher.namespaceZNode;
nsZNode = zkw.namespaceZNode;
cache = new ConcurrentSkipListMap<String, NamespaceDescriptor>();
}

View File

@ -72,7 +72,7 @@ public abstract class BaseReplicationEndpoint extends AbstractService
if (scopeFilter != null) {
filters.add(scopeFilter);
}
WALEntryFilter tableCfFilter = getTableCfWALEntryFilter();
WALEntryFilter tableCfFilter = getNamespaceTableCfWALEntryFilter();
if (tableCfFilter != null) {
filters.add(tableCfFilter);
}
@ -87,8 +87,8 @@ public abstract class BaseReplicationEndpoint extends AbstractService
/** Returns a WALEntryFilter for checking replication per table and CF. Subclasses can
* return null if they don't want this filter */
protected WALEntryFilter getTableCfWALEntryFilter() {
return new TableCfWALEntryFilter(ctx.getReplicationPeer());
protected WALEntryFilter getNamespaceTableCfWALEntryFilter() {
return new NamespaceTableCfWALEntryFilter(ctx.getReplicationPeer());
}
@Override

View File

@ -20,38 +20,63 @@ package org.apache.hadoop.hbase.replication;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import com.google.common.base.Predicate;
public class TableCfWALEntryFilter implements WALEntryFilter, WALCellFilter {
/**
* Filter a WAL Entry by namespaces and table-cfs config in the peer. It first filter entry
* by namespaces config, then filter entry by table-cfs config.
*
* 1. Set a namespace in peer config means that all tables in this namespace will be replicated.
* 2. If the namespaces config is null, then the table-cfs config decide which table's edit
* can be replicated. If the table-cfs config is null, then the namespaces config decide
* which table's edit can be replicated.
*/
@InterfaceAudience.Private
public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFilter {
private static final Log LOG = LogFactory.getLog(TableCfWALEntryFilter.class);
private ReplicationPeer peer;
private static final Log LOG = LogFactory.getLog(NamespaceTableCfWALEntryFilter.class);
private final ReplicationPeer peer;
private BulkLoadCellFilter bulkLoadFilter = new BulkLoadCellFilter();
public TableCfWALEntryFilter(ReplicationPeer peer) {
public NamespaceTableCfWALEntryFilter(ReplicationPeer peer) {
this.peer = peer;
}
@Override
public Entry filter(Entry entry) {
TableName tabName = entry.getKey().getTablename();
String namespace = tabName.getNamespaceAsString();
Set<String> namespaces = this.peer.getNamespaces();
Map<TableName, List<String>> tableCFs = getTableCfs();
// If null means user has explicitly not configured any table CFs so all the tables data are
// applicable for replication
if (tableCFs == null) return entry;
// If null means user has explicitly not configured any namespaces and table CFs
// so all the tables data are applicable for replication
if (namespaces == null && tableCFs == null) {
return entry;
}
if (!tableCFs.containsKey(tabName)) {
// First filter by namespaces config
// If table's namespace in peer config, all the tables data are applicable for replication
if (namespaces != null && namespaces.contains(namespace)) {
return entry;
}
// Then filter by table-cfs config
// return null(prevent replicating) if logKey's table isn't in this peer's
// replicaable namespace list and table list
if (tableCFs == null || !tableCFs.containsKey(tabName)) {
return null;
}

View File

@ -20,8 +20,10 @@ package org.apache.hadoop.hbase.client.replication;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -385,4 +387,86 @@ public class TestReplicationAdmin {
admin.removePeer(ID_ONE);
}
@Test
public void testSetPeerNamespaces() throws Exception {
String ns1 = "ns1";
String ns2 = "ns2";
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc.setClusterKey(KEY_ONE);
admin.addPeer(ID_ONE, rpc);
admin.peerAdded(ID_ONE);
rpc = admin.getPeerConfig(ID_ONE);
Set<String> namespaces = new HashSet<String>();
namespaces.add(ns1);
namespaces.add(ns2);
rpc.setNamespaces(namespaces);
admin.updatePeerConfig(ID_ONE, rpc);
namespaces = admin.getPeerConfig(ID_ONE).getNamespaces();
assertEquals(2, namespaces.size());
assertTrue(namespaces.contains(ns1));
assertTrue(namespaces.contains(ns2));
rpc = admin.getPeerConfig(ID_ONE);
namespaces.clear();
namespaces.add(ns1);
rpc.setNamespaces(namespaces);
admin.updatePeerConfig(ID_ONE, rpc);
namespaces = admin.getPeerConfig(ID_ONE).getNamespaces();
assertEquals(1, namespaces.size());
assertTrue(namespaces.contains(ns1));
admin.removePeer(ID_ONE);
}
@Test
public void testNamespacesAndTableCfsConfigConflict() throws ReplicationException {
String ns1 = "ns1";
String ns2 = "ns2";
TableName tab1 = TableName.valueOf("ns1:tabl");
TableName tab2 = TableName.valueOf("ns2:tab2");
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc.setClusterKey(KEY_ONE);
admin.addPeer(ID_ONE, rpc);
admin.peerAdded(ID_ONE);
rpc = admin.getPeerConfig(ID_ONE);
Set<String> namespaces = new HashSet<String>();
namespaces.add(ns1);
rpc.setNamespaces(namespaces);
admin.updatePeerConfig(ID_ONE, rpc);
rpc = admin.getPeerConfig(ID_ONE);
Map<TableName, List<String>> tableCfs = new HashMap<>();
tableCfs.put(tab1, new ArrayList<String>());
rpc.setTableCFsMap(tableCfs);
try {
admin.updatePeerConfig(ID_ONE, rpc);
fail("Should throw ReplicationException, because table " + tab1 + " conflict with namespace "
+ ns1);
} catch (ReplicationException e) {
// OK
}
rpc = admin.getPeerConfig(ID_ONE);
tableCfs.clear();
tableCfs.put(tab2, new ArrayList<String>());
rpc.setTableCFsMap(tableCfs);
admin.updatePeerConfig(ID_ONE, rpc);
rpc = admin.getPeerConfig(ID_ONE);
namespaces.clear();
namespaces.add(ns2);
rpc.setNamespaces(namespaces);
try {
admin.updatePeerConfig(ID_ONE, rpc);
fail("Should throw ReplicationException, because namespace " + ns2 + " conflict with table "
+ tab2);
} catch (ReplicationException e) {
// OK
}
admin.removePeer(ID_ONE);
}
}

View File

@ -0,0 +1,248 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({MediumTests.class})
public class TestNamespaceReplication extends TestReplicationBase {
private static final Log LOG = LogFactory.getLog(TestNamespaceReplication.class);
private static String ns1 = "ns1";
private static String ns2 = "ns2";
private static final TableName tabAName = TableName.valueOf("ns1:TA");
private static final TableName tabBName = TableName.valueOf("ns2:TB");
private static final byte[] f1Name = Bytes.toBytes("f1");
private static final byte[] f2Name = Bytes.toBytes("f2");
private static final byte[] val = Bytes.toBytes("myval");
private static HTableDescriptor tabA;
private static HTableDescriptor tabB;
private static Connection connection1;
private static Connection connection2;
private static Admin admin1;
private static Admin admin2;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TestReplicationBase.setUpBeforeClass();
connection1 = ConnectionFactory.createConnection(conf1);
connection2 = ConnectionFactory.createConnection(conf2);
admin1 = connection1.getAdmin();
admin2 = connection2.getAdmin();
admin1.createNamespace(NamespaceDescriptor.create(ns1).build());
admin1.createNamespace(NamespaceDescriptor.create(ns2).build());
admin2.createNamespace(NamespaceDescriptor.create(ns1).build());
admin2.createNamespace(NamespaceDescriptor.create(ns2).build());
tabA = new HTableDescriptor(tabAName);
HColumnDescriptor fam = new HColumnDescriptor(f1Name);
fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
tabA.addFamily(fam);
fam = new HColumnDescriptor(f2Name);
fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
tabA.addFamily(fam);
admin1.createTable(tabA);
admin2.createTable(tabA);
tabB = new HTableDescriptor(tabBName);
fam = new HColumnDescriptor(f1Name);
fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
tabB.addFamily(fam);
fam = new HColumnDescriptor(f2Name);
fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
tabB.addFamily(fam);
admin1.createTable(tabB);
admin2.createTable(tabB);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
admin1.disableTable(tabAName);
admin1.deleteTable(tabAName);
admin1.disableTable(tabBName);
admin1.deleteTable(tabBName);
admin2.disableTable(tabAName);
admin2.deleteTable(tabAName);
admin2.disableTable(tabBName);
admin2.deleteTable(tabBName);
admin1.deleteNamespace(ns1);
admin1.deleteNamespace(ns2);
admin2.deleteNamespace(ns1);
admin2.deleteNamespace(ns2);
connection1.close();
connection2.close();
TestReplicationBase.tearDownAfterClass();
}
@Test
public void testNamespaceReplication() throws Exception {
Table htab1A = connection1.getTable(tabAName);
Table htab2A = connection2.getTable(tabAName);
Table htab1B = connection1.getTable(tabBName);
Table htab2B = connection2.getTable(tabBName);
admin.peerAdded("2");
// add ns1 to peer config which replicate to cluster2
ReplicationPeerConfig rpc = admin.getPeerConfig("2");
Set<String> namespaces = new HashSet<>();
namespaces.add(ns1);
rpc.setNamespaces(namespaces);
admin.updatePeerConfig("2", rpc);
LOG.info("update peer config");
// Table A can be replicated to cluster2
put(htab1A, row, f1Name, f2Name);
ensureRowExisted(htab2A, row, f1Name, f2Name);
delete(htab1A, row, f1Name, f2Name);
ensureRowNotExisted(htab2A, row, f1Name, f2Name);
// Table B can not be replicated to cluster2
put(htab1B, row, f1Name, f2Name);
ensureRowNotExisted(htab2B, row, f1Name, f2Name);
// add ns1:TA => 'f1' and ns2 to peer config which replicate to cluster2
rpc = admin.getPeerConfig("2");
namespaces = new HashSet<>();
namespaces.add(ns2);
rpc.setNamespaces(namespaces);
Map<TableName, List<String>> tableCfs = new HashMap<>();
tableCfs.put(tabAName, new ArrayList<String>());
tableCfs.get(tabAName).add("f1");
rpc.setTableCFsMap(tableCfs);
admin.updatePeerConfig("2", rpc);
LOG.info("update peer config");
// Only family f1 of Table A can replicated to cluster2
put(htab1A, row, f1Name, f2Name);
ensureRowExisted(htab2A, row, f1Name);
delete(htab1A, row, f1Name, f2Name);
ensureRowNotExisted(htab2A, row, f1Name);
// All cfs of table B can replicated to cluster2
put(htab1B, row, f1Name, f2Name);
ensureRowExisted(htab2B, row, f1Name, f2Name);
delete(htab1B, row, f1Name, f2Name);
ensureRowNotExisted(htab2B, row, f1Name, f2Name);
admin.removePeer("2");
}
private void put(Table source, byte[] row, byte[]... families)
throws Exception {
for (byte[] fam : families) {
Put put = new Put(row);
put.addColumn(fam, row, val);
source.put(put);
}
}
private void delete(Table source, byte[] row, byte[]... families)
throws Exception {
for (byte[] fam : families) {
Delete del = new Delete(row);
del.addFamily(fam);
source.delete(del);
}
}
private void ensureRowExisted(Table target, byte[] row, byte[]... families)
throws Exception {
for (byte[] fam : families) {
Get get = new Get(row);
get.addFamily(fam);
for (int i = 0; i < NB_RETRIES; i++) {
if (i == NB_RETRIES - 1) {
fail("Waited too much time for put replication");
}
Result res = target.get(get);
if (res.size() == 0) {
LOG.info("Row not available");
} else {
assertEquals(res.size(), 1);
assertArrayEquals(res.value(), val);
break;
}
Thread.sleep(SLEEP_TIME);
}
}
}
private void ensureRowNotExisted(Table target, byte[] row, byte[]... families)
throws Exception {
for (byte[] fam : families) {
Get get = new Get(row);
get.addFamily(fam);
for (int i = 0; i < NB_RETRIES; i++) {
if (i == NB_RETRIES - 1) {
fail("Waited too much time for delete replication");
}
Result res = target.get(get);
if (res.size() >= 1) {
LOG.info("Row not deleted");
} else {
break;
}
Thread.sleep(SLEEP_TIME);
}
}
}
}

View File

@ -19,8 +19,10 @@
package org.apache.hadoop.hbase.replication;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.hadoop.hbase.Cell;
@ -196,19 +198,22 @@ public class TestReplicationWALEntryFilters {
}
@Test
public void testTableCfWALEntryFilter() {
public void testNamespaceTableCfWALEntryFilter() {
ReplicationPeer peer = mock(ReplicationPeer.class);
// 1. no namespaces config and table-cfs config in peer
when(peer.getNamespaces()).thenReturn(null);
when(peer.getTableCFs()).thenReturn(null);
Entry userEntry = createEntry(null, a, b, c);
WALEntryFilter filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer));
WALEntryFilter filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a,b,c), filter.filter(userEntry));
// 2. Only config table-cfs in peer
// empty map
userEntry = createEntry(null, a, b, c);
Map<TableName, List<String>> tableCfs = new HashMap<TableName, List<String>>();
when(peer.getTableCFs()).thenReturn(tableCfs);
filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer));
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(null, filter.filter(userEntry));
// table bar
@ -216,7 +221,7 @@ public class TestReplicationWALEntryFilters {
tableCfs = new HashMap<TableName, List<String>>();
tableCfs.put(TableName.valueOf("bar"), null);
when(peer.getTableCFs()).thenReturn(tableCfs);
filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer));
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(null, filter.filter(userEntry));
// table foo:a
@ -224,7 +229,7 @@ public class TestReplicationWALEntryFilters {
tableCfs = new HashMap<TableName, List<String>>();
tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a"));
when(peer.getTableCFs()).thenReturn(tableCfs);
filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer));
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a), filter.filter(userEntry));
// table foo:a,c
@ -232,8 +237,64 @@ public class TestReplicationWALEntryFilters {
tableCfs = new HashMap<TableName, List<String>>();
tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
when(peer.getTableCFs()).thenReturn(tableCfs);
filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer));
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a,c), filter.filter(userEntry));
// 3. Only config namespaces in peer
when(peer.getTableCFs()).thenReturn(null);
// empty set
Set<String> namespaces = new HashSet<String>();
when(peer.getNamespaces()).thenReturn(namespaces);
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(null, filter.filter(userEntry));
// namespace default
namespaces.add("default");
when(peer.getNamespaces()).thenReturn(namespaces);
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a,b,c), filter.filter(userEntry));
// namespace ns1
namespaces = new HashSet<String>();;
namespaces.add("ns1");
when(peer.getNamespaces()).thenReturn(namespaces);
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(null, filter.filter(userEntry));
// 4. Config namespaces and table-cfs both
// Namespaces config should not confict with table-cfs config
namespaces = new HashSet<String>();
tableCfs = new HashMap<TableName, List<String>>();
namespaces.add("ns1");
when(peer.getNamespaces()).thenReturn(namespaces);
tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
when(peer.getTableCFs()).thenReturn(tableCfs);
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a, c), filter.filter(userEntry));
namespaces = new HashSet<String>();;
tableCfs = new HashMap<TableName, List<String>>();
namespaces.add("default");
when(peer.getNamespaces()).thenReturn(namespaces);
tableCfs.put(TableName.valueOf("ns1:foo"), Lists.newArrayList("a", "c"));
when(peer.getTableCFs()).thenReturn(tableCfs);
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
namespaces = new HashSet<String>();;
tableCfs = new HashMap<TableName, List<String>>();
namespaces.add("ns1");
when(peer.getNamespaces()).thenReturn(namespaces);
tableCfs.put(TableName.valueOf("bar"), null);
when(peer.getTableCFs()).thenReturn(tableCfs);
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(null, filter.filter(userEntry));
}
private Entry createEntry(TreeMap<byte[], Integer> scopes, byte[]... kvs) {

View File

@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint.ReplicateContext;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint.RegionReplicaReplayCallable;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@ -280,7 +281,9 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
when(context.getMetrics()).thenReturn(mock(MetricsSource.class));
ReplicationPeer mockPeer = mock(ReplicationPeer.class);
when(mockPeer.getNamespaces()).thenReturn(null);
when(mockPeer.getTableCFs()).thenReturn(null);
when(mockPeer.getPeerConfig()).thenReturn(new ReplicationPeerConfig());
when(context.getReplicationPeer()).thenReturn(mockPeer);
replicator.init(context);

View File

@ -62,6 +62,7 @@ module Hbase
config = args.fetch(CONFIG, nil)
data = args.fetch(DATA, nil)
table_cfs = args.fetch(TABLE_CFS, nil)
namespaces = args.fetch(NAMESPACES, nil)
# Create and populate a ReplicationPeerConfig
replication_peer_config = ReplicationPeerConfig.new
@ -83,6 +84,14 @@ module Hbase
}
end
unless namespaces.nil?
ns_set = java.util.HashSet.new
namespaces.each do |n|
ns_set.add(n)
end
replication_peer_config.set_namespaces(ns_set)
end
unless table_cfs.nil?
# convert table_cfs to TableName
map = java.util.HashMap.new
@ -180,12 +189,39 @@ module Hbase
end
@replication_admin.removePeerTableCFs(id, map)
end
# Set new namespaces config for the specified peer
def set_peer_namespaces(id, namespaces)
unless namespaces.nil?
ns_set = java.util.HashSet.new
namespaces.each do |n|
ns_set.add(n)
end
rpc = get_peer_config(id)
unless rpc.nil?
rpc.setNamespaces(ns_set)
@replication_admin.updatePeerConfig(id, rpc)
end
end
end
# Show the current namespaces config for the specified peer
def show_peer_namespaces(peer_config)
namespaces = peer_config.get_namespaces
if !namespaces.nil?
return namespaces.join(';')
else
return nil
end
end
#----------------------------------------------------------------------------------------------
# Enables a table's replication switch
def enable_tablerep(table_name)
tableName = TableName.valueOf(table_name)
@replication_admin.enableTableRep(tableName)
end
#----------------------------------------------------------------------------------------------
# Disables a table's replication switch
def disable_tablerep(table_name)

View File

@ -78,6 +78,7 @@ module HBaseConstants
ENDPOINT_CLASSNAME = 'ENDPOINT_CLASSNAME'
CLUSTER_KEY = 'CLUSTER_KEY'
TABLE_CFS = 'TABLE_CFS'
NAMESPACES = 'NAMESPACES'
CONFIG = 'CONFIG'
DATA = 'DATA'

View File

@ -370,6 +370,7 @@ Shell.load_command_group(
list_peers
enable_peer
disable_peer
set_peer_namespaces
show_peer_tableCFs
set_peer_tableCFs
list_replicated_tables

View File

@ -27,13 +27,25 @@ must be specified to identify the peer.
For a HBase cluster peer, a cluster key must be provided and is composed like this:
hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
This gives a full path for HBase to connect to another HBase cluster. An optional parameter for
table column families identifies which column families will be replicated to the peer cluster.
This gives a full path for HBase to connect to another HBase cluster.
An optional parameter for namespaces identifies which namespace's tables will be replicated
to the peer cluster.
An optional parameter for table column families identifies which tables and/or column families
will be replicated to the peer cluster.
Notice: Set a namespace in the peer config means that all tables in this namespace
will be replicated to the peer cluster. So if you already have set a namespace in peer config,
then you can't set this namespace's tables in the peer config again.
Examples:
hbase> add_peer '1', CLUSTER_KEY => "server1.cie.com:2181:/hbase"
hbase> add_peer '2', CLUSTER_KEY => "zk1,zk2,zk3:2182:/hbase-prod",
TABLE_CFS => { "table1" => [], "table2" => ["cf1"], "table3" => ["cf1", "cf2"] }
hbase> add_peer '2', CLUSTER_KEY => "zk1,zk2,zk3:2182:/hbase-prod",
NAMESPACES => ["ns1", "ns2", "ns3"]
hbase> add_peer '2', CLUSTER_KEY => "zk1,zk2,zk3:2182:/hbase-prod",
NAMESPACES => ["ns1", "ns2"], TABLE_CFS => { "ns3:table1" => [], "ns3:table2" => ["cf1"] }
For a custom replication endpoint, the ENDPOINT_CLASSNAME can be provided. Two optional arguments
are DATA and CONFIG which can be specified to set different either the peer_data or configuration

View File

@ -32,12 +32,15 @@ EOF
def command()
peers = replication_admin.list_peers
formatter.header(["PEER_ID", "CLUSTER_KEY", "STATE", "TABLE_CFS"])
formatter.header(["PEER_ID", "CLUSTER_KEY", "ENDPOINT_CLASSNAME",
"STATE", "NAMESPACES", "TABLE_CFS"])
peers.entrySet().each do |e|
state = replication_admin.get_peer_state(e.key)
namespaces = replication_admin.show_peer_namespaces(e.value)
tableCFs = replication_admin.show_peer_tableCFs(e.key)
formatter.row([ e.key, e.value, state, tableCFs ])
formatter.row([ e.key, e.value.getClusterKey,
e.value.getReplicationEndpointImpl, state, namespaces, tableCFs ])
end
formatter.footer()

View File

@ -0,0 +1,51 @@
#
# Copyright The Apache Software Foundation
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
module Shell
module Commands
class SetPeerNamespaces< Command
def help
return <<-EOF
Set the replicable namespaces config for the specified peer.
Set a namespace in the peer config means that all tables in this
namespace will be replicated to the peer cluster. So if you already
have set a namespace in the peer config, then you can't set this
namespace's tables in the peer config again.
Examples:
# set namespaces config is null, then the table-cfs config decide
# which table to be replicated.
hbase> set_peer_namespaces '1', []
# set namespaces to be replicable for a peer.
# set a namespace in the peer config means that all tables in this
# namespace (with replication_scope != 0 ) will be replicated.
hbase> set_peer_namespaces '2', ["ns1", "ns2"]
EOF
end
def command(id, namespaces)
replication_admin.set_peer_namespaces(id, namespaces)
end
end
end
end

View File

@ -23,11 +23,15 @@ module Shell
class SetPeerTableCFs< Command
def help
return <<-EOF
Set the replicable table-cf config for the specified peer
Set the replicable table-cf config for the specified peer.
Can't set a table to table-cfs config if it's namespace already was in
namespaces config of this peer.
Examples:
# set all tables to be replicable for a peer
hbase> set_peer_tableCFs '1', ""
# set table-cfs config is null, then the namespaces config decide which
# table to be replicated.
hbase> set_peer_tableCFs '1'
# set table / table-cf to be replicable for a peer, for a table without
# an explicit column-family list, all replicable column-families (with

View File

@ -121,6 +121,49 @@ module Hbase
command(:remove_peer, @peer_id)
end
define_test "add_peer: multiple zk cluster key and namespaces" do
cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
namespaces = ["ns1", "ns2", "ns3"]
namespaces_str = "ns2;ns1;ns3"
args = { CLUSTER_KEY => cluster_key, NAMESPACES => namespaces }
command(:add_peer, @peer_id, args)
assert_equal(1, command(:list_peers).length)
assert(command(:list_peers).key?(@peer_id))
peer_config = command(:list_peers).fetch(@peer_id)
assert_equal(cluster_key, peer_config.get_cluster_key)
assert_equal(namespaces_str,
replication_admin.show_peer_namespaces(peer_config))
# cleanup for future tests
command(:remove_peer, @peer_id)
end
define_test "add_peer: multiple zk cluster key and namespaces, table_cfs" do
cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
namespaces = ["ns1", "ns2"]
table_cfs = { "ns3:table1" => [], "ns3:table2" => ["cf1"],
"ns3:table3" => ["cf1", "cf2"] }
namespaces_str = "ns2;ns1"
table_cfs_str = "ns3.table1;ns3.table3:cf1,cf2;ns3.table2:cf1"
args = { CLUSTER_KEY => cluster_key, NAMESPACES => namespaces,
TABLE_CFS => table_cfs }
command(:add_peer, @peer_id, args)
assert_equal(1, command(:list_peers).length)
assert(command(:list_peers).key?(@peer_id))
peer_config = command(:list_peers).fetch(@peer_id)
assert_equal(cluster_key, peer_config.get_cluster_key)
assert_equal(namespaces_str,
replication_admin.show_peer_namespaces(peer_config))
assert_equal(table_cfs_str, command(:show_peer_tableCFs, @peer_id))
# cleanup for future tests
command(:remove_peer, @peer_id)
end
define_test "add_peer: multiple zk cluster key and table_cfs - peer config" do
cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
table_cfs = { "table1" => [], "table2" => ["cf1"], "table3" => ["cf1", "cf2"] }
@ -152,6 +195,30 @@ module Hbase
end
end
define_test "set_peer_namespaces: works with namespaces array" do
cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
namespaces = ["ns1", "ns2"]
namespaces_str = "ns2;ns1"
args = { CLUSTER_KEY => cluster_key }
command(:add_peer, @peer_id, args)
# Normally the ReplicationSourceManager will call ReplicationPeer#peer_added
# but here we have to do it ourselves
replication_admin.peer_added(@peer_id)
command(:set_peer_namespaces, @peer_id, namespaces)
assert_equal(1, command(:list_peers).length)
assert(command(:list_peers).key?(@peer_id))
peer_config = command(:list_peers).fetch(@peer_id)
assert_equal(namespaces_str,
replication_admin.show_peer_namespaces(peer_config))
# cleanup for future tests
command(:remove_peer, @peer_id)
end
define_test "get_peer_config: works with simple clusterKey peer" do
cluster_key = "localhost:2181:/hbase-test"
args = { CLUSTER_KEY => cluster_key }
@ -221,8 +288,8 @@ module Hbase
assert_equal("value2", peer_config.get_configuration.get("config2"))
assert_equal("new_value1", Bytes.to_string(peer_config.get_peer_data.get(Bytes.toBytes("data1"))))
assert_equal("value2", Bytes.to_string(peer_config.get_peer_data.get(Bytes.toBytes("data2"))))
end
# assert_raise fails on native exceptions - https://jira.codehaus.org/browse/JRUBY-5279
# Can't catch native Java exception with assert_raise in JRuby 1.6.8 as in the test below.
# define_test "add_peer: adding a second peer with same id should error" do