HBASE-11367 Pluggable replication endpoint

Enis Soztutar 2014-07-14 16:21:55 -07:00
parent 22f205b09b
commit c76b528b6d
37 changed files with 3471 additions and 887 deletions

View File: ReplicationAdmin.java

@ -21,10 +21,12 @@ package org.apache.hadoop.hbase.client.replication;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -34,14 +36,18 @@ import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import com.google.common.annotations.VisibleForTesting;
/**
* <p>
* This class provides the administrative interface to HBase cluster
@ -80,6 +86,8 @@ public class ReplicationAdmin implements Closeable {
.toString(HConstants.REPLICATION_SCOPE_GLOBAL);
private final HConnection connection;
// TODO: replication should be managed by master. All the classes except ReplicationAdmin should
// be moved to hbase-server. Resolve it in HBASE-11392.
private final ReplicationQueuesClient replicationQueuesClient;
private final ReplicationPeers replicationPeers;
@ -126,27 +134,65 @@ public class ReplicationAdmin implements Closeable {
});
}
/**
* Add a new peer cluster to replicate to.
* @param id a short that identifies the cluster
* @param id a short name that identifies the cluster
* @param clusterKey the concatenation of the slave cluster's
* <code>hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent</code>
* @throws IllegalStateException if there's already one slave since
* multi-slave isn't supported yet.
* @deprecated Use addPeer(String, ReplicationPeerConfig, Map) instead.
*/
@Deprecated
public void addPeer(String id, String clusterKey) throws ReplicationException {
this.replicationPeers.addPeer(id, clusterKey);
this.addPeer(id, new ReplicationPeerConfig().setClusterKey(clusterKey), null);
}
@Deprecated
public void addPeer(String id, String clusterKey, String tableCFs)
throws ReplicationException {
this.replicationPeers.addPeer(id, clusterKey, tableCFs);
this.replicationPeers.addPeer(id,
new ReplicationPeerConfig().setClusterKey(clusterKey), tableCFs);
}
/**
* Add a new remote slave cluster for replication.
* @param id a short name that identifies the cluster
* @param peerConfig configuration for the replication slave cluster
* @param tableCfs the table and column-family list which will be replicated for this peer.
* A map from tableName to column family names. An empty collection can be passed
* to indicate replicating all column families. Pass null for replicating all table and column
* families
*/
public void addPeer(String id, ReplicationPeerConfig peerConfig,
Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException {
this.replicationPeers.addPeer(id, peerConfig, getTableCfsStr(tableCfs));
}
@VisibleForTesting
static String getTableCfsStr(Map<TableName, ? extends Collection<String>> tableCfs) {
String tableCfsStr = null;
if (tableCfs != null) {
// Format: table1:cf1,cf2;table2:cfA,cfB;table3
StringBuilder builder = new StringBuilder();
for (Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
if (builder.length() > 0) {
builder.append(";");
}
builder.append(entry.getKey());
if (entry.getValue() != null && !entry.getValue().isEmpty()) {
builder.append(":");
builder.append(StringUtils.join(entry.getValue(), ","));
}
}
tableCfsStr = builder.toString();
}
return tableCfsStr;
}
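// Illustrative example (not part of this patch): for a map such as
//   { TableName.valueOf("t1") -> ["cf1", "cf2"], TableName.valueOf("t2") -> [] }
// getTableCfsStr produces "t1:cf1,cf2;t2" (entry order depends on the map). A table
// serialized without column families replicates all of its families.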
/**
* Removes a peer cluster and stops the replication to it.
* @param id a short that identifies the cluster
* @param id a short name that identifies the cluster
*/
public void removePeer(String id) throws ReplicationException {
this.replicationPeers.removePeer(id);
@ -154,7 +200,7 @@ public class ReplicationAdmin implements Closeable {
/**
* Restart the replication stream to the specified peer.
* @param id a short that identifies the cluster
* @param id a short name that identifies the cluster
*/
public void enablePeer(String id) throws ReplicationException {
this.replicationPeers.enablePeer(id);
@ -162,7 +208,7 @@ public class ReplicationAdmin implements Closeable {
/**
* Stop the replication stream to the specified peer.
* @param id a short that identifies the cluster
* @param id a short name that identifies the cluster
*/
public void disablePeer(String id) throws ReplicationException {
this.replicationPeers.disablePeer(id);
@ -179,14 +225,30 @@ public class ReplicationAdmin implements Closeable {
/**
* Map of this cluster's peers for display.
* @return A map of peer ids to peer cluster keys
* @deprecated use {@link #listPeerConfigs()}
*/
@Deprecated
public Map<String, String> listPeers() {
return this.replicationPeers.getAllPeerClusterKeys();
Map<String, ReplicationPeerConfig> peers = this.listPeerConfigs();
Map<String, String> ret = new HashMap<String, String>(peers.size());
for (Map.Entry<String, ReplicationPeerConfig> entry : peers.entrySet()) {
ret.put(entry.getKey(), entry.getValue().getClusterKey());
}
return ret;
}
public Map<String, ReplicationPeerConfig> listPeerConfigs() {
return this.replicationPeers.getAllPeerConfigs();
}
public ReplicationPeerConfig getPeerConfig(String id) throws ReplicationException {
return this.replicationPeers.getReplicationPeerConfig(id);
}
/**
* Get the replicable table-cf config of the specified peer.
* @param id a short that identifies the cluster
* @param id a short name that identifies the cluster
*/
public String getPeerTableCFs(String id) throws ReplicationException {
return this.replicationPeers.getPeerTableCFsConfig(id);
@ -194,16 +256,31 @@ public class ReplicationAdmin implements Closeable {
/**
* Set the replicable table-cf config of the specified peer
* @param id a short that identifies the cluster
* @param id a short name that identifies the cluster
* @deprecated use {@link #setPeerTableCFs(String, Map)}
*/
@Deprecated
public void setPeerTableCFs(String id, String tableCFs) throws ReplicationException {
this.replicationPeers.setPeerTableCFsConfig(id, tableCFs);
}
/**
* Set the replicable table-cf config of the specified peer
* @param id a short name that identifies the cluster
* @param tableCfs the table and column-family list which will be replicated for this peer.
* A map from tableName to column family names. An empty collection can be passed
* to indicate replicating all column families. Pass null for replicating all table and column
* families
*/
public void setPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
throws ReplicationException {
this.replicationPeers.setPeerTableCFsConfig(id, getTableCfsStr(tableCfs));
}
/**
* Get the state of the specified peer cluster
* @param id String format of the Short that identifies the peer, an IllegalArgumentException
* is thrown if it doesn't exist
* @param id String format of the Short name that identifies the peer,
* an IllegalArgumentException is thrown if it doesn't exist
* @return true if replication is enabled to that peer, false if it isn't
*/
public boolean getPeerState(String id) throws ReplicationException {
@ -217,7 +294,7 @@ public class ReplicationAdmin implements Closeable {
}
}
/**
* Find all column families that are replicated from this cluster
* @return the full list of the replicated column families of this cluster as:
@ -227,7 +304,7 @@ public class ReplicationAdmin implements Closeable {
* types may be extended here. For example
* 1) the replication may only apply to selected peers instead of all peers
* 2) the replicationType may indicate the host Cluster servers as Slave
* for the table:columnFam.
* for the table:columnFam.
*/
public List<HashMap<String, String>> listReplicated() throws IOException {
List<HashMap<String, String>> replicationColFams = new ArrayList<HashMap<String, String>>();
@ -249,5 +326,5 @@ public class ReplicationAdmin implements Closeable {
}
return replicationColFams;
}
}
}
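For orientation, a minimal usage sketch of the API added above. The peer id, cluster key, endpoint class name and configuration key below are hypothetical; only addPeer(String, ReplicationPeerConfig, Map) and the ReplicationPeerConfig setters come from this patch.

// Assumes the usual imports: org.apache.hadoop.hbase.{HBaseConfiguration, TableName},
// org.apache.hadoop.hbase.client.replication.ReplicationAdmin,
// org.apache.hadoop.hbase.replication.ReplicationPeerConfig, java.util.*.
public static void addCustomEndpointPeer() throws Exception {
  ReplicationAdmin admin = new ReplicationAdmin(HBaseConfiguration.create());
  try {
    ReplicationPeerConfig peerConfig = new ReplicationPeerConfig()
        .setClusterKey("zk1,zk2,zk3:2181:/hbase")                         // hypothetical slave cluster
        .setReplicationEndpointImpl("org.example.MyReplicationEndpoint"); // hypothetical plugin class
    peerConfig.getConfiguration().put("my.endpoint.batch.size", "100");   // hypothetical endpoint setting

    // Replicate only cf1 and cf2 of table t1; pass null instead to replicate all tables.
    Map<TableName, List<String>> tableCfs = new HashMap<TableName, List<String>>();
    tableCfs.put(TableName.valueOf("t1"), Arrays.asList("cf1", "cf2"));

    admin.addPeer("1", peerConfig, tableCfs);
  } finally {
    admin.close();
  }
}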

View File: ReplicationPeer.java

@ -1,5 +1,4 @@
/*
*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -18,362 +17,56 @@
*/
package org.apache.hadoop.hbase.replication;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
/**
* This class acts as a wrapper for all the objects used to identify and
* communicate with remote peers and is responsible for answering to expired
* sessions and re-establishing the ZK connections.
* ReplicationPeer manages enabled / disabled state for the peer.
*/
@InterfaceAudience.Private
public class ReplicationPeer implements Abortable, Closeable {
private static final Log LOG = LogFactory.getLog(ReplicationPeer.class);
private final String clusterKey;
private final String id;
private List<ServerName> regionServers = new ArrayList<ServerName>(0);
private final AtomicBoolean peerEnabled = new AtomicBoolean();
private volatile Map<String, List<String>> tableCFs = new HashMap<String, List<String>>();
// Cannot be final since a new object needs to be recreated when session fails
private ZooKeeperWatcher zkw;
private final Configuration conf;
private long lastRegionserverUpdate;
private PeerStateTracker peerStateTracker;
private TableCFsTracker tableCFsTracker;
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
public interface ReplicationPeer {
/**
* Constructor that takes all the objects required to communicate with the
* specified peer, except for the region server addresses.
* @param conf configuration object to this peer
* @param key cluster key used to locate the peer
* @param id string representation of this peer's identifier
* State of the peer, whether it is enabled or not
*/
public ReplicationPeer(Configuration conf, String key, String id) throws ReplicationException {
this.conf = conf;
this.clusterKey = key;
this.id = id;
try {
this.reloadZkWatcher();
} catch (IOException e) {
throw new ReplicationException("Error connecting to peer cluster with peerId=" + id, e);
}
}
/**
* start a state tracker to check whether this peer is enabled or not
*
* @param zookeeper zk watcher for the local cluster
* @param peerStateNode path to zk node which stores peer state
* @throws KeeperException
*/
public void startStateTracker(ZooKeeperWatcher zookeeper, String peerStateNode)
throws KeeperException {
ensurePeerEnabled(zookeeper, peerStateNode);
this.peerStateTracker = new PeerStateTracker(peerStateNode, zookeeper, this);
this.peerStateTracker.start();
try {
this.readPeerStateZnode();
} catch (DeserializationException e) {
throw ZKUtil.convert(e);
}
}
private void readPeerStateZnode() throws DeserializationException {
this.peerEnabled.set(isStateEnabled(this.peerStateTracker.getData(false)));
}
/**
* start a table-cfs tracker to listen for changes to the (table, cf-list) map
*
* @param zookeeper zk watcher for the local cluster
* @param tableCFsNode path to zk node which stores table-cfs
* @throws KeeperException
*/
public void startTableCFsTracker(ZooKeeperWatcher zookeeper, String tableCFsNode)
throws KeeperException {
this.tableCFsTracker = new TableCFsTracker(tableCFsNode, zookeeper,
this);
this.tableCFsTracker.start();
this.readTableCFsZnode();
}
static Map<String, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
return null;
}
Map<String, List<String>> tableCFsMap = null;
// parse out (table, cf-list) pairs from tableCFsConfig
// format: "table1:cf1,cf2;table2:cfA,cfB"
String[] tables = tableCFsConfig.split(";");
for (String tab : tables) {
// 1 ignore empty table config
tab = tab.trim();
if (tab.length() == 0) {
continue;
}
// 2 split to "table" and "cf1,cf2"
// for each table: "table:cf1,cf2" or "table"
String[] pair = tab.split(":");
String tabName = pair[0].trim();
if (pair.length > 2 || tabName.length() == 0) {
LOG.error("ignore invalid tableCFs setting: " + tab);
continue;
}
// 3 parse "cf1,cf2" part to List<cf>
List<String> cfs = null;
if (pair.length == 2) {
String[] cfsList = pair[1].split(",");
for (String cf : cfsList) {
String cfName = cf.trim();
if (cfName.length() > 0) {
if (cfs == null) {
cfs = new ArrayList<String>();
}
cfs.add(cfName);
}
}
}
// 4 put <table, List<cf>> to map
if (tableCFsMap == null) {
tableCFsMap = new HashMap<String, List<String>>();
}
tableCFsMap.put(tabName, cfs);
}
return tableCFsMap;
}
private void readTableCFsZnode() {
String currentTableCFs = Bytes.toString(tableCFsTracker.getData(false));
this.tableCFs = parseTableCFsFromConfig(currentTableCFs);
}
/**
* Get the cluster key of that peer
* @return string consisting of zk ensemble addresses, client port
* and root znode
*/
public String getClusterKey() {
return clusterKey;
}
/**
* Get the state of this peer
* @return atomic boolean that holds the status
*/
public AtomicBoolean getPeerEnabled() {
return peerEnabled;
}
/**
* Get replicable (table, cf-list) map of this peer
* @return the replicable (table, cf-list) map
*/
public Map<String, List<String>> getTableCFs() {
return this.tableCFs;
}
/**
* Get a list of all the addresses of all the region servers
* for this peer cluster
* @return list of addresses
*/
public List<ServerName> getRegionServers() {
return regionServers;
}
/**
* Set the list of region servers for that peer
* @param regionServers list of addresses for the region servers
*/
public void setRegionServers(List<ServerName> regionServers) {
this.regionServers = regionServers;
lastRegionserverUpdate = System.currentTimeMillis();
}
/**
* Get the ZK connection to this peer
* @return zk connection
*/
public ZooKeeperWatcher getZkw() {
return zkw;
}
/**
* Get the timestamp at which the last change occurred to the list of region servers to replicate
* to.
* @return The System.currentTimeMillis at the last time the list of peer region servers changed.
*/
public long getLastRegionserverUpdate() {
return lastRegionserverUpdate;
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
enum PeerState {
ENABLED,
DISABLED
}
/**
* Get the identifier of this peer
* @return string representation of the id (short)
* @return string representation of the id
*/
public String getId() {
return id;
}
String getId();
/**
* Get the peer config object
* @return the ReplicationPeerConfig for this peer
*/
public ReplicationPeerConfig getPeerConfig();
/**
* Returns the state of the peer
* @return the enabled state
*/
PeerState getPeerState();
/**
* Get the configuration object required to communicate with this peer
* @return configuration object
*/
public Configuration getConfiguration() {
return conf;
}
@Override
public void abort(String why, Throwable e) {
LOG.fatal("The ReplicationPeer coresponding to peer " + clusterKey
+ " was aborted for the following reason(s):" + why, e);
}
public Configuration getConfiguration();
/**
* Closes the current ZKW (if not null) and creates a new one
* @throws IOException If anything goes wrong connecting
* Get replicable (table, cf-list) map of this peer
* @return the replicable (table, cf-list) map
*/
public void reloadZkWatcher() throws IOException {
if (zkw != null) zkw.close();
zkw = new ZooKeeperWatcher(conf,
"connection to cluster: " + id, this);
}
public Map<String, List<String>> getTableCFs();
@Override
public boolean isAborted() {
// Currently the replication peer is never "Aborted", we just log when the
// abort method is called.
return false;
}
@Override
public void close() throws IOException {
if (zkw != null){
zkw.close();
}
}
/**
* Parse the raw data from ZK to get a peer's state
* @param bytes raw ZK data
* @return True if the passed in <code>bytes</code> are those of a pb serialized ENABLED state.
* @throws DeserializationException
*/
public static boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
ZooKeeperProtos.ReplicationState.State state = parseStateFrom(bytes);
return ZooKeeperProtos.ReplicationState.State.ENABLED == state;
}
/**
* @param bytes Content of a state znode.
* @return State parsed from the passed bytes.
* @throws DeserializationException
*/
private static ZooKeeperProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
throws DeserializationException {
ProtobufUtil.expectPBMagicPrefix(bytes);
int pblen = ProtobufUtil.lengthOfPBMagic();
ZooKeeperProtos.ReplicationState.Builder builder =
ZooKeeperProtos.ReplicationState.newBuilder();
ZooKeeperProtos.ReplicationState state;
try {
state = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
return state.getState();
} catch (InvalidProtocolBufferException e) {
throw new DeserializationException(e);
}
}
/**
* Utility method to ensure an ENABLED znode is in place; if not present, we create it.
* @param zookeeper
* @param path Path to znode to check
* @return True if we created the znode.
* @throws NodeExistsException
* @throws KeeperException
*/
private static boolean ensurePeerEnabled(final ZooKeeperWatcher zookeeper, final String path)
throws NodeExistsException, KeeperException {
if (ZKUtil.checkExists(zookeeper, path) == -1) {
// There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the
// peer-state znode. This happens while adding a peer.
// The peer state data is set as "ENABLED" by default.
ZKUtil.createNodeIfNotExistsAndWatch(zookeeper, path,
ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
return true;
}
return false;
}
/**
* Tracker for state of this peer
*/
public class PeerStateTracker extends ZooKeeperNodeTracker {
public PeerStateTracker(String peerStateZNode, ZooKeeperWatcher watcher,
Abortable abortable) {
super(watcher, peerStateZNode, abortable);
}
@Override
public synchronized void nodeDataChanged(String path) {
if (path.equals(node)) {
super.nodeDataChanged(path);
try {
readPeerStateZnode();
} catch (DeserializationException e) {
LOG.warn("Failed deserializing the content of " + path, e);
}
}
}
}
/**
* Tracker for (table, cf-list) map of this peer
*/
public class TableCFsTracker extends ZooKeeperNodeTracker {
public TableCFsTracker(String tableCFsZNode, ZooKeeperWatcher watcher,
Abortable abortable) {
super(watcher, tableCFsZNode, abortable);
}
@Override
public synchronized void nodeDataChanged(String path) {
if (path.equals(node)) {
super.nodeDataChanged(path);
readTableCFsZnode();
}
}
}
}

View File: ReplicationPeerConfig.java

@ -0,0 +1,87 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.util.Bytes;
/**
* A configuration for the replication peer cluster.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class ReplicationPeerConfig {
private String clusterKey;
private String replicationEndpointImpl;
private final Map<byte[], byte[]> peerData;
private final Map<String, String> configuration;
public ReplicationPeerConfig() {
this.peerData = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
this.configuration = new HashMap<String, String>(0);
}
/**
* Set the clusterKey which is the concatenation of the slave cluster's:
* hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
*/
public ReplicationPeerConfig setClusterKey(String clusterKey) {
this.clusterKey = clusterKey;
return this;
}
/**
* Sets the ReplicationEndpoint plugin class for this peer.
* @param replicationEndpointImpl a class implementing ReplicationEndpoint
*/
public ReplicationPeerConfig setReplicationEndpointImpl(String replicationEndpointImpl) {
this.replicationEndpointImpl = replicationEndpointImpl;
return this;
}
public String getClusterKey() {
return clusterKey;
}
public String getReplicationEndpointImpl() {
return replicationEndpointImpl;
}
public Map<byte[], byte[]> getPeerData() {
return peerData;
}
public Map<String, String> getConfiguration() {
return configuration;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder("clusterKey=").append(clusterKey).append(",");
builder.append("replicationEndpointImpl=").append(replicationEndpointImpl);
return builder.toString();
}
}
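A minimal sketch, assuming hypothetical key names, of populating the config object defined above: peerData carries opaque byte[] key/values for a custom endpoint, while configuration entries are later overlaid on the peer cluster's Configuration (see the CompoundConfiguration handling in ReplicationPeersZKImpl below).

// Bytes is org.apache.hadoop.hbase.util.Bytes, already imported in this file.
static ReplicationPeerConfig exampleConfig() {
  ReplicationPeerConfig peerConfig = new ReplicationPeerConfig()
      .setClusterKey("zk1,zk2,zk3:2181:/hbase")                          // hypothetical peer cluster
      .setReplicationEndpointImpl("org.example.MyReplicationEndpoint");  // hypothetical endpoint class
  peerConfig.getPeerData().put(Bytes.toBytes("token"), Bytes.toBytes("s3cr3t"));  // hypothetical entry
  peerConfig.getConfiguration().put("my.endpoint.flush.interval", "1000");        // hypothetical key
  // toString() reports only the cluster key and the endpoint class:
  //   clusterKey=zk1,zk2,zk3:2181:/hbase,replicationEndpointImpl=org.example.MyReplicationEndpoint
  return peerConfig;
}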

View File: ReplicationPeerZKImpl.java

@ -0,0 +1,320 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import com.google.protobuf.InvalidProtocolBufferException;
@InterfaceAudience.Private
public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closeable {
private static final Log LOG = LogFactory.getLog(ReplicationPeerZKImpl.class);
private final ReplicationPeerConfig peerConfig;
private final String id;
private volatile PeerState peerState;
private volatile Map<String, List<String>> tableCFs = new HashMap<String, List<String>>();
private final Configuration conf;
private PeerStateTracker peerStateTracker;
private TableCFsTracker tableCFsTracker;
/**
* Constructor that takes all the objects required to communicate with the
* specified peer, except for the region server addresses.
* @param conf configuration object to this peer
* @param id string representation of this peer's identifier
* @param peerConfig configuration for the replication peer
*/
public ReplicationPeerZKImpl(Configuration conf, String id, ReplicationPeerConfig peerConfig)
throws ReplicationException {
this.conf = conf;
this.peerConfig = peerConfig;
this.id = id;
}
/**
* start a state tracker to check whether this peer is enabled or not
*
* @param zookeeper zk watcher for the local cluster
* @param peerStateNode path to zk node which stores peer state
* @throws KeeperException
*/
public void startStateTracker(ZooKeeperWatcher zookeeper, String peerStateNode)
throws KeeperException {
ensurePeerEnabled(zookeeper, peerStateNode);
this.peerStateTracker = new PeerStateTracker(peerStateNode, zookeeper, this);
this.peerStateTracker.start();
try {
this.readPeerStateZnode();
} catch (DeserializationException e) {
throw ZKUtil.convert(e);
}
}
private void readPeerStateZnode() throws DeserializationException {
this.peerState =
isStateEnabled(this.peerStateTracker.getData(false))
? PeerState.ENABLED
: PeerState.DISABLED;
}
/**
* start a table-cfs tracker to listen for changes to the (table, cf-list) map
*
* @param zookeeper zk watcher for the local cluster
* @param tableCFsNode path to zk node which stores table-cfs
* @throws KeeperException
*/
public void startTableCFsTracker(ZooKeeperWatcher zookeeper, String tableCFsNode)
throws KeeperException {
this.tableCFsTracker = new TableCFsTracker(tableCFsNode, zookeeper,
this);
this.tableCFsTracker.start();
this.readTableCFsZnode();
}
static Map<String, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
return null;
}
Map<String, List<String>> tableCFsMap = null;
// TODO: This should be a PB object rather than a String to be parsed!! See HBASE-11393
// parse out (table, cf-list) pairs from tableCFsConfig
// format: "table1:cf1,cf2;table2:cfA,cfB"
String[] tables = tableCFsConfig.split(";");
for (String tab : tables) {
// 1 ignore empty table config
tab = tab.trim();
if (tab.length() == 0) {
continue;
}
// 2 split to "table" and "cf1,cf2"
// for each table: "table:cf1,cf2" or "table"
String[] pair = tab.split(":");
String tabName = pair[0].trim();
if (pair.length > 2 || tabName.length() == 0) {
LOG.error("ignore invalid tableCFs setting: " + tab);
continue;
}
// 3 parse "cf1,cf2" part to List<cf>
List<String> cfs = null;
if (pair.length == 2) {
String[] cfsList = pair[1].split(",");
for (String cf : cfsList) {
String cfName = cf.trim();
if (cfName.length() > 0) {
if (cfs == null) {
cfs = new ArrayList<String>();
}
cfs.add(cfName);
}
}
}
// 4 put <table, List<cf>> to map
if (tableCFsMap == null) {
tableCFsMap = new HashMap<String, List<String>>();
}
tableCFsMap.put(tabName, cfs);
}
return tableCFsMap;
}
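// Illustrative examples (not part of this patch):
//   parseTableCFsFromConfig("t1:cf1,cf2;t2") -> { "t1" -> [cf1, cf2], "t2" -> null }
//   parseTableCFsFromConfig("")              -> null
// A null cf-list for a table means every column family of that table is replicated.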
private void readTableCFsZnode() {
String currentTableCFs = Bytes.toString(tableCFsTracker.getData(false));
this.tableCFs = parseTableCFsFromConfig(currentTableCFs);
}
@Override
public PeerState getPeerState() {
return peerState;
}
/**
* Get the identifier of this peer
* @return string representation of the id (short)
*/
@Override
public String getId() {
return id;
}
/**
* Get the peer config object
* @return the ReplicationPeerConfig for this peer
*/
@Override
public ReplicationPeerConfig getPeerConfig() {
return peerConfig;
}
/**
* Get the configuration object required to communicate with this peer
* @return configuration object
*/
@Override
public Configuration getConfiguration() {
return conf;
}
/**
* Get replicable (table, cf-list) map of this peer
* @return the replicable (table, cf-list) map
*/
@Override
public Map<String, List<String>> getTableCFs() {
return this.tableCFs;
}
@Override
public void abort(String why, Throwable e) {
LOG.fatal("The ReplicationPeer coresponding to peer " + peerConfig
+ " was aborted for the following reason(s):" + why, e);
}
@Override
public boolean isAborted() {
// Currently the replication peer is never "Aborted", we just log when the
// abort method is called.
return false;
}
@Override
public void close() throws IOException {
// TODO: stop zkw?
}
/**
* Parse the raw data from ZK to get a peer's state
* @param bytes raw ZK data
* @return True if the passed in <code>bytes</code> are those of a pb serialized ENABLED state.
* @throws DeserializationException
*/
public static boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
ZooKeeperProtos.ReplicationState.State state = parseStateFrom(bytes);
return ZooKeeperProtos.ReplicationState.State.ENABLED == state;
}
/**
* @param bytes Content of a state znode.
* @return State parsed from the passed bytes.
* @throws DeserializationException
*/
private static ZooKeeperProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
throws DeserializationException {
ProtobufUtil.expectPBMagicPrefix(bytes);
int pblen = ProtobufUtil.lengthOfPBMagic();
ZooKeeperProtos.ReplicationState.Builder builder =
ZooKeeperProtos.ReplicationState.newBuilder();
ZooKeeperProtos.ReplicationState state;
try {
state = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
return state.getState();
} catch (InvalidProtocolBufferException e) {
throw new DeserializationException(e);
}
}
/**
* Utility method to ensure an ENABLED znode is in place; if not present, we create it.
* @param zookeeper
* @param path Path to znode to check
* @return True if we created the znode.
* @throws NodeExistsException
* @throws KeeperException
*/
private static boolean ensurePeerEnabled(final ZooKeeperWatcher zookeeper, final String path)
throws NodeExistsException, KeeperException {
if (ZKUtil.checkExists(zookeeper, path) == -1) {
// There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the
// peer-state znode. This happens while adding a peer.
// The peer state data is set as "ENABLED" by default.
ZKUtil.createNodeIfNotExistsAndWatch(zookeeper, path,
ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
return true;
}
return false;
}
/**
* Tracker for state of this peer
*/
public class PeerStateTracker extends ZooKeeperNodeTracker {
public PeerStateTracker(String peerStateZNode, ZooKeeperWatcher watcher,
Abortable abortable) {
super(watcher, peerStateZNode, abortable);
}
@Override
public synchronized void nodeDataChanged(String path) {
if (path.equals(node)) {
super.nodeDataChanged(path);
try {
readPeerStateZnode();
} catch (DeserializationException e) {
LOG.warn("Failed deserializing the content of " + path, e);
}
}
}
}
/**
* Tracker for (table, cf-list) map of this peer
*/
public class TableCFsTracker extends ZooKeeperNodeTracker {
public TableCFsTracker(String tableCFsZNode, ZooKeeperWatcher watcher,
Abortable abortable) {
super(watcher, tableCFsZNode, abortable);
}
@Override
public synchronized void nodeDataChanged(String path) {
if (path.equals(node)) {
super.nodeDataChanged(path);
readTableCFsZnode();
}
}
}
}

View File: ReplicationPeers.java

@ -21,11 +21,10 @@ package org.apache.hadoop.hbase.replication;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.Pair;
/**
* This provides an interface for maintaining a set of peer clusters. These peers are remote slave
@ -44,22 +43,16 @@ public interface ReplicationPeers {
* Initialize the ReplicationPeers interface.
*/
void init() throws ReplicationException;
/**
* Add a new remote slave cluster for replication.
* @param peerId a short that identifies the cluster
* @param clusterKey the concatenation of the slave cluster's:
* hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
*/
void addPeer(String peerId, String clusterKey) throws ReplicationException;
/**
* Add a new remote slave cluster for replication.
* @param peerId a short that identifies the cluster
* @param clusterKey the concatenation of the slave cluster's:
* hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
* @param tableCFs the table and column-family list which will be replicated for this peer
* @param peerConfig configuration for the replication slave cluster
* @param tableCFs the table and column-family list which will be replicated for this peer or null
* for all table and column families
*/
void addPeer(String peerId, String clusterKey, String tableCFs) throws ReplicationException;
void addPeer(String peerId, ReplicationPeerConfig peerConfig, String tableCFs)
throws ReplicationException;
/**
* Removes a remote slave cluster and stops the replication to it.
@ -67,6 +60,10 @@ public interface ReplicationPeers {
*/
void removePeer(String peerId) throws ReplicationException;
boolean peerAdded(String peerId) throws ReplicationException;
void peerRemoved(String peerId);
/**
* Restart the replication to the specified remote slave cluster.
* @param peerId a short that identifies the cluster
@ -99,6 +96,19 @@ public interface ReplicationPeers {
*/
public Map<String, List<String>> getTableCFs(String peerId);
/**
* Returns the ReplicationPeer
* @param peerId id for the peer
* @return ReplicationPeer object
*/
ReplicationPeer getPeer(String peerId);
/**
* Returns the set of peerIds defined
* @return a Set of Strings for peerIds
*/
public Set<String> getPeerIds();
/**
* Get the replication status for the specified connected remote slave cluster.
* The value might be read from cache, so it is recommended to
@ -107,7 +117,7 @@ public interface ReplicationPeers {
* @param peerId a short that identifies the cluster
* @return true if replication is enabled, false otherwise.
*/
boolean getStatusOfConnectedPeer(String peerId);
boolean getStatusOfPeer(String peerId);
/**
* Get the replication status for the specified remote slave cluster, which doesn't
@ -119,17 +129,11 @@ public interface ReplicationPeers {
boolean getStatusOfPeerFromBackingStore(String peerId) throws ReplicationException;
/**
* Get a set of all connected remote slave clusters.
* @return set of peer ids
*/
Set<String> getConnectedPeers();
/**
* List the cluster keys of all remote slave clusters (whether they are enabled/disabled or
* connected/disconnected).
* List the cluster replication configs of all remote slave clusters (whether they are
* enabled/disabled or connected/disconnected).
* @return A map of peer ids to peer cluster keys
*/
Map<String, String> getAllPeerClusterKeys();
Map<String, ReplicationPeerConfig> getAllPeerConfigs();
/**
* List the peer ids of all remote slave clusters (whether they are enabled/disabled or
@ -139,45 +143,16 @@ public interface ReplicationPeers {
List<String> getAllPeerIds();
/**
* Attempt to connect to a new remote slave cluster.
* @param peerId a short that identifies the cluster
* @return true if a new connection was made, false if no new connection was made.
* Returns the configured ReplicationPeerConfig for this peerId
* @param peerId a short name that identifies the cluster
* @return ReplicationPeerConfig for the peer
*/
boolean connectToPeer(String peerId) throws ReplicationException;
/**
* Disconnect from a remote slave cluster.
* @param peerId a short that identifies the cluster
*/
void disconnectFromPeer(String peerId);
/**
* Returns all region servers from given connected remote slave cluster.
* @param peerId a short that identifies the cluster
* @return addresses of all region servers in the peer cluster. Returns an empty list if the peer
* cluster is unavailable or there are no region servers in the cluster.
*/
List<ServerName> getRegionServersOfConnectedPeer(String peerId);
/**
* Get the timestamp of the last change in composition of a given peer cluster.
* @param peerId identifier of the peer cluster for which the timestamp is requested
* @return the timestamp (in milliseconds) of the last change to the composition of
* the peer cluster
*/
long getTimestampOfLastChangeToPeer(String peerId);
/**
* Returns the UUID of the provided peer id.
* @param peerId the peer's ID that will be converted into a UUID
* @return a UUID or null if the peer cluster does not exist or is not connected.
*/
UUID getPeerUUID(String peerId);
ReplicationPeerConfig getReplicationPeerConfig(String peerId) throws ReplicationException;
/**
* Returns the configuration needed to talk to the remote slave cluster.
* @param peerId a short that identifies the cluster
* @return the configuration for the peer cluster, null if it was unable to get the configuration
*/
Configuration getPeerConf(String peerId) throws ReplicationException;
Pair<ReplicationPeerConfig, Configuration> getPeerConf(String peerId) throws ReplicationException;
}
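A brief sketch, for orientation only, of walking the revised interface; the ZooKeeperWatcher, Configuration and Abortable come from the caller and exception handling is left out.

static void describePeers(ZooKeeperWatcher zkw, Configuration conf, Abortable abortable)
    throws ReplicationException {
  ReplicationPeers peers = ReplicationFactory.getReplicationPeers(zkw, conf, abortable);
  peers.init();
  for (String peerId : peers.getPeerIds()) {
    ReplicationPeer peer = peers.getPeer(peerId);
    if (peer == null || !peers.getStatusOfPeer(peerId)) {
      continue; // not tracked locally, or currently disabled
    }
    Pair<ReplicationPeerConfig, Configuration> pair = peers.getPeerConf(peerId);
    // pair.getFirst(): cluster key, endpoint class and peer data for this peer.
    // pair.getSecond(): the peer cluster's Configuration with per-peer overrides applied.
  }
}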

View File: ReplicationPeersZKImpl.java

@ -19,33 +19,29 @@
package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.AuthFailedException;
import org.apache.zookeeper.KeeperException.ConnectionLossException;
import org.apache.zookeeper.KeeperException.SessionExpiredException;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
/**
@ -77,7 +73,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements ReplicationPeers {
// Map of peer clusters keyed by their id
private Map<String, ReplicationPeer> peerClusters;
private Map<String, ReplicationPeerZKImpl> peerClusters;
private final String tableCFsNodeName;
private static final Log LOG = LogFactory.getLog(ReplicationPeersZKImpl.class);
@ -86,7 +82,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
Abortable abortable) {
super(zk, conf, abortable);
this.tableCFsNodeName = conf.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs");
this.peerClusters = new HashMap<String, ReplicationPeer>();
this.peerClusters = new HashMap<String, ReplicationPeerZKImpl>();
}
@Override
@ -98,16 +94,12 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
} catch (KeeperException e) {
throw new ReplicationException("Could not initialize replication peers", e);
}
connectExistingPeers();
addExistingPeers();
}
@Override
public void addPeer(String id, String clusterKey) throws ReplicationException {
addPeer(id, clusterKey, null);
}
@Override
public void addPeer(String id, String clusterKey, String tableCFs) throws ReplicationException {
public void addPeer(String id, ReplicationPeerConfig peerConfig, String tableCFs)
throws ReplicationException {
try {
if (peerExists(id)) {
throw new IllegalArgumentException("Cannot add a peer with id=" + id
@ -115,7 +107,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
}
ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
ZKUtil.createAndWatch(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id),
toByteArray(clusterKey));
toByteArray(peerConfig));
// There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the
// peer-state znode. This happens while adding a peer.
// The peer state data is set as "ENABLED" by default.
@ -128,7 +120,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
Bytes.toBytes(tableCFsStr));
} catch (KeeperException e) {
throw new ReplicationException("Could not add peer with id=" + id
+ ", clusterKey=" + clusterKey, e);
+ ", peerConfif=>" + peerConfig, e);
}
}
@ -202,11 +194,11 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
}
@Override
public boolean getStatusOfConnectedPeer(String id) {
public boolean getStatusOfPeer(String id) {
if (!this.peerClusters.containsKey(id)) {
throw new IllegalArgumentException("Peer with id= " + id + " is not connected");
}
return this.peerClusters.get(id).getPeerEnabled().get();
return this.peerClusters.get(id).getPeerState() == PeerState.ENABLED;
}
@Override
@ -217,7 +209,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
}
String peerStateZNode = getPeerStateNode(id);
try {
return ReplicationPeer.isStateEnabled(ZKUtil.getData(this.zookeeper, peerStateZNode));
return ReplicationPeerZKImpl.isStateEnabled(ZKUtil.getData(this.zookeeper, peerStateZNode));
} catch (KeeperException e) {
throw new ReplicationException(e);
} catch (DeserializationException e) {
@ -232,140 +224,98 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
}
@Override
public boolean connectToPeer(String peerId) throws ReplicationException {
if (peerClusters == null) {
return false;
}
if (this.peerClusters.containsKey(peerId)) {
return false;
}
ReplicationPeer peer = null;
try {
peer = getPeer(peerId);
} catch (Exception e) {
throw new ReplicationException("Error connecting to peer with id=" + peerId, e);
}
if (peer == null) {
return false;
}
this.peerClusters.put(peerId, peer);
LOG.info("Added new peer cluster " + peer.getClusterKey());
return true;
}
@Override
public void disconnectFromPeer(String peerId) {
ReplicationPeer rp = this.peerClusters.get(peerId);
if (rp != null) {
rp.getZkw().close();
this.peerClusters.remove(peerId);
}
}
@Override
public Map<String, String> getAllPeerClusterKeys() {
Map<String, String> peers = new TreeMap<String, String>();
public Map<String, ReplicationPeerConfig> getAllPeerConfigs() {
Map<String, ReplicationPeerConfig> peers = new TreeMap<String, ReplicationPeerConfig>();
List<String> ids = null;
try {
ids = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
for (String id : ids) {
byte[] bytes = ZKUtil.getData(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
String clusterKey = null;
try {
clusterKey = parsePeerFrom(bytes);
} catch (DeserializationException de) {
LOG.warn("Failed parse of clusterid=" + id + " znode content, continuing.");
ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id);
if (peerConfig == null) {
LOG.warn("Failed to get replication peer configuration of clusterid=" + id
+ " znode content, continuing.");
continue;
}
peers.put(id, clusterKey);
peers.put(id, peerConfig);
}
} catch (KeeperException e) {
this.abortable.abort("Cannot get the list of peers ", e);
} catch (InterruptedException e) {
} catch (ReplicationException e) {
this.abortable.abort("Cannot get the list of peers ", e);
}
return peers;
}
@Override
public List<ServerName> getRegionServersOfConnectedPeer(String peerId) {
if (this.peerClusters.size() == 0) {
return Collections.emptyList();
}
ReplicationPeer peer = this.peerClusters.get(peerId);
if (peer == null) {
return Collections.emptyList();
}
List<ServerName> addresses;
try {
addresses = fetchSlavesAddresses(peer.getZkw());
} catch (KeeperException ke) {
if (LOG.isDebugEnabled()) {
LOG.debug("Fetch salves addresses failed.", ke);
}
reconnectPeer(ke, peer);
addresses = Collections.emptyList();
}
peer.setRegionServers(addresses);
return peer.getRegionServers();
public ReplicationPeer getPeer(String peerId) {
return peerClusters.get(peerId);
}
@Override
public UUID getPeerUUID(String peerId) {
ReplicationPeer peer = this.peerClusters.get(peerId);
if (peer == null) {
return null;
}
UUID peerUUID = null;
try {
peerUUID = ZKClusterId.getUUIDForCluster(peer.getZkw());
} catch (KeeperException ke) {
reconnectPeer(ke, peer);
}
return peerUUID;
public Set<String> getPeerIds() {
return peerClusters.keySet(); // this is not thread-safe
}
/**
* Returns a ReplicationPeerConfig from the znode or null for the given peerId.
*/
@Override
public Set<String> getConnectedPeers() {
return this.peerClusters.keySet();
}
@Override
public Configuration getPeerConf(String peerId) throws ReplicationException {
public ReplicationPeerConfig getReplicationPeerConfig(String peerId)
throws ReplicationException {
String znode = ZKUtil.joinZNode(this.peersZNode, peerId);
byte[] data = null;
try {
data = ZKUtil.getData(this.zookeeper, znode);
} catch (KeeperException e) {
throw new ReplicationException("Error getting configuration for peer with id="
+ peerId, e);
} catch (InterruptedException e) {
LOG.warn("Could not get configuration for peer because the thread " +
"was interrupted. peerId=" + peerId);
Thread.currentThread().interrupt();
return null;
} catch (KeeperException e) {
throw new ReplicationException("Error getting configuration for peer with id="
+ peerId, e);
}
if (data == null) {
LOG.error("Could not get configuration for peer because it doesn't exist. peerId=" + peerId);
return null;
}
String otherClusterKey = "";
try {
otherClusterKey = parsePeerFrom(data);
return parsePeerFrom(data);
} catch (DeserializationException e) {
LOG.warn("Failed to parse cluster key from peerId=" + peerId
+ ", specifically the content from the following znode: " + znode);
return null;
}
}
@Override
public Pair<ReplicationPeerConfig, Configuration> getPeerConf(String peerId)
throws ReplicationException {
ReplicationPeerConfig peerConfig = getReplicationPeerConfig(peerId);
if (peerConfig == null) {
return null;
}
Configuration otherConf = new Configuration(this.conf);
try {
ZKUtil.applyClusterKeyToConf(otherConf, otherClusterKey);
if (peerConfig.getClusterKey() != null && !peerConfig.getClusterKey().isEmpty()) {
ZKUtil.applyClusterKeyToConf(otherConf, peerConfig.getClusterKey());
}
} catch (IOException e) {
LOG.error("Can't get peer configuration for peerId=" + peerId + " because:", e);
return null;
}
return otherConf;
if (!peerConfig.getConfiguration().isEmpty()) {
CompoundConfiguration compound = new CompoundConfiguration();
compound.add(otherConf);
compound.addStringMap(peerConfig.getConfiguration());
return new Pair<ReplicationPeerConfig, Configuration>(peerConfig, compound);
}
return new Pair<ReplicationPeerConfig, Configuration>(peerConfig, otherConf);
}
/**
@ -382,19 +332,11 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
return ids;
}
@Override
public long getTimestampOfLastChangeToPeer(String peerId) {
if (!peerClusters.containsKey(peerId)) {
throw new IllegalArgumentException("Unknown peer id: " + peerId);
}
return peerClusters.get(peerId).getLastRegionserverUpdate();
}
/**
* A private method used during initialization. This method attempts to connect to all registered
* A private method used during initialization. This method attempts to add all registered
* peer clusters. This method does not set a watch on the peer cluster znodes.
*/
private void connectExistingPeers() throws ReplicationException {
private void addExistingPeers() throws ReplicationException {
List<String> znodes = null;
try {
znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
@ -403,45 +345,49 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
}
if (znodes != null) {
for (String z : znodes) {
connectToPeer(z);
createAndAddPeer(z);
}
}
}
/**
* A private method used to re-establish a zookeeper session with a peer cluster.
* @param ke
* @param peer
*/
private void reconnectPeer(KeeperException ke, ReplicationPeer peer) {
if (ke instanceof ConnectionLossException || ke instanceof SessionExpiredException
|| ke instanceof AuthFailedException) {
LOG.warn("Lost the ZooKeeper connection for peer " + peer.getClusterKey(), ke);
try {
peer.reloadZkWatcher();
peer.getZkw().registerListener(new PeerRegionServerListener(peer));
} catch (IOException io) {
LOG.warn("Creation of ZookeeperWatcher failed for peer " + peer.getClusterKey(), io);
}
@Override
public boolean peerAdded(String peerId) throws ReplicationException {
return createAndAddPeer(peerId);
}
@Override
public void peerRemoved(String peerId) {
ReplicationPeer rp = this.peerClusters.get(peerId);
if (rp != null) {
this.peerClusters.remove(peerId);
}
}
/**
* Get the list of all the region servers from the specified peer
* @param zkw zk connection to use
* @return list of region server addresses or an empty list if the slave is unavailable
* Attempt to connect to a new remote slave cluster.
* @param peerId a short that identifies the cluster
* @return true if a new connection was made, false if no new connection was made.
*/
private static List<ServerName> fetchSlavesAddresses(ZooKeeperWatcher zkw)
throws KeeperException {
List<String> children = ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.rsZNode);
if (children == null) {
return Collections.emptyList();
public boolean createAndAddPeer(String peerId) throws ReplicationException {
if (peerClusters == null) {
return false;
}
List<ServerName> addresses = new ArrayList<ServerName>(children.size());
for (String child : children) {
addresses.add(ServerName.parseServerName(child));
if (this.peerClusters.containsKey(peerId)) {
return false;
}
return addresses;
ReplicationPeerZKImpl peer = null;
try {
peer = createPeer(peerId);
} catch (Exception e) {
throw new ReplicationException("Error adding peer with id=" + peerId, e);
}
if (peer == null) {
return false;
}
this.peerClusters.put(peerId, peer);
LOG.info("Added new peer cluster " + peer.getPeerConfig().getClusterKey());
return true;
}
private String getTableCFsNode(String id) {
@ -485,18 +431,14 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
* @return object representing the peer
* @throws ReplicationException
*/
private ReplicationPeer getPeer(String peerId) throws ReplicationException {
Configuration peerConf = getPeerConf(peerId);
if (peerConf == null) {
return null;
}
if (this.ourClusterKey.equals(ZKUtil.getZooKeeperClusterKey(peerConf))) {
LOG.debug("Not connecting to " + peerId + " because it's us");
private ReplicationPeerZKImpl createPeer(String peerId) throws ReplicationException {
Pair<ReplicationPeerConfig, Configuration> pair = getPeerConf(peerId);
if (pair == null) {
return null;
}
Configuration peerConf = pair.getSecond();
ReplicationPeer peer =
new ReplicationPeer(peerConf, peerId, ZKUtil.getZooKeeperClusterKey(peerConf));
ReplicationPeerZKImpl peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst());
try {
peer.startStateTracker(this.zookeeper, this.getPeerStateNode(peerId));
} catch (KeeperException e) {
@ -511,7 +453,6 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
peerId, e);
}
peer.getZkw().registerListener(new PeerRegionServerListener(peer));
return peer;
}
@ -520,7 +461,8 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
* @return ClusterKey parsed from the passed bytes.
* @throws DeserializationException
*/
private static String parsePeerFrom(final byte[] bytes) throws DeserializationException {
private static ReplicationPeerConfig parsePeerFrom(final byte[] bytes)
throws DeserializationException {
if (ProtobufUtil.isPBMagicPrefix(bytes)) {
int pblen = ProtobufUtil.lengthOfPBMagic();
ZooKeeperProtos.ReplicationPeer.Builder builder =
@ -531,58 +473,70 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
} catch (InvalidProtocolBufferException e) {
throw new DeserializationException(e);
}
return peer.getClusterkey();
return convert(peer);
} else {
if (bytes.length > 0) {
return Bytes.toString(bytes);
return new ReplicationPeerConfig().setClusterKey(Bytes.toString(bytes));
}
return "";
return new ReplicationPeerConfig().setClusterKey("");
}
}
private static ReplicationPeerConfig convert(ZooKeeperProtos.ReplicationPeer peer) {
ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
if (peer.hasClusterkey()) {
peerConfig.setClusterKey(peer.getClusterkey());
}
if (peer.hasReplicationEndpointImpl()) {
peerConfig.setReplicationEndpointImpl(peer.getReplicationEndpointImpl());
}
for (BytesBytesPair pair : peer.getDataList()) {
peerConfig.getPeerData().put(pair.getFirst().toByteArray(), pair.getSecond().toByteArray());
}
for (NameStringPair pair : peer.getConfigurationList()) {
peerConfig.getConfiguration().put(pair.getName(), pair.getValue());
}
return peerConfig;
}
private static ZooKeeperProtos.ReplicationPeer convert(ReplicationPeerConfig peerConfig) {
ZooKeeperProtos.ReplicationPeer.Builder builder = ZooKeeperProtos.ReplicationPeer.newBuilder();
if (peerConfig.getClusterKey() != null) {
builder.setClusterkey(peerConfig.getClusterKey());
}
if (peerConfig.getReplicationEndpointImpl() != null) {
builder.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl());
}
for (Map.Entry<byte[], byte[]> entry : peerConfig.getPeerData().entrySet()) {
builder.addData(BytesBytesPair.newBuilder()
.setFirst(ByteString.copyFrom(entry.getKey()))
.setSecond(ByteString.copyFrom(entry.getValue()))
.build());
}
for (Map.Entry<String, String> entry : peerConfig.getConfiguration().entrySet()) {
builder.addConfiguration(NameStringPair.newBuilder()
.setName(entry.getKey())
.setValue(entry.getValue())
.build());
}
return builder.build();
}
/**
* @param clusterKey
* @return Serialized protobuf of <code>clusterKey</code> with pb magic prefix prepended suitable
* @param peerConfig
* @return Serialized protobuf of <code>peerConfig</code> with pb magic prefix prepended suitable
* for use as content of a this.peersZNode; i.e. the content of PEER_ID znode under
* /hbase/replication/peers/PEER_ID
*/
private static byte[] toByteArray(final String clusterKey) {
byte[] bytes =
ZooKeeperProtos.ReplicationPeer.newBuilder().setClusterkey(clusterKey).build()
.toByteArray();
private static byte[] toByteArray(final ReplicationPeerConfig peerConfig) {
byte[] bytes = convert(peerConfig).toByteArray();
return ProtobufUtil.prependPBMagic(bytes);
}
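// Illustrative round trip (not part of this patch): toByteArray(peerConfig) writes
// PB_MAGIC followed by the serialized ZooKeeperProtos.ReplicationPeer message;
// parsePeerFrom() strips the magic prefix and rebuilds an equivalent
// ReplicationPeerConfig, while znodes written by older versions (no magic prefix)
// are read back as a bare cluster key.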
/**
* Tracks changes to the list of region servers in a peer's cluster.
*/
public static class PeerRegionServerListener extends ZooKeeperListener {
private ReplicationPeer peer;
private String regionServerListNode;
public PeerRegionServerListener(ReplicationPeer replicationPeer) {
super(replicationPeer.getZkw());
this.peer = replicationPeer;
this.regionServerListNode = peer.getZkw().rsZNode;
}
public PeerRegionServerListener(String regionServerListNode, ZooKeeperWatcher zkw) {
super(zkw);
this.regionServerListNode = regionServerListNode;
}
@Override
public synchronized void nodeChildrenChanged(String path) {
if (path.equals(regionServerListNode)) {
try {
LOG.info("Detected change to peer regionservers, fetching updated list");
peer.setRegionServers(fetchSlavesAddresses(peer.getZkw()));
} catch (KeeperException e) {
LOG.fatal("Error reading slave addresses", e);
}
}
}
}
}

View File: ZKClusterId.java

@ -93,6 +93,7 @@ public class ZKClusterId {
* @throws KeeperException
*/
public static UUID getUUIDForCluster(ZooKeeperWatcher zkw) throws KeeperException {
return UUID.fromString(readClusterIdZNode(zkw));
String uuid = readClusterIdZNode(zkw);
return uuid == null ? null : UUID.fromString(uuid);
}
}

View File

@ -27,5 +27,6 @@ import org.apache.hadoop.classification.InterfaceStability;
@InterfaceStability.Evolving
public class HBaseInterfaceAudience {
public static final String COPROC = "Coprocesssor";
public static final String REPLICATION = "Replication";
public static final String PHOENIX = "Phoenix";
}

View File

@ -119,6 +119,9 @@ message ReplicationPeer {
// clusterkey is the concatenation of the slave cluster's
// hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
required string clusterkey = 1;
optional string replicationEndpointImpl = 2;
repeated BytesBytesPair data = 3;
repeated NameStringPair configuration = 4;
}
/**

View File

@ -39,9 +39,11 @@ import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.mapreduce.Job;
@ -131,6 +133,7 @@ public class VerifyReplication extends Configured implements Tool {
}
}
@Override
protected void cleanup(Context context) {
if (replicatedScanner != null) {
replicatedScanner.close();
@ -141,7 +144,7 @@ public class VerifyReplication extends Configured implements Tool {
private static String getPeerQuorumAddress(final Configuration conf) throws IOException {
ZooKeeperWatcher localZKW = null;
ReplicationPeer peer = null;
ReplicationPeerZKImpl peer = null;
try {
localZKW = new ZooKeeperWatcher(conf, "VerifyReplication",
new Abortable() {
@ -152,11 +155,11 @@ public class VerifyReplication extends Configured implements Tool {
ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW);
rp.init();
Configuration peerConf = rp.getPeerConf(peerId);
if (peerConf == null) {
Pair<ReplicationPeerConfig, Configuration> pair = rp.getPeerConf(peerId);
if (pair == null) {
throw new IOException("Couldn't get peer conf!");
}
Configuration peerConf = rp.getPeerConf(peerId).getSecond();
return ZKUtil.getZooKeeperClusterKey(peerConf);
} catch (ReplicationException e) {
throw new IOException(

View File

@ -34,7 +34,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
@ -133,6 +133,7 @@ public interface HLog {
*/
// TODO: Remove this Writable.
// TODO: Why is this in here? Implementation detail?
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
class Entry implements Writable {
private WALEdit edit;
private HLogKey key;
@ -224,7 +225,7 @@ public interface HLog {
* @return the number of HLog files
*/
int getNumLogFiles();
/**
* @return the size of HLog files
*/
@ -292,9 +293,10 @@ public interface HLog {
* @param sequenceId
* @throws IOException
* @deprecated For tests only and even then, should use
* {@link #appendNoSync(HTableDescriptor, HRegionInfo, HLogKey, WALEdit, AtomicLong, boolean,
* {@link #appendNoSync(HTableDescriptor, HRegionInfo, HLogKey, WALEdit, AtomicLong, boolean,
* List)} and {@link #sync()} instead.
*/
@Deprecated
@VisibleForTesting
public void append(HRegionInfo info, TableName tableName, WALEdit edits,
final long now, HTableDescriptor htd, AtomicLong sequenceId) throws IOException;
@ -343,6 +345,7 @@ public interface HLog {
* instead because you can get back the region edit/sequenceid; it is set into the passed in
* <code>key</code>.
*/
@Deprecated
long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits,
List<UUID> clusterIds, final long now, HTableDescriptor htd, AtomicLong sequenceId,
boolean isInMemstore, long nonceGroup, long nonce) throws IOException;

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
@ -67,7 +68,7 @@ import com.google.protobuf.ByteString;
*/
// TODO: Key and WALEdit are never used separately, or in one-to-many relation, for practical
// purposes. They need to be merged into HLogEntry.
@InterfaceAudience.Private
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
public class HLogKey implements WritableComparable<HLogKey>, SequenceNumber {
public static final Log LOG = LogFactory.getLog(HLogKey.class);
@ -190,7 +191,7 @@ public class HLogKey implements WritableComparable<HLogKey>, SequenceNumber {
*/
public HLogKey(final byte [] encodedRegionName, final TableName tablename,
final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
init(encodedRegionName, tablename, HLog.NO_SEQUENCE_ID, now, clusterIds,
init(encodedRegionName, tablename, HLog.NO_SEQUENCE_ID, now, clusterIds,
nonceGroup, nonce);
}
@ -208,7 +209,7 @@ public class HLogKey implements WritableComparable<HLogKey>, SequenceNumber {
*/
public HLogKey(final byte [] encodedRegionName, final TableName tablename, long logSeqNum,
long nonceGroup, long nonce) {
init(encodedRegionName, tablename, logSeqNum, EnvironmentEdgeManager.currentTimeMillis(),
init(encodedRegionName, tablename, logSeqNum, EnvironmentEdgeManager.currentTimeMillis(),
EMPTY_UUIDS, nonceGroup, nonce);
}
@ -254,7 +255,7 @@ public class HLogKey implements WritableComparable<HLogKey>, SequenceNumber {
this.logSeqNum = sequence;
this.seqNumAssignedLatch.countDown();
}
/**
* Used to set original seq Id for HLogKey during wal replay
* @param seqId
@ -276,6 +277,7 @@ public class HLogKey implements WritableComparable<HLogKey>, SequenceNumber {
* @return long the new assigned sequence number
* @throws InterruptedException
*/
@Override
public long getSequenceNumber() throws IOException {
try {
this.seqNumAssignedLatch.await();
@ -396,6 +398,7 @@ public class HLogKey implements WritableComparable<HLogKey>, SequenceNumber {
return result;
}
@Override
public int compareTo(HLogKey o) {
int result = Bytes.compareTo(this.encodedRegionName, o.encodedRegionName);
if (result == 0) {

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.io.HeapSize;
@ -74,7 +75,7 @@ import org.apache.hadoop.io.Writable;
* is an old style KeyValue or the new style WALEdit.
*
*/
@InterfaceAudience.Private
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
public class WALEdit implements Writable, HeapSize {
public static final Log LOG = LogFactory.getLog(WALEdit.class);
@ -288,4 +289,4 @@ public class WALEdit implements Writable, HeapSize {
}
return null;
}
}
}

View File

@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AbstractService;
/**
* A Base implementation for {@link ReplicationEndpoint}s. Users should consider extending this
* class rather than implementing {@link ReplicationEndpoint} directly for better backwards
* compatibility.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
public abstract class BaseReplicationEndpoint extends AbstractService
implements ReplicationEndpoint {
protected Context ctx;
@Override
public void init(Context context) throws IOException {
this.ctx = context;
}
/** Returns a default set of filters */
@Override
public WALEntryFilter getWALEntryfilter() {
ArrayList<WALEntryFilter> filters = Lists.newArrayList();
WALEntryFilter scopeFilter = getScopeWALEntryFilter();
if (scopeFilter != null) {
filters.add(scopeFilter);
}
WALEntryFilter tableCfFilter = getTableCfWALEntryFilter();
if (tableCfFilter != null) {
filters.add(tableCfFilter);
}
return filters.isEmpty() ? null : new ChainWALEntryFilter(filters);
}
/** Returns a WALEntryFilter for checking the scope. Subclasses can
* return null if they don't want this filter */
protected WALEntryFilter getScopeWALEntryFilter() {
return new ScopeWALEntryFilter();
}
/** Returns a WALEntryFilter for checking replication per table and CF. Subclasses can
* return null if they don't want this filter */
protected WALEntryFilter getTableCfWALEntryFilter() {
return new TableCfWALEntryFilter(ctx.getReplicationPeer());
}
@Override
public boolean canReplicateToSameCluster() {
return false;
}
}
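For orientation only, not part of the patch: a minimal endpoint built on BaseReplicationEndpoint. The class name and the logging "target" are hypothetical; the point is which methods a subclass still has to supply (the Guava Service hooks plus getPeerUUID() and replicate()), since the base class already provides init() and the default WAL entry filters.

import java.util.UUID;
import org.apache.hadoop.hbase.regionserver.wal.HLog;

public class LoggingReplicationEndpoint extends BaseReplicationEndpoint {
  private final UUID fakePeerUUID = UUID.randomUUID(); // stands in for a real target identity

  @Override protected void doStart() { notifyStarted(); } // nothing to connect to in this sketch
  @Override protected void doStop()  { notifyStopped(); }

  @Override public UUID getPeerUUID() { return fakePeerUUID; }

  @Override
  public boolean replicate(ReplicateContext replicateContext) {
    // A real endpoint would persist these entries in the target system before returning true.
    for (HLog.Entry entry : replicateContext.getEntries()) {
      System.out.println("would ship " + entry.getKey());
    }
    return true;
  }
}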

View File

@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
/**
* A {@link WALEntryFilter} which contains multiple filters and applies them
* in chain order
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
public class ChainWALEntryFilter implements WALEntryFilter {
private final WALEntryFilter[] filters;
public ChainWALEntryFilter(WALEntryFilter...filters) {
this.filters = filters;
}
public ChainWALEntryFilter(List<WALEntryFilter> filters) {
ArrayList<WALEntryFilter> rawFilters = new ArrayList<WALEntryFilter>(filters.size());
// flatten the chains
for (WALEntryFilter filter : filters) {
if (filter instanceof ChainWALEntryFilter) {
for (WALEntryFilter f : ((ChainWALEntryFilter) filter).filters) {
rawFilters.add(f);
}
} else {
rawFilters.add(filter);
}
}
this.filters = rawFilters.toArray(new WALEntryFilter[rawFilters.size()]);
}
@Override
public Entry filter(Entry entry) {
for (WALEntryFilter filter : filters) {
if (entry == null) {
return null;
}
entry = filter.filter(entry);
}
return entry;
}
}
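A short usage sketch (not from the patch), chaining two of the filters introduced elsewhere in this commit; a null result from any link in the chain drops the entry. someEntry is assumed to be whatever WAL entry is in hand.

WALEntryFilter chain = new ChainWALEntryFilter(
    new SystemTableWALEntryFilter(),   // drop edits for hbase:meta and other system tables
    new ScopeWALEntryFilter());        // drop KVs whose family is not globally scoped
HLog.Entry kept = chain.filter(someEntry);
if (kept != null) {
  // ship it
}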

View File

@ -0,0 +1,217 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.AuthFailedException;
import org.apache.zookeeper.KeeperException.ConnectionLossException;
import org.apache.zookeeper.KeeperException.SessionExpiredException;
/**
* A {@link BaseReplicationEndpoint} for replication endpoints whose
* target cluster is an HBase cluster.
*/
@InterfaceAudience.Private
public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
implements Abortable {
private static final Log LOG = LogFactory.getLog(HBaseReplicationEndpoint.class);
private ZooKeeperWatcher zkw = null;
private List<ServerName> regionServers = new ArrayList<ServerName>(0);
private volatile long lastRegionServerUpdate;
protected void disconnect() {
if (zkw != null){
zkw.close();
}
}
/**
* Re-establishes the ZooKeeper session with the peer cluster if the given exception
* indicates a lost connection, an expired session, or an authentication failure.
* @param ke the KeeperException that triggered the reconnect attempt
*/
protected void reconnect(KeeperException ke) {
if (ke instanceof ConnectionLossException || ke instanceof SessionExpiredException
|| ke instanceof AuthFailedException) {
String clusterKey = ctx.getPeerConfig().getClusterKey();
LOG.warn("Lost the ZooKeeper connection for peer " + clusterKey, ke);
try {
reloadZkWatcher();
} catch (IOException io) {
LOG.warn("Creation of ZookeeperWatcher failed for peer " + clusterKey, io);
}
}
}
@Override
protected void doStart() {
try {
reloadZkWatcher();
notifyStarted();
} catch (IOException e) {
notifyFailed(e);
}
}
@Override
protected void doStop() {
disconnect();
notifyStopped();
}
@Override
public UUID getPeerUUID() {
UUID peerUUID = null;
try {
peerUUID = ZKClusterId.getUUIDForCluster(zkw);
} catch (KeeperException ke) {
reconnect(ke);
}
return peerUUID;
}
/**
* Get the ZK connection to this peer
* @return zk connection
*/
protected ZooKeeperWatcher getZkw() {
return zkw;
}
/**
* Closes the current ZKW (if not null) and creates a new one
* @throws IOException If anything goes wrong connecting
*/
void reloadZkWatcher() throws IOException {
if (zkw != null) zkw.close();
zkw = new ZooKeeperWatcher(ctx.getConfiguration(),
"connection to cluster: " + ctx.getPeerId(), this);
getZkw().registerListener(new PeerRegionServerListener(this));
}
@Override
public void abort(String why, Throwable e) {
LOG.fatal("The HBaseReplicationEndpoint corresponding to peer " + ctx.getPeerId()
+ " was aborted for the following reason(s):" + why, e);
}
@Override
public boolean isAborted() {
// Currently this is never "Aborted", we just log when the abort method is called.
return false;
}
/**
* Get the list of all the region servers from the specified peer
* @param zkw zk connection to use
* @return list of region server addresses or an empty list if the slave is unavailable
*/
protected static List<ServerName> fetchSlavesAddresses(ZooKeeperWatcher zkw)
throws KeeperException {
List<String> children = ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.rsZNode);
if (children == null) {
return Collections.emptyList();
}
List<ServerName> addresses = new ArrayList<ServerName>(children.size());
for (String child : children) {
addresses.add(ServerName.parseServerName(child));
}
return addresses;
}
/**
* Get a list of the addresses of all the region servers in this peer's cluster,
* refreshing the cached list from ZooKeeper first.
* @return list of region server addresses
*/
public List<ServerName> getRegionServers() {
try {
setRegionServers(fetchSlavesAddresses(this.getZkw()));
} catch (KeeperException ke) {
if (LOG.isDebugEnabled()) {
LOG.debug("Fetch salves addresses failed.", ke);
}
reconnect(ke);
}
return regionServers;
}
/**
* Set the list of region servers for that peer
* @param regionServers list of addresses for the region servers
*/
public void setRegionServers(List<ServerName> regionServers) {
this.regionServers = regionServers;
lastRegionServerUpdate = System.currentTimeMillis();
}
/**
* Get the timestamp at which the last change occurred to the list of region servers to replicate
* to.
* @return The System.currentTimeMillis at the last time the list of peer region servers changed.
*/
public long getLastRegionServerUpdate() {
return lastRegionServerUpdate;
}
/**
* Tracks changes to the list of region servers in a peer's cluster.
*/
public static class PeerRegionServerListener extends ZooKeeperListener {
private final HBaseReplicationEndpoint replicationEndpoint;
private final String regionServerListNode;
public PeerRegionServerListener(HBaseReplicationEndpoint replicationPeer) {
super(replicationPeer.getZkw());
this.replicationEndpoint = replicationPeer;
this.regionServerListNode = replicationEndpoint.getZkw().rsZNode;
}
@Override
public synchronized void nodeChildrenChanged(String path) {
if (path.equals(regionServerListNode)) {
try {
LOG.info("Detected change to peer region servers, fetching updated list");
replicationEndpoint.setRegionServers(fetchSlavesAddresses(replicationEndpoint.getZkw()));
} catch (KeeperException e) {
LOG.fatal("Error reading slave addresses", e);
}
}
}
}
}

View File

@ -0,0 +1,163 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import java.util.List;
import java.util.UUID;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import com.google.common.util.concurrent.Service;
/**
* ReplicationEndpoint is a plugin which implements replication
* to other HBase clusters, or other systems. ReplicationEndpoint implementation
* can be specified at the peer creation time by specifying it
* in the {@link ReplicationPeerConfig}. A ReplicationEndpoint is run in a thread
* in each region server in the same process.
* <p>
* ReplicationEndpoint is closely tied to ReplicationSource in a producer-consumer
* relation. ReplicationSource is an HBase-private class which tails the logs and manages
* the queue of logs plus management and persistence of all the state for replication.
* ReplicationEndpoint on the other hand is responsible for doing the actual shipping
* and persisting of the WAL entries in the other cluster.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
public interface ReplicationEndpoint extends Service {
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
class Context {
private final Configuration conf;
private final FileSystem fs;
private final ReplicationPeerConfig peerConfig;
private final ReplicationPeer replicationPeer;
private final String peerId;
private final UUID clusterId;
private final MetricsSource metrics;
@InterfaceAudience.Private
public Context(
final Configuration conf,
final FileSystem fs,
final ReplicationPeerConfig peerConfig,
final String peerId,
final UUID clusterId,
final ReplicationPeer replicationPeer,
final MetricsSource metrics) {
this.peerConfig = peerConfig;
this.conf = conf;
this.fs = fs;
this.clusterId = clusterId;
this.peerId = peerId;
this.replicationPeer = replicationPeer;
this.metrics = metrics;
}
public Configuration getConfiguration() {
return conf;
}
public FileSystem getFilesystem() {
return fs;
}
public UUID getClusterId() {
return clusterId;
}
public String getPeerId() {
return peerId;
}
public ReplicationPeerConfig getPeerConfig() {
return peerConfig;
}
public ReplicationPeer getReplicationPeer() {
return replicationPeer;
}
public MetricsSource getMetrics() {
return metrics;
}
}
/**
* Initialize the replication endpoint with the given context.
* @param context replication context
* @throws IOException
*/
void init(Context context) throws IOException;
/** Whether or not the replication endpoint can replicate to its source cluster with the same
* UUID */
boolean canReplicateToSameCluster();
/**
* Returns the UUID of the peer cluster. Every HBase cluster instance has a persisted
* associated UUID. If the replication is not performed to an actual HBase cluster (but
* some other system), the UUID returned has to uniquely identify the connected target system.
* @return a UUID or null if the peer cluster does not exist or is not connected.
*/
UUID getPeerUUID();
/**
* Returns a WALEntryFilter to use for filtering out WALEntries from the log. Replication
* infrastructure will call this filter before sending the edits to shipEdits().
* @return a {@link WALEntryFilter} or null.
*/
WALEntryFilter getWALEntryfilter();
/**
* A context for {@link ReplicationEndpoint#replicate(ReplicateContext)} method.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
class ReplicateContext {
List<HLog.Entry> entries;
int size;
@InterfaceAudience.Private
public ReplicateContext() {
}
public ReplicateContext setEntries(List<HLog.Entry> entries) {
this.entries = entries;
return this;
}
public ReplicateContext setSize(int size) {
this.size = size;
return this;
}
public List<HLog.Entry> getEntries() {
return entries;
}
public int getSize() {
return size;
}
}
/**
* Replicate the given set of entries (in the context) to the other cluster.
* Can block until all the given entries are replicated. Once this method returns,
* all entries that were passed in the context are assumed to be persisted in the
* target cluster.
* @param replicateContext a context where WAL entries and other
* parameters can be obtained.
* @return true if the batch was shipped and persisted successfully, false if it must be retried
*/
boolean replicate(ReplicateContext replicateContext);
}
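A sketch of the producer side, mirroring what ReplicationSource does later in this patch: batch up WAL entries, wrap them in a ReplicateContext, and hand them to the endpoint; a false return means the batch was not shipped and must be retried. The endpoint, entries and currentSizeBytes variables are assumed to be in hand.

ReplicationEndpoint.ReplicateContext batch = new ReplicationEndpoint.ReplicateContext()
    .setEntries(entries)        // List<HLog.Entry> read from the source WAL
    .setSize(currentSizeBytes); // accumulated batch size, used for metrics and throttling
boolean shipped = endpoint.replicate(batch);
if (!shipped) {
  // leave the read position untouched and retry the same batch
}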

View File

@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.util.ArrayList;
import java.util.NavigableMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
/**
* Keeps KVs that are scoped other than local
*/
@InterfaceAudience.Private
public class ScopeWALEntryFilter implements WALEntryFilter {
@Override
public Entry filter(Entry entry) {
NavigableMap<byte[], Integer> scopes = entry.getKey().getScopes();
if (scopes == null || scopes.isEmpty()) {
return null;
}
ArrayList<KeyValue> kvs = entry.getEdit().getKeyValues();
int size = kvs.size();
for (int i = size - 1; i >= 0; i--) {
KeyValue kv = kvs.get(i);
// The scope will be null or empty if
// there's nothing to replicate in that WALEdit
if (!scopes.containsKey(kv.getFamily())
|| scopes.get(kv.getFamily()) == HConstants.REPLICATION_SCOPE_LOCAL) {
kvs.remove(i);
}
}
if (kvs.size() < size / 2) {
kvs.trimToSize();
}
return entry;
}
}
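For illustration only (family name invented): the scope map this filter consults is keyed by column family, so a map like the following keeps cf1's KVs and strips every other family from the entry.

NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
scopes.put(Bytes.toBytes("cf1"), HConstants.REPLICATION_SCOPE_GLOBAL); // replicated
// families absent from the map, or mapped to REPLICATION_SCOPE_LOCAL, are removed above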

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
/**
* Skips WAL edits for all System tables including META
*/
@InterfaceAudience.Private
public class SystemTableWALEntryFilter implements WALEntryFilter {
@Override
public Entry filter(Entry entry) {
if (entry.getKey().getTablename().isSystemTable()) {
return null;
}
return entry;
}
}

View File

@ -0,0 +1,74 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
import org.apache.hadoop.hbase.util.Bytes;
public class TableCfWALEntryFilter implements WALEntryFilter {
private static final Log LOG = LogFactory.getLog(TableCfWALEntryFilter.class);
private final ReplicationPeer peer;
public TableCfWALEntryFilter(ReplicationPeer peer) {
this.peer = peer;
}
@Override
public Entry filter(Entry entry) {
String tabName = entry.getKey().getTablename().getNameAsString();
ArrayList<KeyValue> kvs = entry.getEdit().getKeyValues();
Map<String, List<String>> tableCFs = null;
try {
tableCFs = this.peer.getTableCFs();
} catch (IllegalArgumentException e) {
LOG.error("should not happen: can't get tableCFs for peer " + peer.getId() +
", degenerate as if it's not configured by keeping tableCFs==null");
}
int size = kvs.size();
// return null (prevent replicating) if logKey's table isn't in this peer's
// replicable table list (empty tableCFs means all tables are replicable)
if (tableCFs != null && !tableCFs.containsKey(tabName)) {
return null;
} else {
List<String> cfs = (tableCFs == null) ? null : tableCFs.get(tabName);
for (int i = size - 1; i >= 0; i--) {
KeyValue kv = kvs.get(i);
// ignore(remove) kv if its cf isn't in the replicable cf list
// (empty cfs means all cfs of this table are replicable)
if ((cfs != null && !cfs.contains(Bytes.toString(kv.getFamily())))) {
kvs.remove(i);
}
}
}
if (kvs.size() < size/2) {
kvs.trimToSize();
}
return entry;
}
}

View File

@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
/**
* A filter applied to WAL entries before they are shipped for replication. Multiple
* filters might be chained together using {@link ChainWALEntryFilter}.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
public interface WALEntryFilter {
/**
* Applies the filter, possibly returning a different HLog.Entry instance.
* If null is returned, the entry will be skipped.
* @param entry WAL Entry to filter
* @return a (possibly modified) HLog.Entry to use. Returning null or an entry with
* no cells will cause the entry to be skipped for replication.
*/
public HLog.Entry filter(HLog.Entry entry);
}
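As a concrete, purely illustrative instance of this contract (the table name is hypothetical): a filter that drops every entry for an "audit" table while passing everything else through unchanged.

public class DropAuditTableFilter implements WALEntryFilter {
  @Override
  public HLog.Entry filter(HLog.Entry entry) {
    if ("audit".equals(entry.getKey().getTablename().getNameAsString())) {
      return null;  // per the contract above, null means the entry is skipped for replication
    }
    return entry;
  }
}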

View File

@ -25,14 +25,11 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import java.io.IOException;
import java.util.List;
import java.util.Set;
@ -57,12 +54,12 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
@Override
public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
// all members of this class are null if replication is disabled,
// all members of this class are null if replication is disabled,
// so we cannot filter the files
if (this.getConf() == null) {
return files;
}
final Set<String> hlogs = loadHLogsFromQueues();
return Iterables.filter(files, new Predicate<FileStatus>() {
@Override
@ -137,8 +134,6 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
LOG.info("Stopping " + this.zkw);
this.zkw.close();
}
// Not sure why we're deleting a connection that we never acquired or used
HConnectionManager.deleteConnection(this.getConf());
}
@Override

View File

@ -0,0 +1,225 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
import org.apache.hadoop.ipc.RemoteException;
/**
* A {@link ReplicationEndpoint} implementation for replicating to another HBase cluster.
* For the slave cluster it selects a random subset of region servers
* using a replication ratio. For example, if the replication ratio is 0.1
* and the slave cluster has 100 region servers, 10 will be selected as sinks.
* <p/>
* A stream is considered down when we cannot contact a region server on the
* peer cluster for more than 55 seconds by default.
*/
@InterfaceAudience.Private
public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint {
private static final Log LOG = LogFactory.getLog(HBaseInterClusterReplicationEndpoint.class);
private HConnection conn;
private Configuration conf;
// How long should we sleep for each retry
private long sleepForRetries;
// Maximum number of retries before taking bold actions
private int maxRetriesMultiplier;
// Socket timeouts require even bolder actions since we don't want to DDOS
private int socketTimeoutMultiplier;
//Metrics for this source
private MetricsSource metrics;
// Handles connecting to peer region servers
private ReplicationSinkManager replicationSinkMgr;
private boolean peersSelected = false;
@Override
public void init(Context context) throws IOException {
super.init(context);
this.conf = HBaseConfiguration.create(ctx.getConfiguration());
decorateConf();
this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 10);
this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
maxRetriesMultiplier * maxRetriesMultiplier);
// TODO: This connection is replication specific or we should make it particular to
// replication and make replication specific settings such as compression or codec to use
// passing Cells.
this.conn = HConnectionManager.createConnection(this.conf);
this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000);
this.metrics = context.getMetrics();
// ReplicationQueueInfo parses the peerId out of the znode for us
this.replicationSinkMgr = new ReplicationSinkManager(conn, ctx.getPeerId(), this, this.conf);
}
private void decorateConf() {
String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
if (StringUtils.isNotEmpty(replicationCodec)) {
this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
}
}
private void connectToPeers() {
getRegionServers();
int sleepMultiplier = 1;
// Connect to peer cluster first, unless we have to stop
while (this.isRunning() && replicationSinkMgr.getSinks().size() == 0) {
replicationSinkMgr.chooseSinks();
if (this.isRunning() && replicationSinkMgr.getSinks().size() == 0) {
if (sleepForRetries("Waiting for peers", sleepMultiplier)) {
sleepMultiplier++;
}
}
}
}
/**
* Do the sleeping logic
* @param msg Why we sleep
* @param sleepMultiplier by how many times the default sleeping time is augmented
* @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
*/
protected boolean sleepForRetries(String msg, int sleepMultiplier) {
try {
if (LOG.isTraceEnabled()) {
LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
}
Thread.sleep(this.sleepForRetries * sleepMultiplier);
} catch (InterruptedException e) {
LOG.debug("Interrupted while sleeping between retries");
}
return sleepMultiplier < maxRetriesMultiplier;
}
/**
* Do the shipping logic
*/
@Override
public boolean replicate(ReplicateContext replicateContext) {
List<HLog.Entry> entries = replicateContext.getEntries();
int sleepMultiplier = 1;
while (this.isRunning()) {
if (!peersSelected) {
connectToPeers();
peersSelected = true;
}
if (!isPeerEnabled()) {
if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
sleepMultiplier++;
}
continue;
}
SinkPeer sinkPeer = null;
try {
sinkPeer = replicationSinkMgr.getReplicationSink();
BlockingInterface rrs = sinkPeer.getRegionServer();
if (LOG.isTraceEnabled()) {
LOG.trace("Replicating " + entries.size() +
" entries of total size " + replicateContext.getSize());
}
ReplicationProtbufUtil.replicateWALEntry(rrs,
entries.toArray(new HLog.Entry[entries.size()]));
// update metrics
this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime());
return true;
} catch (IOException ioe) {
// Didn't ship anything, but must still age the last time we did
this.metrics.refreshAgeOfLastShippedOp();
if (ioe instanceof RemoteException) {
ioe = ((RemoteException) ioe).unwrapRemoteException();
LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe);
if (ioe instanceof TableNotFoundException) {
if (sleepForRetries("A table is missing in the peer cluster. "
+ "Replication cannot proceed without losing data.", sleepMultiplier)) {
sleepMultiplier++;
}
}
} else {
if (ioe instanceof SocketTimeoutException) {
// This exception means we waited for more than 60s and nothing
// happened, the cluster is alive and calling it right away
// even for a test just makes things worse.
sleepForRetries("Encountered a SocketTimeoutException. Since the " +
"call to the remote cluster timed out, which is usually " +
"caused by a machine failure or a massive slowdown",
this.socketTimeoutMultiplier);
} else if (ioe instanceof ConnectException) {
LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe);
replicationSinkMgr.chooseSinks();
} else {
LOG.warn("Can't replicate because of a local or network error: ", ioe);
}
}
if (sinkPeer != null) {
replicationSinkMgr.reportBadSink(sinkPeer);
}
if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
sleepMultiplier++;
}
}
}
return false; // in case we exited before replicating
}
protected boolean isPeerEnabled() {
return ctx.getReplicationPeer().getPeerState() == PeerState.ENABLED;
}
@Override
protected void doStop() {
disconnect(); //don't call super.doStop()
if (this.conn != null) {
try {
this.conn.close();
this.conn = null;
} catch (IOException e) {
LOG.warn("Failed to close the connection");
}
}
notifyStopped();
}
}
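The retry and sink-selection behaviour described above is driven by a handful of settings read in init() and in ReplicationSinkManager. A sketch of tuning them; the first two values are the defaults shown in init(), while the ratio simply mirrors the 0.1 example from the class javadoc rather than a value stated in this patch.

Configuration conf = HBaseConfiguration.create();
conf.setInt("replication.source.maxretriesmultiplier", 10);  // retry ceiling, as in init()
conf.setLong("replication.source.sleepforretries", 1000);    // base sleep (ms) between retries
conf.setFloat("replication.source.ratio", 0.1f);             // fraction of peer RSs used as sinks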

View File

@ -22,13 +22,14 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
* This class is for maintaining the various replication statistics for a source and publishing them
* through the metrics interfaces.
*/
@InterfaceAudience.Private
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
public class MetricsSource {
public static final String SOURCE_SIZE_OF_LOG_QUEUE = "source.sizeOfLogQueue";
@ -152,7 +153,7 @@ public class MetricsSource {
rms.incCounters(shippedKBsKey, sizeInKB);
rms.incCounters(SOURCE_SHIPPED_KBS, sizeInKB);
}
/** increase the byte number read by source from log file */
public void incrLogReadInBytes(long readInBytes) {
rms.incCounters(logReadInBytesKey, readInBytes);

View File

@ -29,8 +29,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@ -61,7 +60,7 @@ public class ReplicationSinkManager {
private final String peerClusterId;
private final ReplicationPeers replicationPeers;
private final HBaseReplicationEndpoint endpoint;
// Count of "bad replication sink" reports per peer sink
private final Map<ServerName, Integer> badReportCounts;
@ -85,15 +84,15 @@ public class ReplicationSinkManager {
* Instantiate for a single replication peer cluster.
* @param conn connection to the peer cluster
* @param peerClusterId identifier of the peer cluster
* @param replicationPeers manages peer clusters being replicated to
* @param endpoint replication endpoint for inter cluster replication
* @param conf HBase configuration, used for determining replication source ratio and bad peer
* threshold
*/
public ReplicationSinkManager(HConnection conn, String peerClusterId,
ReplicationPeers replicationPeers, Configuration conf) {
HBaseReplicationEndpoint endpoint, Configuration conf) {
this.conn = conn;
this.peerClusterId = peerClusterId;
this.replicationPeers = replicationPeers;
this.endpoint = endpoint;
this.badReportCounts = Maps.newHashMap();
this.ratio = conf.getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO);
this.badSinkThreshold = conf.getInt("replication.bad.sink.threshold",
@ -107,8 +106,7 @@ public class ReplicationSinkManager {
* @return a replication sink to replicate to
*/
public SinkPeer getReplicationSink() throws IOException {
if (replicationPeers.getTimestampOfLastChangeToPeer(peerClusterId)
> this.lastUpdateToPeers) {
if (endpoint.getLastRegionServerUpdate() > this.lastUpdateToPeers) {
LOG.info("Current list of sinks is out of date, updating");
chooseSinks();
}
@ -143,8 +141,7 @@ public class ReplicationSinkManager {
}
void chooseSinks() {
List<ServerName> slaveAddresses =
replicationPeers.getRegionServersOfConnectedPeer(peerClusterId);
List<ServerName> slaveAddresses = endpoint.getRegionServers();
Collections.shuffle(slaveAddresses, random);
int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
sinks = slaveAddresses.subList(0, numSinks);

View File

@ -21,13 +21,9 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.UUID;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
@ -41,27 +37,25 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.ipc.RemoteException;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
/**
* Class that handles the source of a replication stream.
@ -82,9 +76,9 @@ public class ReplicationSource extends Thread
public static final Log LOG = LogFactory.getLog(ReplicationSource.class);
// Queue of logs to process
private PriorityBlockingQueue<Path> queue;
private HConnection conn;
private ReplicationQueues replicationQueues;
private ReplicationPeers replicationPeers;
private Configuration conf;
private ReplicationQueueInfo replicationQueueInfo;
// id of the peer cluster this source replicates to
@ -118,8 +112,6 @@ public class ReplicationSource extends Thread
private String peerClusterZnode;
// Maximum number of retries before taking bold actions
private int maxRetriesMultiplier;
// Socket timeouts require even bolder actions since we don't want to DDOS
private int socketTimeoutMultiplier;
// Current number of operations (Put/Delete) that we need to replicate
private int currentNbOperations = 0;
// Current size of data we need to replicate
@ -130,10 +122,14 @@ public class ReplicationSource extends Thread
private MetricsSource metrics;
// Handle on the log reader helper
private ReplicationHLogReaderManager repLogReader;
// Handles connecting to peer region servers
private ReplicationSinkManager replicationSinkMgr;
//WARN threshold for the number of queued logs, defaults to 2
private int logQueueWarnThreshold;
// ReplicationEndpoint which will handle the actual replication
private ReplicationEndpoint replicationEndpoint;
// A filter (or a chain of filters) for the WAL entries.
private WALEntryFilter walEntryFilter;
// Context for ReplicationEndpoint#replicate()
private ReplicationEndpoint.ReplicateContext replicateContext;
// throttler
private ReplicationThrottler throttler;
@ -145,30 +141,30 @@ public class ReplicationSource extends Thread
* @param manager replication manager to ping to
* @param stopper the atomic boolean to use to stop the regionserver
* @param peerClusterZnode the name of our znode
* @param clusterId unique UUID for the cluster
* @param replicationEndpoint the replication endpoint implementation
* @param metrics metrics for replication source
* @throws IOException
*/
@Override
public void init(final Configuration conf, final FileSystem fs,
final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
final ReplicationPeers replicationPeers, final Stoppable stopper,
final String peerClusterZnode, final UUID clusterId) throws IOException {
final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
final MetricsSource metrics)
throws IOException {
this.stopper = stopper;
this.conf = HBaseConfiguration.create(conf);
this.conf = conf;
decorateConf();
this.replicationQueueSizeCapacity =
this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
this.replicationQueueNbCapacity =
this.conf.getInt("replication.source.nb.capacity", 25000);
this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 10);
this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
maxRetriesMultiplier * maxRetriesMultiplier);
this.queue =
new PriorityBlockingQueue<Path>(
this.conf.getInt("hbase.regionserver.maxlogs", 32),
new LogsComparator());
// TODO: This connection is replication specific or we should make it particular to
// replication and make replication specific settings such as compression or codec to use
// passing Cells.
this.conn = HConnectionManager.getConnection(this.conf);
long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
this.throttler = new ReplicationThrottler((double)bandwidth/10.0);
this.replicationQueues = replicationQueues;
@ -177,7 +173,7 @@ public class ReplicationSource extends Thread
this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000);
this.fs = fs;
this.metrics = new MetricsSource(peerClusterZnode);
this.metrics = metrics;
this.repLogReader = new ReplicationHLogReaderManager(this.fs, this.conf);
this.clusterId = clusterId;
@ -185,8 +181,10 @@ public class ReplicationSource extends Thread
this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
// ReplicationQueueInfo parses the peerId out of the znode for us
this.peerId = this.replicationQueueInfo.getPeerId();
this.replicationSinkMgr = new ReplicationSinkManager(conn, peerId, replicationPeers, this.conf);
this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
this.replicationEndpoint = replicationEndpoint;
this.replicateContext = new ReplicationEndpoint.ReplicateContext();
}
private void decorateConf() {
@ -209,30 +207,48 @@ public class ReplicationSource extends Thread
}
private void uninitialize() {
if (this.conn != null) {
try {
this.conn.close();
} catch (IOException e) {
LOG.debug("Attempt to close connection failed", e);
}
}
LOG.debug("Source exiting " + this.peerId);
metrics.clear();
if (replicationEndpoint.state() == Service.State.STARTING
|| replicationEndpoint.state() == Service.State.RUNNING) {
replicationEndpoint.stopAndWait();
}
}
@Override
public void run() {
connectToPeers();
// We were stopped while looping to connect to sinks, just abort
if (!this.isActive()) {
uninitialize();
return;
}
try {
// start the endpoint, connect to the cluster
Service.State state = replicationEndpoint.start().get();
if (state != Service.State.RUNNING) {
LOG.warn("ReplicationEndpoint was not started. Exiting");
uninitialize();
return;
}
} catch (Exception ex) {
LOG.warn("Error starting ReplicationEndpoint, exiting", ex);
throw new RuntimeException(ex);
}
// get the WALEntryFilter from ReplicationEndpoint and add it to default filters
ArrayList<WALEntryFilter> filters = Lists.newArrayList(
(WALEntryFilter)new SystemTableWALEntryFilter());
WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
if (filterFromEndpoint != null) {
filters.add(filterFromEndpoint);
}
this.walEntryFilter = new ChainWALEntryFilter(filters);
int sleepMultiplier = 1;
// delay this until we are in an asynchronous thread
while (this.isActive() && this.peerClusterId == null) {
this.peerClusterId = replicationPeers.getPeerUUID(this.peerId);
this.peerClusterId = replicationEndpoint.getPeerUUID();
if (this.isActive() && this.peerClusterId == null) {
if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
sleepMultiplier++;
@ -250,9 +266,10 @@ public class ReplicationSource extends Thread
// In rare case, zookeeper setting may be messed up. That leads to the incorrect
// peerClusterId value, which is the same as the source clusterId
if (clusterId.equals(peerClusterId)) {
if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
+ peerClusterId);
+ peerClusterId + " which is not allowed by ReplicationEndpoint:"
+ replicationEndpoint.getClass().getName(), null, false);
}
LOG.info("Replicating "+clusterId + " -> " + peerClusterId);
@ -397,8 +414,8 @@ public class ReplicationSource extends Thread
* entries
* @throws IOException
*/
protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo, List<HLog.Entry> entries)
throws IOException{
protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo,
List<HLog.Entry> entries) throws IOException{
long seenEntries = 0;
if (LOG.isTraceEnabled()) {
LOG.trace("Seeking in " + this.currentPath + " at position "
@ -409,18 +426,22 @@ public class ReplicationSource extends Thread
HLog.Entry entry =
this.repLogReader.readNextAndSetPosition();
while (entry != null) {
WALEdit edit = entry.getEdit();
this.metrics.incrLogEditsRead();
seenEntries++;
// Remove all KVs that should not be replicated
HLogKey logKey = entry.getKey();
// don't replicate if the log entries have already been consumed by the cluster
if (!logKey.getClusterIds().contains(peerClusterId)) {
removeNonReplicableEdits(entry);
// Don't replicate catalog entries, if the WALEdit wasn't
// containing anything to replicate and if we're currently not set to replicate
if (!logKey.getTablename().equals(TableName.META_TABLE_NAME) &&
edit.size() != 0) {
if (replicationEndpoint.canReplicateToSameCluster()
|| !entry.getKey().getClusterIds().contains(peerClusterId)) {
// Remove all KVs that should not be replicated
entry = walEntryFilter.filter(entry);
WALEdit edit = null;
HLogKey logKey = null;
if (entry != null) {
edit = entry.getEdit();
logKey = entry.getKey();
}
if (edit != null && edit.size() != 0) {
//Mark that the current cluster has the change
logKey.addClusterId(clusterId);
currentNbOperations += countDistinctRowKeys(edit);
@ -451,20 +472,6 @@ public class ReplicationSource extends Thread
return seenEntries == 0 && processEndOfFile();
}
private void connectToPeers() {
int sleepMultiplier = 1;
// Connect to peer cluster first, unless we have to stop
while (this.isActive() && replicationSinkMgr.getSinks().size() == 0) {
replicationSinkMgr.chooseSinks();
if (this.isActive() && replicationSinkMgr.getSinks().size() == 0) {
if (sleepForRetries("Waiting for peers", sleepMultiplier)) {
sleepMultiplier++;
}
}
}
}
/**
* Poll for the next path
* @return true if a path was obtained, false if not
@ -594,8 +601,8 @@ public class ReplicationSource extends Thread
/*
* Checks whether the current log file is empty, and it is not a recovered queue. This is to
* handle scenario when in an idle cluster, there is no entry in the current log and we keep on
* trying to read the log file and get EOFEception. In case of a recovered queue the last log file
* may be empty, and we don't want to retry that.
* trying to read the log file and get EOFException. In case of a recovered queue the last log
* file may be empty, and we don't want to retry that.
*/
private boolean isCurrentLogEmpty() {
return (this.repLogReader.getPosition() == 0 &&
@ -621,47 +628,6 @@ public class ReplicationSource extends Thread
return sleepMultiplier < maxRetriesMultiplier;
}
/**
* We only want KVs that are scoped other than local
* @param entry The entry to check for replication
*/
protected void removeNonReplicableEdits(HLog.Entry entry) {
String tabName = entry.getKey().getTablename().getNameAsString();
ArrayList<KeyValue> kvs = entry.getEdit().getKeyValues();
Map<String, List<String>> tableCFs = null;
try {
tableCFs = this.replicationPeers.getTableCFs(peerId);
} catch (IllegalArgumentException e) {
LOG.error("should not happen: can't get tableCFs for peer " + peerId +
", degenerate as if it's not configured by keeping tableCFs==null");
}
int size = kvs.size();
// clear kvs(prevent replicating) if logKey's table isn't in this peer's
// replicable table list (empty tableCFs means all table are replicable)
if (tableCFs != null && !tableCFs.containsKey(tabName)) {
kvs.clear();
} else {
NavigableMap<byte[], Integer> scopes = entry.getKey().getScopes();
List<String> cfs = (tableCFs == null) ? null : tableCFs.get(tabName);
for (int i = size - 1; i >= 0; i--) {
KeyValue kv = kvs.get(i);
// The scope will be null or empty if
// there's nothing to replicate in that WALEdit
// ignore(remove) kv if its cf isn't in the replicable cf list
// (empty cfs means all cfs of this table are replicable)
if (scopes == null || !scopes.containsKey(kv.getFamily()) ||
(cfs != null && !cfs.contains(Bytes.toString(kv.getFamily())))) {
kvs.remove(i);
}
}
}
if (kvs.size() < size/2) {
kvs.trimToSize();
}
}
/**
* Count the number of different row keys in the given edit because of
* mini-batching. We assume that there's at least one KV in the WALEdit.
@ -692,13 +658,6 @@ public class ReplicationSource extends Thread
return;
}
while (this.isActive()) {
if (!isPeerEnabled()) {
if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
sleepMultiplier++;
}
continue;
}
SinkPeer sinkPeer = null;
try {
if (this.throttler.isEnabled()) {
long sleepTicks = this.throttler.getNextSleepInterval(currentSize);
@ -719,14 +678,15 @@ public class ReplicationSource extends Thread
this.throttler.resetStartTick();
}
}
sinkPeer = replicationSinkMgr.getReplicationSink();
BlockingInterface rrs = sinkPeer.getRegionServer();
if (LOG.isTraceEnabled()) {
LOG.trace("Replicating " + entries.size() +
" entries of total size " + currentSize);
replicateContext.setEntries(entries).setSize(currentSize);
// send the edits to the endpoint. Will block until the edits are shipped and acknowledged
boolean replicated = replicationEndpoint.replicate(replicateContext);
if (!replicated) {
continue;
}
ReplicationProtbufUtil.replicateWALEntry(rrs,
entries.toArray(new HLog.Entry[entries.size()]));
if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
this.manager.logPositionAndCleanOldLogs(this.currentPath,
this.peerClusterZnode, this.repLogReader.getPosition(),
@ -745,50 +705,9 @@ public class ReplicationSource extends Thread
+ this.totalReplicatedOperations + " operations");
}
break;
} catch (IOException ioe) {
// Didn't ship anything, but must still age the last time we did
this.metrics.refreshAgeOfLastShippedOp();
if (ioe instanceof RemoteException) {
ioe = ((RemoteException) ioe).unwrapRemoteException();
LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe);
if (ioe instanceof TableNotFoundException) {
if (sleepForRetries("A table is missing in the peer cluster. "
+ "Replication cannot proceed without losing data.", sleepMultiplier)) {
sleepMultiplier++;
}
// current thread might be interrupted to terminate
// directly go back to while() for confirm this
if (isInterrupted()) {
continue;
}
}
} else {
if (ioe instanceof SocketTimeoutException) {
// This exception means we waited for more than 60s and nothing
// happened, the cluster is alive and calling it right away
// even for a test just makes things worse.
sleepForRetries("Encountered a SocketTimeoutException. Since the " +
"call to the remote cluster timed out, which is usually " +
"caused by a machine failure or a massive slowdown",
this.socketTimeoutMultiplier);
// current thread might be interrupted to terminate
// directly go back to while() for confirm this
if (isInterrupted()) {
continue;
}
} else if (ioe instanceof ConnectException) {
LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe);
replicationSinkMgr.chooseSinks();
} else {
LOG.warn("Can't replicate because of a local or network error: ", ioe);
}
}
if (sinkPeer != null) {
replicationSinkMgr.reportBadSink(sinkPeer);
}
if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
} catch (Exception ex) {
LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown exception:" + ex);
if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
sleepMultiplier++;
}
}
@ -801,7 +720,7 @@ public class ReplicationSource extends Thread
* @return true if the peer is enabled, otherwise false
*/
protected boolean isPeerEnabled() {
return this.replicationPeers.getStatusOfConnectedPeer(this.peerId);
return this.replicationPeers.getStatusOfPeer(this.peerId);
}
/**
@ -835,10 +754,12 @@ public class ReplicationSource extends Thread
return false;
}
@Override
public void startup() {
String n = Thread.currentThread().getName();
Thread.UncaughtExceptionHandler handler =
new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(final Thread t, final Throwable e) {
LOG.error("Unexpected exception in ReplicationSource," +
" currentPath=" + currentPath, e);
@ -849,11 +770,17 @@ public class ReplicationSource extends Thread
this.peerClusterZnode, handler);
}
@Override
public void terminate(String reason) {
terminate(reason, null);
}
@Override
public void terminate(String reason, Exception cause) {
terminate(reason, cause, true);
}
public void terminate(String reason, Exception cause, boolean join) {
if (cause == null) {
LOG.info("Closing source "
+ this.peerClusterZnode + " because: " + reason);
@ -864,17 +791,33 @@ public class ReplicationSource extends Thread
}
this.running = false;
this.interrupt();
Threads.shutdown(this, this.sleepForRetries * this.maxRetriesMultiplier);
ListenableFuture<Service.State> future = null;
if (this.replicationEndpoint != null) {
future = this.replicationEndpoint.stop();
}
if (join) {
Threads.shutdown(this, this.sleepForRetries);
if (future != null) {
try {
future.get();
} catch (Exception e) {
LOG.warn("Got exception:" + e);
}
}
}
}
@Override
public String getPeerClusterZnode() {
return this.peerClusterZnode;
}
@Override
public String getPeerClusterId() {
return this.peerId;
}
@Override
public Path getCurrentPath() {
return this.currentPath;
}
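
Taken together, the ReplicationSource changes above replace the direct sink-manager/RPC path with a pluggable contract: the source builds a ReplicateContext, calls replicate() on the endpoint and retries the same batch while it returns false, and terminate() now waits on the endpoint's stop future. Below is a minimal sketch of an endpoint that satisfies that contract; it is not part of this commit, the class name is made up, and it uses only the BaseReplicationEndpoint hooks (getPeerUUID, replicate, doStart, doStop) that the new TestReplicationEndpoint further down in this commit also relies on.

package org.apache.hadoop.hbase.replication;

import java.util.UUID;

// Hypothetical example (not in this commit): an endpoint that simply acknowledges
// every batch handed to it by ReplicationSource.
public class AckingReplicationEndpoint extends BaseReplicationEndpoint {

  // A stable id for this endpoint; ReplicationSource skips entries already tagged
  // with this cluster id unless canReplicateToSameCluster() returns true.
  private final UUID peerUUID = UUID.randomUUID();

  @Override
  public UUID getPeerUUID() {
    return peerUUID;
  }

  @Override
  public boolean replicate(ReplicateContext replicateContext) {
    // ReplicationSource blocks here until the batch is shipped and acknowledged;
    // returning false makes the source retry the same batch.
    return true;
  }

  @Override
  protected void doStart() {
    notifyStarted();   // guava Service lifecycle, same as the tests below
  }

  @Override
  protected void doStop() {
    notifyStopped();   // terminate(reason, cause, join=true) waits on this stop future
  }
}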

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
@ -50,7 +51,8 @@ public interface ReplicationSourceInterface {
public void init(final Configuration conf, final FileSystem fs,
final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
final ReplicationPeers replicationPeers, final Stoppable stopper,
final String peerClusterZnode, final UUID clusterId) throws IOException;
final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
final MetricsSource metrics) throws IOException;
/**
* Add a log to the list of logs to replicate

View File

@ -43,9 +43,13 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationListener;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
@ -115,7 +119,7 @@ public class ReplicationSourceManager implements ReplicationListener {
final Configuration conf, final Stoppable stopper, final FileSystem fs, final Path logDir,
final Path oldLogDir, final UUID clusterId) {
//CopyOnWriteArrayList is thread-safe.
//Generally, reading is more than modifying.
//Generally, reading is more than modifying.
this.sources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
this.replicationQueues = replicationQueues;
this.replicationPeers = replicationPeers;
@ -194,7 +198,7 @@ public class ReplicationSourceManager implements ReplicationListener {
* old region server hlog queues
*/
protected void init() throws IOException, ReplicationException {
for (String id : this.replicationPeers.getConnectedPeers()) {
for (String id : this.replicationPeers.getPeerIds()) {
addSource(id);
}
List<String> currentReplicators = this.replicationQueues.getListOfReplicators();
@ -221,9 +225,12 @@ public class ReplicationSourceManager implements ReplicationListener {
*/
protected ReplicationSourceInterface addSource(String id) throws IOException,
ReplicationException {
ReplicationPeerConfig peerConfig
= replicationPeers.getReplicationPeerConfig(id);
ReplicationPeer peer = replicationPeers.getPeer(id);
ReplicationSourceInterface src =
getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
this.replicationPeers, stopper, id, this.clusterId);
this.replicationPeers, stopper, id, this.clusterId, peerConfig, peer);
synchronized (this.hlogsById) {
this.sources.add(src);
this.hlogsById.put(id, new TreeSet<String>());
@ -254,7 +261,7 @@ public class ReplicationSourceManager implements ReplicationListener {
public void deleteSource(String peerId, boolean closeConnection) {
this.replicationQueues.removeQueue(peerId);
if (closeConnection) {
this.replicationPeers.disconnectFromPeer(peerId);
this.replicationPeers.peerRemoved(peerId);
}
}
@ -340,7 +347,9 @@ public class ReplicationSourceManager implements ReplicationListener {
protected ReplicationSourceInterface getReplicationSource(final Configuration conf,
final FileSystem fs, final ReplicationSourceManager manager,
final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers,
final Stoppable stopper, final String peerId, final UUID clusterId) throws IOException {
final Stoppable stopper, final String peerId, final UUID clusterId,
final ReplicationPeerConfig peerConfig, final ReplicationPeer replicationPeer)
throws IOException {
ReplicationSourceInterface src;
try {
@SuppressWarnings("rawtypes")
@ -351,9 +360,32 @@ public class ReplicationSourceManager implements ReplicationListener {
LOG.warn("Passed replication source implementation throws errors, " +
"defaulting to ReplicationSource", e);
src = new ReplicationSource();
}
src.init(conf, fs, manager, replicationQueues, replicationPeers, stopper, peerId, clusterId);
ReplicationEndpoint replicationEndpoint = null;
try {
String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
if (replicationEndpointImpl == null) {
// Default to HBase inter-cluster replication endpoint
replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName();
}
@SuppressWarnings("rawtypes")
Class c = Class.forName(replicationEndpointImpl);
replicationEndpoint = (ReplicationEndpoint) c.newInstance();
} catch (Exception e) {
LOG.warn("Passed replication endpoint implementation throws errors", e);
throw new IOException(e);
}
MetricsSource metrics = new MetricsSource(peerId);
// init replication source
src.init(conf, fs, manager, replicationQueues, replicationPeers, stopper, peerId,
clusterId, replicationEndpoint, metrics);
// init replication endpoint
replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(),
fs, peerConfig, peerId, clusterId, replicationPeer, metrics));
return src;
}
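
Note that getReplicationSource() above defaults to HBaseInterClusterReplicationEndpoint when the peer config names no endpoint class, so existing peers keep the previous inter-cluster behaviour; a custom class is loaded by name via Class.forName(). Pointing a peer at such a class is done on the peer config. A hedged usage sketch follows (the endpoint class name is hypothetical; the ReplicationPeerConfig/addPeer calls are the ones exercised by the new tests later in this commit):

// Hypothetical usage sketch: create a peer that replicates through a custom endpoint.
Configuration conf = HBaseConfiguration.create();
ReplicationAdmin admin = new ReplicationAdmin(conf);
ReplicationPeerConfig peerConfig = new ReplicationPeerConfig()
    .setClusterKey("zk-host:2181:/hbase")                   // slave cluster key (illustrative)
    .setReplicationEndpointImpl("org.example.MyEndpoint");  // resolved by Class.forName() above
admin.addPeer("1", peerConfig, null);  // null tableCfs = replicate all tables/column families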
@ -441,7 +473,7 @@ public class ReplicationSourceManager implements ReplicationListener {
public void peerListChanged(List<String> peerIds) {
for (String id : peerIds) {
try {
boolean added = this.replicationPeers.connectToPeer(id);
boolean added = this.replicationPeers.peerAdded(id);
if (added) {
addSource(id);
}
@ -507,10 +539,26 @@ public class ReplicationSourceManager implements ReplicationListener {
for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
String peerId = entry.getKey();
try {
// there is not an actual peer defined corresponding to peerId for the failover.
ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
String actualPeerId = replicationQueueInfo.getPeerId();
ReplicationPeer peer = replicationPeers.getPeer(actualPeerId);
ReplicationPeerConfig peerConfig = null;
try {
peerConfig = replicationPeers.getReplicationPeerConfig(actualPeerId);
} catch (ReplicationException ex) {
LOG.warn("Received exception while getting replication peer config, skipping replay"
+ ex);
}
if (peer == null || peerConfig == null) {
LOG.warn("Skipping failover for peer:" + actualPeerId + " of node" + rsZnode);
continue;
}
ReplicationSourceInterface src =
getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
stopper, peerId, this.clusterId);
if (!this.rp.getConnectedPeers().contains((src.getPeerClusterId()))) {
stopper, peerId, this.clusterId, peerConfig, peer);
if (!this.rp.getPeerIds().contains((src.getPeerClusterId()))) {
src.terminate("Recovered queue doesn't belong to any current peer");
break;
}
@ -561,7 +609,7 @@ public class ReplicationSourceManager implements ReplicationListener {
stats.append(source.getStats() + "\n");
}
for (ReplicationSourceInterface oldSource : oldsources) {
stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerClusterId() + ": ");
stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerClusterId()+": ");
stats.append(oldSource.getStats()+ "\n");
}
return stats.toString();

View File

@ -17,6 +17,10 @@
*/
package org.apache.hadoop.hbase.client.replication;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -25,6 +29,8 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.google.common.collect.Lists;
import static org.junit.Assert.fail;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@ -117,5 +123,36 @@ public class TestReplicationAdmin {
admin.removePeer(ID_ONE);
}
@Test
public void testGetTableCfsStr() {
// opposite of TestPerTableCFReplication#testParseTableCFsFromConfig()
Map<TableName, List<String>> tabCFsMap = null;
// 1. null or empty string, result should be null
assertEquals(null, ReplicationAdmin.getTableCfsStr(tabCFsMap));
// 2. single table: "tab1" / "tab2:cf1" / "tab3:cf1,cf3"
tabCFsMap = new TreeMap<TableName, List<String>>();
tabCFsMap.put(TableName.valueOf("tab1"), null); // its table name is "tab1"
assertEquals("tab1", ReplicationAdmin.getTableCfsStr(tabCFsMap));
tabCFsMap = new TreeMap<TableName, List<String>>();
tabCFsMap.put(TableName.valueOf("tab1"), Lists.newArrayList("cf1"));
assertEquals("tab1:cf1", ReplicationAdmin.getTableCfsStr(tabCFsMap));
tabCFsMap = new TreeMap<TableName, List<String>>();
tabCFsMap.put(TableName.valueOf("tab1"), Lists.newArrayList("cf1", "cf3"));
assertEquals("tab1:cf1,cf3", ReplicationAdmin.getTableCfsStr(tabCFsMap));
// 3. multiple tables: "tab1 ; tab2:cf1 ; tab3:cf1,cf3"
tabCFsMap = new TreeMap<TableName, List<String>>();
tabCFsMap.put(TableName.valueOf("tab1"), null);
tabCFsMap.put(TableName.valueOf("tab2"), Lists.newArrayList("cf1"));
tabCFsMap.put(TableName.valueOf("tab3"), Lists.newArrayList("cf1", "cf3"));
assertEquals("tab1;tab2:cf1;tab3:cf1,cf3", ReplicationAdmin.getTableCfsStr(tabCFsMap));
}
}

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
@ -40,7 +41,8 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
@Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
ReplicationQueues rq, ReplicationPeers rp, Stoppable stopper, String peerClusterId,
UUID clusterId) throws IOException {
UUID clusterId, ReplicationEndpoint replicationEndpoint, MetricsSource metrics)
throws IOException {
this.manager = manager;
this.peerClusterId = peerClusterId;

View File

@ -175,30 +175,30 @@ public class TestPerTableCFReplication {
Map<String, List<String>> tabCFsMap = null;
// 1. null or empty string, result should be null
tabCFsMap = ReplicationPeer.parseTableCFsFromConfig(null);
tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig(null);
assertEquals(null, tabCFsMap);
tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("");
tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig("");
assertEquals(null, tabCFsMap);
tabCFsMap = ReplicationPeer.parseTableCFsFromConfig(" ");
tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig(" ");
assertEquals(null, tabCFsMap);
// 2. single table: "tab1" / "tab2:cf1" / "tab3:cf1,cf3"
tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("tab1");
tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig("tab1");
assertEquals(1, tabCFsMap.size()); // only one table
assertTrue(tabCFsMap.containsKey("tab1")); // its table name is "tab1"
assertFalse(tabCFsMap.containsKey("tab2")); // not other table
assertEquals(null, tabCFsMap.get("tab1")); // null cf-list,
tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("tab2:cf1");
tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig("tab2:cf1");
assertEquals(1, tabCFsMap.size()); // only one table
assertTrue(tabCFsMap.containsKey("tab2")); // its table name is "tab2"
assertFalse(tabCFsMap.containsKey("tab1")); // not other table
assertEquals(1, tabCFsMap.get("tab2").size()); // cf-list contains only 1 cf
assertEquals("cf1", tabCFsMap.get("tab2").get(0));// the only cf is "cf1"
tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("tab3 : cf1 , cf3");
tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig("tab3 : cf1 , cf3");
assertEquals(1, tabCFsMap.size()); // only one table
assertTrue(tabCFsMap.containsKey("tab3")); // its table name is "tab2"
assertFalse(tabCFsMap.containsKey("tab1")); // not other table
@ -207,7 +207,7 @@ public class TestPerTableCFReplication {
assertTrue(tabCFsMap.get("tab3").contains("cf3"));// contains "cf3"
// 3. multiple tables: "tab1 ; tab2:cf1 ; tab3:cf1,cf3"
tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("tab1 ; tab2:cf1 ; tab3:cf1,cf3");
tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig("tab1 ; tab2:cf1 ; tab3:cf1,cf3");
// 3.1 contains 3 tables : "tab1", "tab2" and "tab3"
assertEquals(3, tabCFsMap.size());
assertTrue(tabCFsMap.containsKey("tab1"));
@ -225,7 +225,8 @@ public class TestPerTableCFReplication {
// 4. contiguous or additional ";"(table delimiter) or ","(cf delimiter) can be tolerated
// still use the example of multiple tables: "tab1 ; tab2:cf1 ; tab3:cf1,cf3"
tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("tab1 ; ; tab2:cf1 ; tab3:cf1,,cf3 ;");
tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig(
"tab1 ; ; tab2:cf1 ; tab3:cf1,,cf3 ;");
// 4.1 contains 3 tables : "tab1", "tab2" and "tab3"
assertEquals(3, tabCFsMap.size());
assertTrue(tabCFsMap.containsKey("tab1"));
@ -243,7 +244,8 @@ public class TestPerTableCFReplication {
// 5. invalid format "tab1:tt:cf1 ; tab2::cf1 ; tab3:cf1,cf3"
// "tab1:tt:cf1" and "tab2::cf1" are invalid and will be ignored totally
tabCFsMap = ReplicationPeer.parseTableCFsFromConfig("tab1:tt:cf1 ; tab2::cf1 ; tab3:cf1,cf3");
tabCFsMap = ReplicationPeerZKImpl.parseTableCFsFromConfig(
"tab1:tt:cf1 ; tab2::cf1 ; tab3:cf1,cf3");
// 5.1 no "tab1" and "tab2", only "tab3"
assertEquals(1, tabCFsMap.size()); // only one table
assertFalse(tabCFsMap.containsKey("tab1"));

View File

@ -0,0 +1,272 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Tests ReplicationSource and ReplicationEndpoint interactions
*/
@Category(MediumTests.class)
public class TestReplicationEndpoint extends TestReplicationBase {
static int numRegionServers;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TestReplicationBase.setUpBeforeClass();
utility2.shutdownMiniCluster(); // we don't need the second cluster
admin.removePeer("2");
numRegionServers = utility1.getHBaseCluster().getRegionServerThreads().size();
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TestReplicationBase.tearDownAfterClass();
// check stop is called
Assert.assertTrue(ReplicationEndpointForTest.stoppedCount.get() > 0);
}
@Before
public void setup() throws FailedLogCloseException, IOException {
ReplicationEndpointForTest.constructedCount.set(0);
ReplicationEndpointForTest.startedCount.set(0);
ReplicationEndpointForTest.replicateCount.set(0);
ReplicationEndpointForTest.lastEntries = null;
for (RegionServerThread rs : utility1.getMiniHBaseCluster().getRegionServerThreads()) {
utility1.getHBaseAdmin().rollHLogWriter(rs.getRegionServer().getServerName().toString());
}
}
@Test
public void testCustomReplicationEndpoint() throws Exception {
// test installing a custom replication endpoint other than the default one.
admin.addPeer("testCustomReplicationEndpoint",
new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf1))
.setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
// check whether the class has been constructed and started
Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return ReplicationEndpointForTest.constructedCount.get() >= numRegionServers;
}
});
Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return ReplicationEndpointForTest.startedCount.get() >= numRegionServers;
}
});
Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
// now replicate some data.
doPut(Bytes.toBytes("row42"));
Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return ReplicationEndpointForTest.replicateCount.get() >= 1;
}
});
doAssert(Bytes.toBytes("row42"));
admin.removePeer("testCustomReplicationEndpoint");
}
@Test
public void testReplicationEndpointReturnsFalseOnReplicate() throws Exception {
admin.addPeer("testReplicationEndpointReturnsFalseOnReplicate",
new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf1))
.setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()), null);
// now replicate some data.
doPut(row);
Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return ReplicationEndpointReturningFalse.replicated.get();
}
});
if (ReplicationEndpointReturningFalse.ex.get() != null) {
throw ReplicationEndpointReturningFalse.ex.get();
}
admin.removePeer("testReplicationEndpointReturnsFalseOnReplicate");
}
@Test
public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
admin.addPeer("testWALEntryFilterFromReplicationEndpoint",
new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf1))
.setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()), null);
// now replicate some data.
doPut(Bytes.toBytes("row1"));
doPut(row);
doPut(Bytes.toBytes("row2"));
Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return ReplicationEndpointForTest.replicateCount.get() >= 1;
}
});
Assert.assertNull(ReplicationEndpointWithWALEntryFilter.ex.get());
admin.removePeer("testWALEntryFilterFromReplicationEndpoint");
}
private void doPut(byte[] row) throws IOException {
Put put = new Put(row);
put.add(famName, row, row);
htable1 = new HTable(conf1, tableName);
htable1.put(put);
htable1.close();
}
private static void doAssert(byte[] row) throws Exception {
if (ReplicationEndpointForTest.lastEntries == null) {
return; // first call
}
Assert.assertEquals(1, ReplicationEndpointForTest.lastEntries.size());
List<KeyValue> kvs = ReplicationEndpointForTest.lastEntries.get(0).getEdit().getKeyValues();
Assert.assertEquals(1, kvs.size());
Assert.assertTrue(Bytes.equals(kvs.get(0).getRowArray(), kvs.get(0).getRowOffset(),
kvs.get(0).getRowLength(), row, 0, row.length));
}
public static class ReplicationEndpointForTest extends BaseReplicationEndpoint {
static UUID uuid = UUID.randomUUID();
static AtomicInteger constructedCount = new AtomicInteger();
static AtomicInteger startedCount = new AtomicInteger();
static AtomicInteger stoppedCount = new AtomicInteger();
static AtomicInteger replicateCount = new AtomicInteger();
static volatile List<HLog.Entry> lastEntries = null;
public ReplicationEndpointForTest() {
constructedCount.incrementAndGet();
}
@Override
public UUID getPeerUUID() {
return uuid;
}
@Override
public boolean replicate(ReplicateContext replicateContext) {
replicateCount.incrementAndGet();
lastEntries = replicateContext.entries;
return true;
}
@Override
protected void doStart() {
startedCount.incrementAndGet();
notifyStarted();
}
@Override
protected void doStop() {
stoppedCount.incrementAndGet();
notifyStopped();
}
}
public static class ReplicationEndpointReturningFalse extends ReplicationEndpointForTest {
static AtomicReference<Exception> ex = new AtomicReference<Exception>(null);
static AtomicBoolean replicated = new AtomicBoolean(false);
@Override
public boolean replicate(ReplicateContext replicateContext) {
try {
// check row
doAssert(row);
} catch (Exception e) {
ex.set(e);
}
super.replicate(replicateContext);
replicated.set(replicateCount.get() > 10); // first 10 times, we return false
return replicated.get();
}
}
// return a WALEntry filter which only accepts "row", but not other rows
public static class ReplicationEndpointWithWALEntryFilter extends ReplicationEndpointForTest {
static AtomicReference<Exception> ex = new AtomicReference<Exception>(null);
@Override
public boolean replicate(ReplicateContext replicateContext) {
try {
super.replicate(replicateContext);
doAssert(row);
} catch (Exception e) {
ex.set(e);
}
return true;
}
@Override
public WALEntryFilter getWALEntryfilter() {
return new ChainWALEntryFilter(super.getWALEntryfilter(), new WALEntryFilter() {
@Override
public Entry filter(Entry entry) {
ArrayList<KeyValue> kvs = entry.getEdit().getKeyValues();
int size = kvs.size();
for (int i = size-1; i >= 0; i--) {
KeyValue kv = kvs.get(i);
if (!Bytes.equals(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
row, 0, row.length)) {
kvs.remove(i);
}
}
return entry;
}
});
}
}
}

View File

@ -179,50 +179,48 @@ public abstract class TestReplicationStateBasic {
} catch (IllegalArgumentException e) {
}
try {
rp.getStatusOfConnectedPeer("bogus");
rp.getStatusOfPeer("bogus");
fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
} catch (IllegalArgumentException e) {
}
assertFalse(rp.connectToPeer("bogus"));
rp.disconnectFromPeer("bogus");
assertEquals(0, rp.getRegionServersOfConnectedPeer("bogus").size());
assertNull(rp.getPeerUUID("bogus"));
assertFalse(rp.peerAdded("bogus"));
rp.peerRemoved("bogus");
assertNull(rp.getPeerConf("bogus"));
assertNumberOfPeers(0, 0);
assertNumberOfPeers(0);
// Add some peers
rp.addPeer(ID_ONE, KEY_ONE);
assertNumberOfPeers(0, 1);
rp.addPeer(ID_TWO, KEY_TWO);
assertNumberOfPeers(0, 2);
rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), null);
assertNumberOfPeers(1);
rp.addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO), null);
assertNumberOfPeers(2);
// Test methods with a peer that is added but not connected
try {
rp.getStatusOfConnectedPeer(ID_ONE);
rp.getStatusOfPeer(ID_ONE);
fail("There are no connected peers, should have thrown an IllegalArgumentException");
} catch (IllegalArgumentException e) {
}
assertNull(rp.getPeerUUID(ID_ONE));
assertEquals(KEY_ONE, ZKUtil.getZooKeeperClusterKey(rp.getPeerConf(ID_ONE)));
rp.disconnectFromPeer(ID_ONE);
assertEquals(0, rp.getRegionServersOfConnectedPeer(ID_ONE).size());
assertEquals(KEY_ONE, ZKUtil.getZooKeeperClusterKey(rp.getPeerConf(ID_ONE).getSecond()));
rp.removePeer(ID_ONE);
rp.peerRemoved(ID_ONE);
assertNumberOfPeers(1);
// Connect to one peer
rp.connectToPeer(ID_ONE);
assertNumberOfPeers(1, 2);
assertTrue(rp.getStatusOfConnectedPeer(ID_ONE));
// Add one peer
rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), null);
rp.peerAdded(ID_ONE);
assertNumberOfPeers(2);
assertTrue(rp.getStatusOfPeer(ID_ONE));
rp.disablePeer(ID_ONE);
assertConnectedPeerStatus(false, ID_ONE);
rp.enablePeer(ID_ONE);
assertConnectedPeerStatus(true, ID_ONE);
assertEquals(1, rp.getRegionServersOfConnectedPeer(ID_ONE).size());
assertNotNull(rp.getPeerUUID(ID_ONE).toString());
// Disconnect peer
rp.disconnectFromPeer(ID_ONE);
assertNumberOfPeers(0, 2);
rp.peerRemoved(ID_ONE);
assertNumberOfPeers(2);
try {
rp.getStatusOfConnectedPeer(ID_ONE);
rp.getStatusOfPeer(ID_ONE);
fail("There are no connected peers, should have thrown an IllegalArgumentException");
} catch (IllegalArgumentException e) {
}
@ -234,7 +232,7 @@ public abstract class TestReplicationStateBasic {
fail("ConnectedPeerStatus was " + !status + " but expected " + status + " in ZK");
}
while (true) {
if (status == rp.getStatusOfConnectedPeer(peerId)) {
if (status == rp.getStatusOfPeer(peerId)) {
return;
}
if (zkTimeoutCount < ZK_MAX_COUNT) {
@ -247,9 +245,9 @@ public abstract class TestReplicationStateBasic {
}
}
protected void assertNumberOfPeers(int connected, int total) {
assertEquals(total, rp.getAllPeerClusterKeys().size());
assertEquals(connected, rp.getConnectedPeers().size());
protected void assertNumberOfPeers(int total) {
assertEquals(total, rp.getAllPeerConfigs().size());
assertEquals(total, rp.getAllPeerIds().size());
assertEquals(total, rp.getAllPeerIds().size());
}
@ -269,7 +267,7 @@ public abstract class TestReplicationStateBasic {
rq3.addLog("qId" + i, "filename" + j);
}
//Add peers for the corresponding queues so they are not orphans
rp.addPeer("qId" + i, "bogus" + i);
rp.addPeer("qId" + i, new ReplicationPeerConfig().setClusterKey("bogus" + i), null);
}
}
}

View File

@ -145,7 +145,7 @@ public class TestReplicationTrackerZKImpl {
@Ignore ("Flakey") @Test(timeout = 30000)
public void testPeerRemovedEvent() throws Exception {
rp.addPeer("5", utility.getClusterKey());
rp.addPeer("5", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()), null);
rt.registerListener(new DummyReplicationListener());
rp.removePeer("5");
// wait for event
@ -158,7 +158,7 @@ public class TestReplicationTrackerZKImpl {
@Ignore ("Flakey") @Test(timeout = 30000)
public void testPeerListChangedEvent() throws Exception {
// add a peer
rp.addPeer("5", utility.getClusterKey());
rp.addPeer("5", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()), null);
zkw.getRecoverableZooKeeper().getZooKeeper().getChildren("/hbase/replication/peers/5", true);
rt.registerListener(new DummyReplicationListener());
rp.disablePeer("5");

View File

@ -0,0 +1,277 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.google.common.collect.Lists;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
@Category(SmallTests.class)
public class TestReplicationWALEntryFilters {
static byte[] a = new byte[] {'a'};
static byte[] b = new byte[] {'b'};
static byte[] c = new byte[] {'c'};
static byte[] d = new byte[] {'d'};
@Test
public void testSystemTableWALEntryFilter() {
SystemTableWALEntryFilter filter = new SystemTableWALEntryFilter();
// meta
HLogKey key1 = new HLogKey( HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(),
HTableDescriptor.META_TABLEDESC.getTableName());
HLog.Entry metaEntry = new Entry(key1, null);
assertNull(filter.filter(metaEntry));
// ns table
HLogKey key2 = new HLogKey(new byte[] {}, HTableDescriptor.NAMESPACE_TABLEDESC.getTableName());
HLog.Entry nsEntry = new Entry(key2, null);
assertNull(filter.filter(nsEntry));
// user table
HLogKey key3 = new HLogKey(new byte[] {}, TableName.valueOf("foo"));
HLog.Entry userEntry = new Entry(key3, null);
assertEquals(userEntry, filter.filter(userEntry));
}
@Test
public void testScopeWALEntryFilter() {
ScopeWALEntryFilter filter = new ScopeWALEntryFilter();
HLog.Entry userEntry = createEntry(a, b);
HLog.Entry userEntryA = createEntry(a);
HLog.Entry userEntryB = createEntry(b);
HLog.Entry userEntryEmpty = createEntry();
// no scopes
assertEquals(null, filter.filter(userEntry));
// empty scopes
TreeMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
userEntry = createEntry(a, b);
userEntry.getKey().setScopes(scopes);
assertEquals(null, filter.filter(userEntry));
// different scope
scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
scopes.put(c, HConstants.REPLICATION_SCOPE_GLOBAL);
userEntry = createEntry(a, b);
userEntry.getKey().setScopes(scopes);
// all kvs should be filtered
assertEquals(userEntryEmpty, filter.filter(userEntry));
// local scope
scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
scopes.put(a, HConstants.REPLICATION_SCOPE_LOCAL);
userEntry = createEntry(a, b);
userEntry.getKey().setScopes(scopes);
assertEquals(userEntryEmpty, filter.filter(userEntry));
scopes.put(b, HConstants.REPLICATION_SCOPE_LOCAL);
assertEquals(userEntryEmpty, filter.filter(userEntry));
// only scope a
scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
scopes.put(a, HConstants.REPLICATION_SCOPE_GLOBAL);
userEntry = createEntry(a, b);
userEntry.getKey().setScopes(scopes);
assertEquals(userEntryA, filter.filter(userEntry));
scopes.put(b, HConstants.REPLICATION_SCOPE_LOCAL);
assertEquals(userEntryA, filter.filter(userEntry));
// only scope b
scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL);
userEntry = createEntry(a, b);
userEntry.getKey().setScopes(scopes);
assertEquals(userEntryB, filter.filter(userEntry));
scopes.put(a, HConstants.REPLICATION_SCOPE_LOCAL);
assertEquals(userEntryB, filter.filter(userEntry));
// scope a and b
scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL);
userEntry = createEntry(a, b);
userEntry.getKey().setScopes(scopes);
assertEquals(userEntryB, filter.filter(userEntry));
scopes.put(a, HConstants.REPLICATION_SCOPE_LOCAL);
assertEquals(userEntryB, filter.filter(userEntry));
}
WALEntryFilter nullFilter = new WALEntryFilter() {
@Override
public Entry filter(Entry entry) {
return null;
}
};
WALEntryFilter passFilter = new WALEntryFilter() {
@Override
public Entry filter(Entry entry) {
return entry;
}
};
@Test
public void testChainWALEntryFilter() {
HLog.Entry userEntry = createEntry(a, b, c);
ChainWALEntryFilter filter = new ChainWALEntryFilter(passFilter);
assertEquals(createEntry(a,b,c), filter.filter(userEntry));
filter = new ChainWALEntryFilter(passFilter, passFilter);
assertEquals(createEntry(a,b,c), filter.filter(userEntry));
filter = new ChainWALEntryFilter(passFilter, passFilter, passFilter);
assertEquals(createEntry(a,b,c), filter.filter(userEntry));
filter = new ChainWALEntryFilter(nullFilter);
assertEquals(null, filter.filter(userEntry));
filter = new ChainWALEntryFilter(nullFilter, passFilter);
assertEquals(null, filter.filter(userEntry));
filter = new ChainWALEntryFilter(passFilter, nullFilter);
assertEquals(null, filter.filter(userEntry));
filter = new ChainWALEntryFilter(nullFilter, passFilter, nullFilter);
assertEquals(null, filter.filter(userEntry));
filter = new ChainWALEntryFilter(nullFilter, nullFilter);
assertEquals(null, filter.filter(userEntry));
// flatten
filter =
new ChainWALEntryFilter(
new ChainWALEntryFilter(passFilter,
new ChainWALEntryFilter(passFilter, passFilter),
new ChainWALEntryFilter(passFilter),
new ChainWALEntryFilter(passFilter)),
new ChainWALEntryFilter(passFilter));
assertEquals(createEntry(a,b,c), filter.filter(userEntry));
filter =
new ChainWALEntryFilter(
new ChainWALEntryFilter(passFilter,
new ChainWALEntryFilter(passFilter,
new ChainWALEntryFilter(nullFilter))),
new ChainWALEntryFilter(passFilter));
assertEquals(null, filter.filter(userEntry));
}
@Test
public void testTableCfWALEntryFilter() {
ReplicationPeer peer = mock(ReplicationPeer.class);
when(peer.getTableCFs()).thenReturn(null);
HLog.Entry userEntry = createEntry(a, b, c);
TableCfWALEntryFilter filter = new TableCfWALEntryFilter(peer);
assertEquals(createEntry(a,b,c), filter.filter(userEntry));
// empty map
userEntry = createEntry(a, b, c);
Map<String, List<String>> tableCfs = new HashMap<String, List<String>>();
when(peer.getTableCFs()).thenReturn(tableCfs);
filter = new TableCfWALEntryFilter(peer);
assertEquals(null, filter.filter(userEntry));
// table bar
userEntry = createEntry(a, b, c);
tableCfs = new HashMap<String, List<String>>();
tableCfs.put("bar", null);
when(peer.getTableCFs()).thenReturn(tableCfs);
filter = new TableCfWALEntryFilter(peer);
assertEquals(null, filter.filter(userEntry));
// table foo:a
userEntry = createEntry(a, b, c);
tableCfs = new HashMap<String, List<String>>();
tableCfs.put("foo", Lists.newArrayList("a"));
when(peer.getTableCFs()).thenReturn(tableCfs);
filter = new TableCfWALEntryFilter(peer);
assertEquals(createEntry(a), filter.filter(userEntry));
// table foo:a,c
userEntry = createEntry(a, b, c, d);
tableCfs = new HashMap<String, List<String>>();
tableCfs.put("foo", Lists.newArrayList("a", "c"));
when(peer.getTableCFs()).thenReturn(tableCfs);
filter = new TableCfWALEntryFilter(peer);
assertEquals(createEntry(a,c), filter.filter(userEntry));
}
private HLog.Entry createEntry(byte[]... kvs) {
HLogKey key1 = new HLogKey(new byte[] {}, TableName.valueOf("foo"));
WALEdit edit1 = new WALEdit();
for (byte[] kv : kvs) {
edit1.add(new KeyValue(kv, kv, kv));
}
return new HLog.Entry(key1, edit1);
}
private void assertEquals(HLog.Entry e1, HLog.Entry e2) {
Assert.assertEquals(e1 == null, e2 == null);
if (e1 == null) {
return;
}
// do not compare HLogKeys
// compare kvs
Assert.assertEquals(e1.getEdit() == null, e2.getEdit() == null);
if (e1.getEdit() == null) {
return;
}
List<KeyValue> kvs1 = e1.getEdit().getKeyValues();
List<KeyValue> kvs2 = e2.getEdit().getKeyValues();
Assert.assertEquals(kvs1.size(), kvs2.size());
for (int i = 0; i < kvs1.size(); i++) {
Assert.assertEquals(0, KeyValue.COMPARATOR.compare(kvs1.get(i), kvs2.get(i)));
}
}
}

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
import org.junit.Before;
@ -42,13 +43,15 @@ public class TestReplicationSinkManager {
private static final String PEER_CLUSTER_ID = "PEER_CLUSTER_ID";
private ReplicationPeers replicationPeers;
private HBaseReplicationEndpoint replicationEndpoint;
private ReplicationSinkManager sinkManager;
@Before
public void setUp() {
replicationPeers = mock(ReplicationPeers.class);
replicationEndpoint = mock(HBaseReplicationEndpoint.class);
sinkManager = new ReplicationSinkManager(mock(HConnection.class),
PEER_CLUSTER_ID, replicationPeers, new Configuration());
PEER_CLUSTER_ID, replicationEndpoint, new Configuration());
}
@Test
@ -58,7 +61,7 @@ public class TestReplicationSinkManager {
serverNames.add(mock(ServerName.class));
}
when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID))
when(replicationEndpoint.getRegionServers())
.thenReturn(serverNames);
sinkManager.chooseSinks();
@ -72,7 +75,7 @@ public class TestReplicationSinkManager {
List<ServerName> serverNames = Lists.newArrayList(mock(ServerName.class),
mock(ServerName.class));
when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID))
when(replicationEndpoint.getRegionServers())
.thenReturn(serverNames);
sinkManager.chooseSinks();
@ -84,8 +87,8 @@ public class TestReplicationSinkManager {
public void testReportBadSink() {
ServerName serverNameA = mock(ServerName.class);
ServerName serverNameB = mock(ServerName.class);
when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID)).thenReturn(
Lists.newArrayList(serverNameA, serverNameB));
when(replicationEndpoint.getRegionServers())
.thenReturn(Lists.newArrayList(serverNameA, serverNameB));
sinkManager.chooseSinks();
// Sanity check
@ -110,7 +113,7 @@ public class TestReplicationSinkManager {
for (int i = 0; i < 20; i++) {
serverNames.add(mock(ServerName.class));
}
when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID))
when(replicationEndpoint.getRegionServers())
.thenReturn(serverNames);
@ -137,7 +140,7 @@ public class TestReplicationSinkManager {
for (int i = 0; i < 20; i++) {
serverNames.add(mock(ServerName.class));
}
when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID))
when(replicationEndpoint.getRegionServers())
.thenReturn(serverNames);