HBASE-2201 JRuby shell for replication

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1024470 13f79535-47bb-0310-9956-ffa450edef68
Jean-Daniel Cryans 2010-10-20 00:21:08 +00:00
parent 368e9c4d19
commit a7d61ec5b5
26 changed files with 1284 additions and 240 deletions

View File

@ -1072,6 +1072,7 @@ Release 0.21.0 - Unreleased
HBASE-3073 New APIs for Result, faster implementation for some calls
HBASE-3053 Add ability to have multiple Masters LocalHBaseCluster for
test writing
HBASE-2201 JRuby shell for replication
OPTIMIZATIONS
HBASE-410 [testing] Speed up the test suite

View File

@ -1,75 +0,0 @@
#
# Copyright 2010 The Apache Software Foundation
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Script to add a peer to a cluster
# To see usage for this script, run:
#
# ${HBASE_HOME}/bin/hbase org.jruby.Main add_peer.rb
#
include Java
import org.apache.commons.logging.LogFactory
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.HConstants
import org.apache.hadoop.hbase.EmptyWatcher
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper
# Name of this script
NAME = "add_peer"
# Print usage for this script
def usage
puts 'Usage: %s.rb master_zookeeper.quorum.peers:clientport:znode_parent slave_zookeeper.quorum.peers:clientport:znode_parent' % NAME
exit!
end
if ARGV.size != 2
usage
end
LOG = LogFactory.getLog(NAME)
parts1 = ARGV[0].split(":")
c2 = HBaseConfiguration.create()
parts2 = ARGV[1].split(":")
c1 = HBaseConfiguration.create()
c1.set(HConstants::ZOOKEEPER_QUORUM, parts1[0])
c1.set("hbase.zookeeper.property.clientPort", parts1[1])
c1.set(HConstants::ZOOKEEPER_ZNODE_PARENT, parts1[2])
zkw1 = ZooKeeperWrapper.createInstance(c1, "ZK1")
zkw1.writeZNode(parts1[2], "replication", "a")
zkw1.writeZNode(parts1[2] + "/replication", "master", ARGV[0]);
zkw1.writeZNode(parts1[2] + "/replication", "state", "true");
zkw1.writeZNode(parts1[2] + "/replication/peers", "test", ARGV[1]);
c2.set(HConstants::ZOOKEEPER_QUORUM, parts2[0])
c2.set("hbase.zookeeper.property.clientPort", parts2[1])
c2.set(HConstants::ZOOKEEPER_ZNODE_PARENT, parts2[2])
zkw2 = ZooKeeperWrapper.createInstance(c2, "ZK2")
zkw2.writeZNode(parts2[2], "replication", "a")
zkw2.writeZNode(parts2[2] + "/replication", "master", ARGV[0]);
puts "Peer successfully added"

View File

@ -0,0 +1,165 @@
/**
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client.replication;
import java.io.IOException;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
/**
* <p>
* This class provides the administrative interface to HBase cluster
* replication. In order to use it, the cluster and the client using
* ReplicationAdmin must be configured with <code>hbase.replication</code>
* set to true.
* </p>
* <p>
* Adding a new peer results in creating new outbound connections from every
* region server to a subset of region servers on the slave cluster. Each
* new stream of replication will start replicating from the beginning of the
current HLog, meaning that edits from the past will be replicated.
* </p>
* <p>
* Removing a peer is a destructive and irreversible operation that stops
* all the replication streams for the given cluster and deletes the metadata
* used to keep track of the replication state.
* </p>
* <p>
* Enabling and disabling peers is currently not supported.
* </p>
* <p>
* As cluster replication is still experimental, a kill switch is provided
* in order to stop all replication-related operations, see
* {@link #setReplicating(boolean)}. When setting it back to true, the new
* state of all the replication streams will be unknown and may have holes.
* Use at your own risk.
* </p>
* <p>
* To see which commands are available in the shell, type
* <code>replication</code>.
* </p>
*/
public class ReplicationAdmin {
private final ReplicationZookeeper replicationZk;
/**
* Constructor that creates a connection to the local ZooKeeper ensemble.
* @param conf Configuration to use
* @throws IOException if the connection to ZK cannot be made
* @throws RuntimeException if replication isn't enabled.
*/
public ReplicationAdmin(Configuration conf) throws IOException {
if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false)) {
throw new RuntimeException("hbase.replication isn't true, please " +
"enable it in order to use replication");
}
ZooKeeperWatcher zkw = HConnectionManager.getConnection(conf).
getZooKeeperWatcher();
try {
this.replicationZk = new ReplicationZookeeper(conf, zkw);
} catch (KeeperException e) {
throw new IOException("Unable setup the ZooKeeper connection", e);
}
}
/**
* Add a new peer cluster to replicate to.
* @param id a short that identifies the cluster
* @param clusterKey the concatenation of the slave cluster's
* <code>hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent</code>
* @throws IllegalStateException if there's already one slave since
* multi-slave isn't supported yet.
*/
public void addPeer(String id, String clusterKey) throws IOException {
this.replicationZk.addPeer(id, clusterKey);
}
/**
* Removes a peer cluster and stops the replication to it.
* @param id a short that identifies the cluster
*/
public void removePeer(String id) throws IOException {
this.replicationZk.removePeer(id);
}
/**
* Restart the replication stream to the specified peer.
* @param id a short that identifies the cluster
*/
public void enablePeer(String id) {
throw new NotImplementedException("Not implemented");
}
/**
* Stop the replication stream to the specified peer.
* @param id a short that identifies the cluster
*/
public void disablePeer(String id) {
throw new NotImplementedException("Not implemented");
}
/**
* Get the number of slave clusters the local cluster has.
* @return number of slave clusters
*/
public int getPeersCount() {
return this.replicationZk.listPeersIdsAndWatch().size();
}
/**
* Get the current status of the kill switch, if the cluster is replicating
* or not.
* @return true if the cluster is replicating, otherwise false
*/
public boolean getReplicating() throws IOException {
try {
return this.replicationZk.getReplication();
} catch (KeeperException e) {
throw new IOException("Couldn't get the replication status");
}
}
/**
* Kill switch for all replication-related features
* @param newState true to start replication, false to stop it completely
* @return the previous state
*/
public boolean setReplicating(boolean newState) throws IOException {
boolean prev = getReplicating();
this.replicationZk.setReplicating(newState);
return prev;
}
/**
* Get the ZK-support tool created and used by this object for replication.
* @return the ZK-support tool
*/
ReplicationZookeeper getReplicationZk() {
return replicationZk;
}
}
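For orientation, a minimal client-side sketch of the API this file adds; the peer id and cluster key are illustrative values, and it assumes hbase.replication is already set to true as the class javadoc requires.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;

public class ReplicationAdminExample {
  public static void main(String[] args) throws Exception {
    // hbase.replication must already be true or the constructor throws.
    Configuration conf = HBaseConfiguration.create();
    ReplicationAdmin admin = new ReplicationAdmin(conf);
    // Cluster key format: quorum:clientPort:znodeParent (made-up values here).
    admin.addPeer("1", "zk1,zk2,zk3:2181:/hbase-slave");
    System.out.println("Slave clusters: " + admin.getPeersCount());
    // Kill switch; returns the previous state of the cluster-wide flag.
    boolean wasReplicating = admin.setReplicating(false);
    System.out.println("Was replicating: " + wasReplicating);
  }
}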

View File

@ -0,0 +1,120 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
/**
* This class acts as a wrapper for all the objects used to identify and
* communicate with remote peers. Everything must be supplied from the
* outside, as the class doesn't encapsulate any specific functionality
* itself; i.e., it's a plain container class.
*/
public class ReplicationPeer {
private final String clusterKey;
private final String id;
private List<HServerAddress> regionServers =
new ArrayList<HServerAddress>(0);
private final AtomicBoolean peerEnabled = new AtomicBoolean();
// Cannot be final since a new object needs to be recreated when the session fails
private ZooKeeperWatcher zkw;
private final Configuration conf;
/**
* Constructor that takes all the objects required to communicate with the
* specified peer, except for the region server addresses.
* @param conf configuration object to this peer
* @param key cluster key used to locate the peer
* @param id string representation of this peer's identifier
* @param zkw zookeeper connection to the peer
*/
public ReplicationPeer(Configuration conf, String key,
String id, ZooKeeperWatcher zkw) {
this.conf = conf;
this.clusterKey = key;
this.id = id;
this.zkw = zkw;
}
/**
* Get the cluster key of that peer
* @return string consisting of zk ensemble addresses, client port
* and root znode
*/
public String getClusterKey() {
return clusterKey;
}
/**
* Get the state of this peer
* @return atomic boolean that holds the status
*/
public AtomicBoolean getPeerEnabled() {
return peerEnabled;
}
/**
* Get a list of all the addresses of all the region servers
* for this peer cluster
* @return list of addresses
*/
public List<HServerAddress> getRegionServers() {
return regionServers;
}
/**
* Set the list of region servers for that peer
* @param regionServers list of addresses for the region servers
*/
public void setRegionServers(List<HServerAddress> regionServers) {
this.regionServers = regionServers;
}
/**
* Get the ZK connection to this peer
* @return zk connection
*/
public ZooKeeperWatcher getZkw() {
return zkw;
}
/**
* Get the identifier of this peer
* @return string representation of the id (short)
*/
public String getId() {
return id;
}
/**
* Get the configuration object required to communicate with this peer
* @return configuration object
*/
public Configuration getConfiguration() {
return conf;
}
}
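A minimal construction sketch; conf and zkw are assumed to come from the cluster-key parsing in ReplicationZookeeper.getPeer(), and the key, id, and region server address are made-up values.

import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;

public class ReplicationPeerSketch {
  static ReplicationPeer makePeer(Configuration conf, ZooKeeperWatcher zkw) {
    // Constructor order is (conf, key, id, zkw).
    ReplicationPeer peer =
        new ReplicationPeer(conf, "zk1:2181:/hbase-slave", "1", zkw);
    // Addresses are normally fetched from the peer cluster's rs znode.
    peer.setRegionServers(
        Arrays.asList(new HServerAddress("rs1.example.com:60020")));
    return peer;
  }
}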

View File

@ -54,7 +54,6 @@ import org.apache.zookeeper.KeeperException;
* <p/>
* <pre>
* replication/
* master {contains a full cluster address}
* state {contains true or false}
* clusterId {contains a byte}
* peers/
@ -80,8 +79,8 @@ public class ReplicationZookeeper {
private final static String RS_LOCK_ZNODE = "lock";
// Our handle on zookeeper
private final ZooKeeperWatcher zookeeper;
// Map of addresses of peer clusters with their ZKW
private Map<String, ZooKeeperWatcher> peerClusters;
// Map of peer clusters keyed by their id
private Map<String, ReplicationPeer> peerClusters;
// Path to the root replication znode
private String replicationZNode;
// Path to the peer clusters znode
@ -92,16 +91,22 @@ public class ReplicationZookeeper {
private String rsServerNameZnode;
// Name of the replicationState znode
private String replicationStateNodeName;
// If this RS is part of a master cluster
private boolean replicationMaster;
private final Configuration conf;
// Is this cluster replicating at the moment?
private AtomicBoolean replicating;
// Byte (stored as string here) that identifies this cluster
private String clusterId;
// The key to our own cluster
private String ourClusterKey;
// Abortable
private Abortable abortable;
/**
* Constructor used by clients of replication (like master and HBase clients)
* @param conf conf to use
* @param zk zk connection to use
* @throws KeeperException if a ZooKeeper operation fails
*/
public ReplicationZookeeper(final Configuration conf, final ZooKeeperWatcher zk)
throws KeeperException {
@ -125,22 +130,18 @@ public class ReplicationZookeeper {
this.conf = server.getConfiguration();
setZNodes();
this.peerClusters = new HashMap<String, ZooKeeperWatcher>();
this.peerClusters = new HashMap<String, ReplicationPeer>();
this.replicating = replicating;
setReplicating();
ZKUtil.createWithParents(this.zookeeper,
ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName));
readReplicationStateZnode();
this.rsServerNameZnode = ZKUtil.joinZNode(rsZNode, server.getServerName());
ZKUtil.createWithParents(this.zookeeper, this.rsServerNameZnode);
// Set a tracker on replicationStateNode
ReplicationStatusTracker tracker =
new ReplicationStatusTracker(this.zookeeper, server);
tracker.start();
List<String> znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
if (znodes != null) {
for (String z : znodes) {
connectToPeer(z);
}
}
connectExistingPeers();
}
private void setZNodes() throws KeeperException {
@ -156,27 +157,44 @@ public class ReplicationZookeeper {
conf.get("zookeeper.znode.replication.clusterId", "clusterId");
String rsZNodeName =
conf.get("zookeeper.znode.replication.rs", "rs");
String thisCluster = this.conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" +
this.ourClusterKey = this.conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" +
this.conf.get("hbase.zookeeper.property.clientPort") + ":" +
this.conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT);
this.replicationZNode =
ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
this.rsZNode = ZKUtil.joinZNode(replicationZNode, rsZNodeName);
ZKUtil.createWithParents(this.zookeeper, this.rsZNode);
String znode = ZKUtil.joinZNode(this.replicationZNode, clusterIdZNodeName);
byte [] data = ZKUtil.getData(this.zookeeper, znode);
String idResult = Bytes.toString(data);
this.clusterId = idResult == null?
Byte.toString(HConstants.DEFAULT_CLUSTER_ID): idResult;
}
znode = ZKUtil.joinZNode(this.replicationZNode, repMasterZNodeName);
data = ZKUtil.getData(this.zookeeper, znode);
String address = Bytes.toString(data);
this.replicationMaster = thisCluster.equals(address);
LOG.info("This cluster (" + thisCluster + ") is a " +
(this.replicationMaster ? "master" : "slave") + " for replication" +
", compared with (" + address + ")");
private void connectExistingPeers() throws IOException, KeeperException {
List<String> znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
if (znodes != null) {
for (String z : znodes) {
connectToPeer(z);
}
}
}
/**
* List this cluster's peers' IDs
* @return list of all peers' identifiers
*/
public List<String> listPeersIdsAndWatch() {
List<String> ids = null;
try {
ids = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.peersZNode);
} catch (KeeperException e) {
this.abortable.abort("Cannot get the list of peers ", e);
}
return ids;
}
/**
@ -185,16 +203,32 @@ public class ReplicationZookeeper {
* @param peerClusterId (byte) the cluster to interrogate
* @return addresses of all region servers
*/
public List<HServerAddress> getPeersAddresses(String peerClusterId)
public List<HServerAddress> getSlavesAddresses(String peerClusterId)
throws KeeperException {
if (this.peerClusters.size() == 0) {
return new ArrayList<HServerAddress>(0);
}
ZooKeeperWatcher zkw = this.peerClusters.get(peerClusterId);
return zkw == null?
new ArrayList<HServerAddress>(0):
ZKUtil.listChildrenAndGetAsAddresses(zkw, zkw.rsZNode);
ReplicationPeer peer = this.peerClusters.get(peerClusterId);
if (peer == null) {
return new ArrayList<HServerAddress>(0);
}
peer.setRegionServers(fetchSlavesAddresses(peer.getZkw()));
return peer.getRegionServers();
}
/**
* Get the list of all the region servers from the specified peer
* @param zkw zk connection to use
* @return list of region server addresses
*/
private List<HServerAddress> fetchSlavesAddresses(ZooKeeperWatcher zkw) {
List<HServerAddress> rss = null;
try {
rss = ZKUtil.listChildrenAndGetAsAddresses(zkw, zkw.rsZNode);
} catch (KeeperException e) {
LOG.warn("Cannot get peer's region server addresses", e);
}
return rss;
}
/**
@ -203,44 +237,146 @@ public class ReplicationZookeeper {
* @param peerId id of the peer cluster
* @throws KeeperException
*/
private void connectToPeer(String peerId) throws IOException, KeeperException {
public boolean connectToPeer(String peerId)
throws IOException, KeeperException {
if (peerClusters == null) {
return false;
}
if (this.peerClusters.containsKey(peerId)) {
return false;
// TODO remove when we support it
} else if (this.peerClusters.size() > 0) {
LOG.warn("Multiple slaves feature not supported");
return false;
}
ReplicationPeer peer = getPeer(peerId);
if (peer == null) {
return false;
}
this.peerClusters.put(peerId, peer);
ZKUtil.createWithParents(this.zookeeper, ZKUtil.joinZNode(
this.rsServerNameZnode, peerId));
LOG.info("Added new peer cluster " + peer.getClusterKey());
return true;
}
/**
* Helper method to connect to a peer
* @param peerId peer's identifier
* @return object representing the peer
* @throws IOException
* @throws KeeperException
*/
private ReplicationPeer getPeer(String peerId) throws IOException, KeeperException {
String znode = ZKUtil.joinZNode(this.peersZNode, peerId);
byte [] data = ZKUtil.getData(this.zookeeper, znode);
String [] ensemble = Bytes.toString(data).split(":");
if (ensemble.length != 3) {
throw new IllegalArgumentException("Wrong format of cluster address: " +
Bytes.toStringBinary(data));
String otherClusterKey = Bytes.toString(data);
if (this.ourClusterKey.equals(otherClusterKey)) {
LOG.debug("Not connecting to " + peerId + " because it's us");
return null;
}
String[] ensemble = otherClusterKey.split(":");
if (ensemble.length != 3) {
LOG.warn("Wrong format of cluster address: " +
Bytes.toStringBinary(data));
return null;
}
// Construct the connection to the new peer
Configuration otherConf = new Configuration(this.conf);
otherConf.set(HConstants.ZOOKEEPER_QUORUM, ensemble[0]);
otherConf.set("hbase.zookeeper.property.clientPort", ensemble[1]);
otherConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, ensemble[2]);
ZooKeeperWatcher zkw = new ZooKeeperWatcher(otherConf,
"connection to cluster: " + peerId, this.abortable);
this.peerClusters.put(peerId, zkw);
ZKUtil.createWithParents(this.zookeeper, ZKUtil.joinZNode(
this.rsServerNameZnode, peerId));
LOG.info("Added new peer cluster " + StringUtils.arrayToString(ensemble));
return new ReplicationPeer(otherConf, otherClusterKey,
peerId, zkw);
}
/**
* Set the new replication state for this cluster
* @param newState the new state
*/
public void setReplicating(boolean newState) throws IOException {
try {
ZKUtil.createWithParents(this.zookeeper,
ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName));
ZKUtil.setData(this.zookeeper,
ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName),
Bytes.toBytes(Boolean.toString(newState)));
} catch (KeeperException e) {
throw new IOException("Unable to set the replication state", e);
}
}
/**
* Remove the peer from zookeeper, which will trigger the watchers on every
* region server and close their sources
* @param id the peer's identifier
* @throws IllegalArgumentException Thrown when the peer doesn't exist
*/
public void removePeer(String id) throws IOException {
try {
if (!peerExists(id)) {
throw new IllegalArgumentException("Cannot remove inexisting peer");
}
ZKUtil.deleteNode(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
} catch (KeeperException e) {
throw new IOException("Unable to remove a peer", e);
}
}
/**
* Add a new peer to this cluster
* @param id peer's identifier
* @param clusterKey ZK ensemble's addresses, client port and root znode
* @throws IllegalArgumentException Thrown when a peer with the same id already exists
* @throws IllegalStateException Thrown when a peer already exists, since
* multi-slave isn't supported yet.
*/
public void addPeer(String id, String clusterKey) throws IOException {
try {
if (peerExists(id)) {
throw new IllegalArgumentException("Cannot add existing peer");
} else if (countPeers() > 0) {
throw new IllegalStateException("Multi-slave isn't supported yet");
}
ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
ZKUtil.createAndWatch(this.zookeeper,
ZKUtil.joinZNode(this.peersZNode, id), Bytes.toBytes(clusterKey));
} catch (KeeperException e) {
throw new IOException("Unable to add peer", e);
}
}
private boolean peerExists(String id) throws KeeperException {
return ZKUtil.checkExists(this.zookeeper,
ZKUtil.joinZNode(this.peersZNode, id)) >= 0;
}
private int countPeers() throws KeeperException {
List<String> peers =
ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
return peers == null ? 0 : peers.size();
}
/**
* This reads the state znode for replication and sets the atomic boolean
*/
private void setReplicating() {
private void readReplicationStateZnode() {
try {
byte [] data = ZKUtil.getDataAndWatch(this.zookeeper, getRepStateNode());
String value = Bytes.toString(data);
if (value == null) LOG.info(getRepStateNode() + " data is null");
else {
this.replicating.set(Boolean.parseBoolean(value));
LOG.info("Replication is now " + (this.replicating.get()?
"started" : "stopped"));
}
this.replicating.set(getReplication());
LOG.info("Replication is now " + (this.replicating.get()?
"started" : "stopped"));
} catch (KeeperException e) {
this.abortable.abort("Failed getting data on from " + getRepStateNode(), e);
}
}
public boolean getReplication() throws KeeperException {
byte [] data = ZKUtil.getDataAndWatch(this.zookeeper, getRepStateNode());
return Boolean.parseBoolean(Bytes.toString(data));
}
private String getRepStateNode() {
return ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName);
}
@ -303,13 +439,8 @@ public class ReplicationZookeeper {
public List<String> getRegisteredRegionServers() {
List<String> result = null;
try {
List<ZKUtil.NodeAndData> nads =
ZKUtil.watchAndGetNewChildren(this.zookeeper, this.zookeeper.rsZNode);
result = new ArrayList<String>(nads.size());
for (ZKUtil.NodeAndData nad : nads) {
String[] fullPath = nad.getNode().split("/");
result.add(fullPath[fullPath.length - 1]);
}
result = ZKUtil.listChildrenAndWatchThem(
this.zookeeper, this.zookeeper.rsZNode);
} catch (KeeperException e) {
this.abortable.abort("Get list of registered region servers", e);
}
@ -442,10 +573,14 @@ public class ReplicationZookeeper {
* Delete a complete queue of hlogs
* @param peerZnode znode of the peer cluster queue of hlogs to delete
*/
public void deleteSource(String peerZnode) {
public void deleteSource(String peerZnode, boolean closeConnection) {
try {
ZKUtil.deleteNodeRecursively(this.zookeeper,
ZKUtil.joinZNode(rsServerNameZnode, peerZnode));
if (closeConnection) {
this.peerClusters.get(peerZnode).getZkw().close();
this.peerClusters.remove(peerZnode);
}
} catch (KeeperException e) {
this.abortable.abort("Failed delete of " + peerZnode, e);
}
@ -506,24 +641,39 @@ public class ReplicationZookeeper {
/**
* Get a map of all peer clusters
* @return map of peer cluster, zk address to ZKW
* @return map of peer clusters keyed by id
*/
public Map<String, ZooKeeperWatcher> getPeerClusters() {
public Map<String, ReplicationPeer> getPeerClusters() {
return this.peerClusters;
}
public String getRSZNode() {
return rsZNode;
/**
* Extracts the znode name of a peer cluster from a ZK path
* @param fullPath Path to extract the id from
* @return the id or an empty string if path is invalid
*/
public static String getZNodeName(String fullPath) {
String[] parts = fullPath.split("/");
return parts.length > 0 ? parts[parts.length-1] : "";
}
/**
*
* @return
* Get this cluster's zk connection
* @return zk connection
*/
public ZooKeeperWatcher getZookeeperWatcher() {
return this.zookeeper;
}
/**
* Get the full path to the peers' znode
* @return path to peers in zk
*/
public String getPeersZNode() {
return peersZNode;
}
/**
* Tracker for status of the replication
*/
@ -536,7 +686,7 @@ public class ReplicationZookeeper {
@Override
public synchronized void nodeDataChanged(String path) {
super.nodeDataChanged(path);
setReplicating();
readReplicationStateZnode();
}
}
}
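As an aside, the cluster-key handling in getPeer() can be restated as a standalone helper. ClusterKeyParser is a hypothetical name; the sketch only shows how the three colon-separated parts map onto ZooKeeper settings in a copy of the local configuration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;

public class ClusterKeyParser {
  public static Configuration applyClusterKey(Configuration base, String key) {
    // Format: hbase.zookeeper.quorum:clientPort:zookeeper.znode.parent
    String[] parts = key.split(":");
    if (parts.length != 3) {
      throw new IllegalArgumentException("Wrong format of cluster key: " + key);
    }
    Configuration other = new Configuration(base);
    other.set(HConstants.ZOOKEEPER_QUORUM, parts[0]);
    other.set("hbase.zookeeper.property.clientPort", parts[1]);
    other.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts[2]);
    return other;
  }
}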

View File

@ -126,6 +126,9 @@ public class ReplicationSource extends Thread
private volatile boolean running = true;
// Metrics for this source
private ReplicationSourceMetrics metrics;
// If source is enabled, replication happens. If disabled, nothing will be
// replicated but HLogs will still be queued
private AtomicBoolean sourceEnabled = new AtomicBoolean();
/**
* Instantiation method used by region servers
@ -199,7 +202,7 @@ public class ReplicationSource extends Thread
private void chooseSinks() throws KeeperException {
this.currentPeers.clear();
List<HServerAddress> addresses =
this.zkHelper.getPeersAddresses(peerClusterId);
this.zkHelper.getSlavesAddresses(peerClusterId);
Set<HServerAddress> setOfAddr = new HashSet<HServerAddress>();
int nbPeers = (int) (Math.ceil(addresses.size() * ratio));
LOG.info("Getting " + nbPeers +
@ -236,14 +239,20 @@ public class ReplicationSource extends Thread
this.position = this.zkHelper.getHLogRepPosition(
this.peerClusterZnode, this.queue.peek().getName());
} catch (KeeperException e) {
LOG.error("Couldn't get the position of this recovered queue " +
this.terminate("Couldn't get the position of this recovered queue " +
peerClusterZnode, e);
this.abort();
}
}
int sleepMultiplier = 1;
// Loop until we close down
while (!stopper.isStopped() && this.running) {
// Sleep until replication is enabled again
if (!this.replicating.get() || !this.sourceEnabled.get()) {
if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
sleepMultiplier++;
}
continue;
}
// Get a new path
if (!getNextPath()) {
if (sleepForRetries("No log to process", sleepMultiplier)) {
@ -419,7 +428,7 @@ public class ReplicationSource extends Thread
*/
protected boolean openReader(int sleepMultiplier) {
try {
LOG.info("Opening log for replication " + this.currentPath.getName() +
LOG.debug("Opening log for replication " + this.currentPath.getName() +
" at " + this.position);
try {
this.reader = null;
@ -445,6 +454,12 @@ public class ReplicationSource extends Thread
// TODO What happens if the log was missing from every single location?
// Although we need to check a couple of times as the log could have
// been moved by the master between the checks
// It can also happen if a recovered queue wasn't properly cleaned,
// such that the znode pointing to a log exists but the log was
// deleted a long time ago.
// For the moment, we'll throw the IOException and rely on processEndOfFile
throw new IOException("File from recovered queue is " +
"nowhere to be found", fnfe);
} else {
// If the log was archived, continue reading from there
Path archivedLogLocation =
@ -590,7 +605,7 @@ public class ReplicationSource extends Thread
return true;
} else if (this.queueRecovered) {
this.manager.closeRecoveredQueue(this);
this.abort();
this.terminate("Finished recovering the queue");
return true;
}
return false;
@ -601,25 +616,26 @@ public class ReplicationSource extends Thread
Thread.UncaughtExceptionHandler handler =
new Thread.UncaughtExceptionHandler() {
public void uncaughtException(final Thread t, final Throwable e) {
LOG.fatal("Set stop flag in " + t.getName(), e);
abort();
terminate("Uncaught exception during runtime", new Exception(e));
}
};
Threads.setDaemonThreadRunning(
this, n + ".replicationSource," + peerClusterZnode, handler);
}
/**
* Hastily stop the replication, then wait for shutdown
*/
private void abort() {
LOG.info("abort");
this.running = false;
terminate();
public void terminate(String reason) {
terminate(reason, null);
}
public void terminate() {
LOG.info("terminate");
public void terminate(String reason, Exception cause) {
if (cause == null) {
LOG.info("Closing source "
+ this.peerClusterZnode + " because: " + reason);
} else {
LOG.error("Closing source " + this.peerClusterZnode
+ " because an error occurred: " + reason, cause);
}
this.running = false;
Threads.shutdown(this, this.sleepForRetries);
}
@ -663,23 +679,22 @@ public class ReplicationSource extends Thread
return down;
}
/**
* Get the id that the source is replicating to
*
* @return peer cluster id
*/
public String getPeerClusterZnode() {
return this.peerClusterZnode;
}
/**
* Get the id of the peer cluster this source replicates to
* @return peer cluster id
*/
public String getPeerClusterId() {
return this.peerClusterId;
}
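/**
* Get the path of the current HLog
* @return current hlog's path
*/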
public Path getCurrentPath() {
return this.currentPath;
}
public void setSourceEnabled(boolean status) {
this.sourceEnabled.set(status);
}
/**
* Comparator used to compare logs together based on their start time
*/
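A hypothetical reduction of the run-loop gating introduced above, with names simplified: while either the cluster-wide kill switch or the per-source flag is off, the source backs off with a capped, increasing sleep multiplier instead of shipping edits.

import java.util.concurrent.atomic.AtomicBoolean;

public class GatingLoopSketch {
  private final AtomicBoolean replicating = new AtomicBoolean(true);   // kill switch
  private final AtomicBoolean sourceEnabled = new AtomicBoolean(true); // per-source flag
  private volatile boolean running = true;
  private final long sleepForRetries = 1000; // ms between retries
  private final int maxRetriesMultiplier = 10;

  public void run() throws InterruptedException {
    int sleepMultiplier = 1;
    while (running) {
      // Sleep until replication is enabled again, as in the patched run().
      if (!replicating.get() || !sourceEnabled.get()) {
        Thread.sleep(sleepForRetries * sleepMultiplier);
        if (sleepMultiplier < maxRetriesMultiplier) {
          sleepMultiplier++;
        }
        continue;
      }
      sleepMultiplier = 1;
      // ... read the next batch of HLog entries and ship it here ...
    }
  }
}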

View File

@ -68,8 +68,16 @@ public interface ReplicationSourceInterface {
/**
* End the replication
* @param reason why it's terminating
*/
public void terminate();
public void terminate(String reason);
/**
* End the replication
* @param reason why it's terminating
* @param cause the error that's causing it
*/
public void terminate(String reason, Exception cause);
/**
* Get the id that the source is replicating to
@ -77,4 +85,17 @@ public interface ReplicationSourceInterface {
* @return peer cluster id
*/
public String getPeerClusterZnode();
/**
* Get the id that the source is replicating to.
*
* @return peer cluster id
*/
public String getPeerClusterId();
/**
* Set if this source is enabled or disabled
* @param status the new status
*/
public void setSourceEnabled(boolean status);
}

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
/**
* This class is responsible to manage all the replication
@ -108,6 +109,9 @@ public class ReplicationSourceManager {
new OtherRegionServerWatcher(this.zkHelper.getZookeeperWatcher()));
List<String> otherRSs =
this.zkHelper.getRegisteredRegionServers();
this.zkHelper.registerRegionServerListener(
new PeersWatcher(this.zkHelper.getZookeeperWatcher()));
this.zkHelper.listPeersIdsAndWatch();
this.otherRegionServers = otherRSs == null ? new ArrayList<String>() : otherRSs;
}
@ -144,8 +148,7 @@ public class ReplicationSourceManager {
*/
public void init() throws IOException {
for (String id : this.zkHelper.getPeerClusters().keySet()) {
ReplicationSourceInterface src = addSource(id);
src.startup();
addSource(id);
}
List<String> currentReplicators = this.zkHelper.getRegisteredRegionServers();
if (currentReplicators == null || currentReplicators.size() == 0) {
@ -168,20 +171,24 @@ public class ReplicationSourceManager {
/**
* Add a new normal source to this region server
* @param id the id of the peer cluster
* @return the created source
* @return the source that was created
* @throws IOException
*/
public ReplicationSourceInterface addSource(String id) throws IOException {
ReplicationSourceInterface src =
getReplicationSource(this.conf, this.fs, this, stopper, replicating, id);
this.sources.add(src);
// TODO set it to what's in ZK
src.setSourceEnabled(true);
synchronized (this.hlogs) {
this.sources.add(src);
if (this.hlogs.size() > 0) {
this.zkHelper.addLogToList(this.hlogs.first(),
// Add the latest hlog to that source's queue
this.zkHelper.addLogToList(this.hlogs.last(),
this.sources.get(0).getPeerClusterZnode());
src.enqueueLog(this.latestPath);
}
}
src.startup();
return src;
}
@ -193,7 +200,7 @@ public class ReplicationSourceManager {
this.zkHelper.deleteOwnRSZNode();
}
for (ReplicationSourceInterface source : this.sources) {
source.terminate();
source.terminate("Region server is closing");
}
}
@ -214,6 +221,11 @@ public class ReplicationSourceManager {
}
void logRolled(Path newLog) {
if (!this.replicating.get()) {
LOG.warn("Replication stopped, won't add new log");
return;
}
if (this.sources.size() > 0) {
this.zkHelper.addLogToList(newLog.getName(),
this.sources.get(0).getPeerClusterZnode());
@ -300,10 +312,16 @@ public class ReplicationSourceManager {
try {
ReplicationSourceInterface src = getReplicationSource(this.conf,
this.fs, this, this.stopper, this.replicating, peerId);
if (!zkHelper.getPeerClusters().containsKey(src.getPeerClusterId())) {
src.terminate("Recovered queue doesn't belong to any current peer");
break;
}
this.oldsources.add(src);
for (String hlog : entry.getValue()) {
src.enqueueLog(new Path(this.oldLogDir, hlog));
}
// TODO set it to what's in ZK
src.setSourceEnabled(true);
src.startup();
} catch (IOException e) {
// TODO manage it
@ -319,7 +337,46 @@ public class ReplicationSourceManager {
public void closeRecoveredQueue(ReplicationSourceInterface src) {
LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
this.oldsources.remove(src);
this.zkHelper.deleteSource(src.getPeerClusterZnode());
this.zkHelper.deleteSource(src.getPeerClusterZnode(), false);
}
/**
* This method first deletes all the recovered sources for the specified
* id, then deletes the normal source (deleting all related data in ZK).
* @param id The id of the peer cluster
*/
public void removePeer(String id) {
LOG.info("Closing the following queue " + id + ", currently have "
+ sources.size() + " and another "
+ oldsources.size() + " that were recovered");
ReplicationSourceInterface srcToRemove = null;
List<ReplicationSourceInterface> oldSourcesToDelete =
new ArrayList<ReplicationSourceInterface>();
// First close all the recovered sources for this peer
for (ReplicationSourceInterface src : oldsources) {
if (id.equals(src.getPeerClusterId())) {
oldSourcesToDelete.add(src);
}
}
for (ReplicationSourceInterface src : oldSourcesToDelete) {
closeRecoveredQueue(src);
}
LOG.info("Number of deleted recovered sources for " + id + ": "
+ oldSourcesToDelete.size());
// Now look for the one on this cluster
for (ReplicationSourceInterface src : this.sources) {
if (id.equals(src.getPeerClusterId())) {
srcToRemove = src;
break;
}
}
if (srcToRemove == null) {
LOG.error("The queue we wanted to close is missing " + id);
return;
}
srcToRemove.terminate("Replication stream was removed by a user");
this.sources.remove(srcToRemove);
this.zkHelper.deleteSource(id, true);
}
/**
@ -354,8 +411,7 @@ public class ReplicationSourceManager {
return;
}
LOG.info(path + " znode expired, trying to lock it");
String[] rsZnodeParts = path.split("/");
transferQueues(rsZnodeParts[rsZnodeParts.length-1]);
transferQueues(zkHelper.getZNodeName(path));
}
/**
@ -383,6 +439,70 @@ public class ReplicationSourceManager {
}
}
/**
* Watcher used to follow the creation and deletion of peer clusters.
*/
public class PeersWatcher extends ZooKeeperListener {
/**
* Construct a ZooKeeper event listener.
*/
public PeersWatcher(ZooKeeperWatcher watcher) {
super(watcher);
}
/**
* Called when a node has been deleted
* @param path full path of the deleted node
*/
public void nodeDeleted(String path) {
List<String> peers = refreshPeersList(path);
if (peers == null) {
return;
}
String id = zkHelper.getZNodeName(path);
removePeer(id);
}
/**
* Called when an existing node has a child node added or removed.
* @param path full path of the node whose children have changed
*/
public void nodeChildrenChanged(String path) {
List<String> peers = refreshPeersList(path);
if (peers == null) {
return;
}
for (String id : peers) {
try {
boolean added = zkHelper.connectToPeer(id);
if (added) {
addSource(id);
}
} catch (IOException e) {
// TODO manage better than that ?
LOG.error("Error while adding a new peer", e);
} catch (KeeperException e) {
LOG.error("Error while adding a new peer", e);
}
}
}
/**
* Verify if this event is meant for us, and if so then get the latest
* peers' list from ZK. Also reset the watches.
* @param path path to check against
* @return A list of peers' identifiers if the event concerns this watcher,
* else null.
*/
private List<String> refreshPeersList(String path) {
if (!path.startsWith(zkHelper.getPeersZNode())) {
return null;
}
return zkHelper.listPeersIdsAndWatch();
}
}
/**
* Get the directory where hlogs are archived
* @return the directory where hlogs are archived

View File

@ -275,6 +275,27 @@ public class ZKUtil {
}
}
/**
* List all the children of the specified znode, setting a watch for children
* changes and also setting a watch on every individual child in order to get
* the NodeCreated and NodeDeleted events.
* @param zkw zookeeper reference
* @param znode node to get children of and watch
* @return list of znode names, null if the node doesn't exist
* @throws KeeperException
*/
public static List<String> listChildrenAndWatchThem(ZooKeeperWatcher zkw,
String znode) throws KeeperException {
List<String> children = listChildrenAndWatchForNewChildren(zkw, znode);
if (children == null) {
return null;
}
for (String child : children) {
watchAndCheckExists(zkw, joinZNode(znode, child));
}
return children;
}
/**
* Lists the children of the specified znode, retrieving the data of each
* child as a server address.
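A usage sketch for the new helper; PeerWatchRefresher is a hypothetical name, but this is how listPeersIdsAndWatch() in ReplicationZookeeper re-arms the watches after every event.

import java.util.List;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;

public class PeerWatchRefresher {
  // One call sets a children watch on the parent and an existence watch on
  // every child, so both NodeChildrenChanged and NodeDeleted events fire for
  // individual peers; re-listing after each event re-arms the watches.
  public static List<String> refresh(ZooKeeperWatcher zkw, String peersZNode)
      throws KeeperException {
    return ZKUtil.listChildrenAndWatchThem(zkw, peersZNode);
  }
}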

View File

@ -47,7 +47,8 @@ features:
<li>Replication of scoped families in user tables.</li>
<li>Start/stop replication stream.</li>
<li>Supports clusters of different sizes.</li>
<li>Handling of partitions longer than 10 minutes</li>
<li>Handling of partitions longer than 10 minutes.</li>
<li>Ability to add/remove slave clusters at runtime.</li>
</ol>
Please report bugs on the project's Jira when found.
<p>
@ -80,7 +81,7 @@ Before trying out replication, make sure to review the following requirements:
<p>
The following steps describe how to enable replication from a cluster
to another. This must be done with both clusters offlined.
to another.
<ol>
<li>Edit ${HBASE_HOME}/conf/hbase-site.xml on both clusters to add
the following configurations:
@ -90,15 +91,13 @@ to another. This must be done with both clusters offlined.
&lt;value&gt;true&lt;/value&gt;
&lt;/property&gt;</pre>
</li>
<li>Run the following command on any cluster:
<pre>
$HBASE_HOME/bin/hbase org.jruby.Main $HBASE_HOME/bin/replication/add_peer.rb</pre>
<li>Run the following command in the master's shell while it's running:
<pre>add_peer</pre>
This will show you the help to set up the replication stream between
both clusters. If both clusters use the same Zookeeper cluster, you have
to use a different <b>zookeeper.znode.parent</b> since they can't
write in the same folder.
</li>
<li>You can now start and stop the clusters with your preferred method.</li>
</ol>
You can confirm that your setup works by looking at any region server's log
@ -115,12 +114,11 @@ was chosen for replication.<br><br>
Should you want to stop the replication while the clusters are running, open
the shell on the master cluster and issue this command:
<pre>
hbase(main):001:0> zk 'set /zookeeper.znode.parent/replication/state false'</pre>
hbase(main):001:0> stop_replication</pre>
Where you replace the znode parent with the one configured on your master
cluster. Replication of already queued edits will still happen after you
Replication of already queued edits will still happen after you
issue that command, but new entries won't be. To start it back, simply replace
"false" with "true" in the command.
"false" with "true" in the command.
<p>
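For the curious, a sketch of what stop_replication toggles underneath, assuming the default zookeeper.znode.parent of /hbase: the state znode simply stores the string "true" or "false".

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;

public class ReplicationStateProbe {
  // Mirrors ReplicationZookeeper.getReplication(); the znode path assumes
  // the default configuration.
  public static boolean isReplicating(ZooKeeperWatcher zkw)
      throws KeeperException {
    byte[] data = ZKUtil.getDataAndWatch(zkw, "/hbase/replication/state");
    return Boolean.parseBoolean(Bytes.toString(data));
  }
}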

View File

@ -72,3 +72,4 @@ end
require 'hbase/hbase'
require 'hbase/admin'
require 'hbase/table'
require 'hbase/replication_admin'

View File

@ -49,5 +49,10 @@ module Hbase
def table(table, formatter)
::Hbase::Table.new(configuration, table, formatter)
end
def replication_admin(formatter)
::Hbase::RepAdmin.new(configuration, formatter)
end
end
end

View File

@ -0,0 +1,72 @@
#
# Copyright 2010 The Apache Software Foundation
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
include Java
java_import org.apache.hadoop.hbase.client.replication.ReplicationAdmin
# Wrapper for org.apache.hadoop.hbase.client.replication.ReplicationAdmin
module Hbase
class RepAdmin
include HBaseConstants
def initialize(configuration, formatter)
@replication_admin = ReplicationAdmin.new(configuration)
@formatter = formatter
end
#----------------------------------------------------------------------------------------------
# Add a new peer cluster to replicate to
def add_peer(id, cluster_key)
@replication_admin.addPeer(id, cluster_key)
end
#----------------------------------------------------------------------------------------------
# Remove a peer cluster, stops the replication
def remove_peer(id)
@replication_admin.removePeer(id)
end
#----------------------------------------------------------------------------------------------
# Restart the replication stream to the specified peer
def enable_peer(id)
@replication_admin.enablePeer(id)
end
#----------------------------------------------------------------------------------------------
# Stop the replication stream to the specified peer
def disable_peer(id)
@replication_admin.disablePeer(id)
end
#----------------------------------------------------------------------------------------------
# Restart the replication; each stream resumes in an undetermined state
def start_replication
@replication_admin.setReplicating(true)
end
#----------------------------------------------------------------------------------------------
# Kill switch for replication, stops all its features
def stop_replication
@replication_admin.setReplicating(false)
end
end
end

View File

@ -83,6 +83,10 @@ module Shell
hbase.table(name, formatter)
end
def hbase_replication_admin
@hbase_replication_admin ||= hbase.replication_admin(formatter)
end
def export_commands(where)
::Shell.commands.keys.each do |cmd|
where.send :instance_eval, <<-EOF
@ -254,3 +258,17 @@ Shell.load_command_group(
]
)
Shell.load_command_group(
'replication',
:full_name => 'CLUSTER REPLICATION TOOLS',
:comment => "In order to use these tools, hbase.replication must be true. Enabling/disabling a peer is currently unsupported",
:commands => %w[
add_peer
remove_peer
enable_peer
disable_peer
start_replication
stop_replication
]
)

View File

@ -49,6 +49,10 @@ module Shell
shell.hbase_table(name)
end
def replication_admin
shell.hbase_replication_admin
end
#----------------------------------------------------------------------
def formatter

View File

@ -0,0 +1,44 @@
#
# Copyright 2010 The Apache Software Foundation
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
module Shell
module Commands
class AddPeer < Command
def help
return <<-EOF
Add a peer cluster to replicate to. The id must be a short and
the cluster key is composed like this:
hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
This gives a full path for HBase to connect to another cluster.
Examples:
hbase> add_peer '1', "server1.cie.com:2181:/hbase"
hbase> add_peer '2', "zk1,zk2,zk3:2182:/hbase-prod"
EOF
end
def command(id, cluster_key)
format_simple_command do
replication_admin.add_peer(id, cluster_key)
end
end
end
end
end

View File

@ -0,0 +1,44 @@
#
# Copyright 2010 The Apache Software Foundation
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
module Shell
module Commands
class DisablePeer < Command
def help
return <<-EOF
Stops the replication stream to the specified cluster, but still
keeps track of new edits to replicate.
CURRENTLY UNSUPPORTED
Examples:
hbase> disable_peer '1'
EOF
end
def command(id)
format_simple_command do
replication_admin.disable_peer(id)
end
end
end
end
end

View File

@ -0,0 +1,44 @@
#
# Copyright 2010 The Apache Software Foundation
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
module Shell
module Commands
class EnablePeer < Command
def help
return <<-EOF
Restarts the replication to the specified peer cluster,
continuing from where it was disabled.
CURRENTLY UNSUPPORTED
Examples:
hbase> enable_peer '1'
EOF
end
def command(id)
format_simple_command do
replication_admin.enable_peer(id)
end
end
end
end
end

View File

@ -0,0 +1,41 @@
#
# Copyright 2010 The Apache Software Foundation
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
module Shell
module Commands
class RemovePeer < Command
def help
return <<-EOF
Stops the specified replication stream and deletes all the meta
information kept about it.
Examples:
hbase> remove_peer '1'
EOF
end
def command(id)
format_simple_command do
replication_admin.remove_peer(id)
end
end
end
end
end

View File

@ -0,0 +1,43 @@
#
# Copyright 2010 The Apache Software Foundation
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
module Shell
module Commands
class StartReplication < Command
def help
return <<-EOF
Restarts all the replication features. The state in which each
stream starts is undetermined.
WARNING:
start/stop replication is only meant to be used in critical load situations.
Examples:
hbase> start_replication
EOF
end
def command
format_simple_command do
replication_admin.start_replication
end
end
end
end
end

View File

@ -0,0 +1,43 @@
#
# Copyright 2010 The Apache Software Foundation
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
module Shell
module Commands
class StopReplication < Command
def help
return <<-EOF
Stops all the replication features. The state in which each
stream stops is undetermined.
WARNING:
start/stop replication is only meant to be used in critical load situations.
Examples:
hbase> stop_replication
EOF
end
def command
format_simple_command do
replication_admin.stop_replication
end
end
end
end
end

View File

@ -31,7 +31,7 @@
<p>
HBase replication is a way to copy data between HBase deployments. It
can serve as a disaster recovery solution and can contribute to providing
higher availability at HBase layer. It can also serve more practically;
higher availability at the HBase layer. It can also serve more practically;
for example, as a way to easily copy edits from a web-facing cluster to a "MapReduce"
cluster which will process old and new data and ship back the results
automatically.
@ -97,10 +97,10 @@
</p>
<p>
Synchronously, the region server that receives the edits reads them
sequentially and applies them on its own cluster using the HBase client
(HTables managed by a HTablePool) automatically. If consecutive rows
belong to the same table, they are inserted together in order to
leverage parallel insertions.
sequentially and separates each of them into buffers, one per table.
Once all edits are read, each buffer is flushed using the normal HBase
client (HTables managed by a HTablePool). This is done in order to
leverage parallel insertion (MultiPut).
</p>
<p>
Back in the master cluster's region server, the offset for the current
@ -221,23 +221,6 @@
10 times before trying to find a different sink.
</p>
</section>
<section name="Applying edits">
<p>
The sink synchronously applies the edits to its local cluster using
the native client API. This method is also synchronized across all
incoming sources, which means that only one batch of log entries can be
replicated at a time by each slave region server.
</p>
<p>
The sink applies the edits one by one, unless it's able to batch
sequential Puts that belong to the same table in order to use the
parallel puts feature of HConnectionManager. The Put and Delete objects
are recreated by inspecting the incoming WALEdit objects and are
created with the exact same row, family, qualifier, timestamp, and value (for
Put). Note that if the master and slave cluster don't have the same
time, time-related issues may occur.
</p>
</section>
<section name="Cleaning logs">
<p>
If replication isn't enabled, the master's logs cleaning thread will
@ -401,10 +384,6 @@
features in the replication implementation.
</p>
<ol>
<li>
HBASE-2688, master-master replication is disabled in the code, we need
to enable and test it.
</li>
<li>
HBASE-2611, basically if a region server dies while recovering the
queues of another dead RS, we will miss the data from the queues
@ -419,9 +398,8 @@
carry that data and check it.
</li>
<li>
HBASE-2200, currently all the replication operations (adding or removing
streams for example) are done only when the clusters are offline. This
should be possible at runtime.
HBASE-3130, the master cluster needs to be restarted if its region
servers lose their session with a slave cluster.
</li>
</ol>
</section>
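The per-table buffering and flushing described above ("separates each of them into buffers, one per table ... flushed using the normal HBase client"), reduced to a hypothetical sketch; SinkBatchingSketch and its method names are illustrative, not the code in this commit.

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;

public class SinkBatchingSketch {
  private final HTablePool pool;
  private final Map<String, List<Put>> buffers =
      new HashMap<String, List<Put>>();

  public SinkBatchingSketch(HTablePool pool) {
    this.pool = pool;
  }

  // Group each incoming edit into a per-table buffer.
  public void buffer(String tableName, Put put) {
    List<Put> puts = buffers.get(tableName);
    if (puts == null) {
      puts = new ArrayList<Put>();
      buffers.put(tableName, puts);
    }
    puts.add(put);
  }

  // Once a whole batch has been read, flush each buffer in one multi-put.
  public void flushAll() throws IOException {
    for (Map.Entry<String, List<Put>> e : buffers.entrySet()) {
      HTableInterface table = pool.getTable(e.getKey());
      table.put(e.getValue()); // parallel insertion (MultiPut) per table
      pool.putTable(table);
    }
    buffers.clear();
  }
}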

View File

@ -0,0 +1,92 @@
package org.apache.hadoop.hbase.client.replication;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.fail;
import static org.junit.Assert.assertEquals;
/**
* Unit testing of ReplicationAdmin
*/
public class TestReplicationAdmin {
private static final Log LOG =
LogFactory.getLog(TestReplicationAdmin.class);
private final static HBaseTestingUtility TEST_UTIL =
new HBaseTestingUtility();
private final String ID_ONE = "1";
private final String KEY_ONE = "127.0.0.1:2181:/hbase";
private final String ID_SECOND = "2";
private final String KEY_SECOND = "127.0.0.1:2181:/hbase2";
private static ReplicationSourceManager manager;
private static ReplicationAdmin admin;
private static AtomicBoolean replicating = new AtomicBoolean(true);
/**
* @throws java.lang.Exception
*/
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.startMiniZKCluster();
Configuration conf = TEST_UTIL.getConfiguration();
conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
admin = new ReplicationAdmin(conf);
Path oldLogDir = new Path(TEST_UTIL.getTestDir(),
HConstants.HREGION_OLDLOGDIR_NAME);
Path logDir = new Path(TEST_UTIL.getTestDir(),
HConstants.HREGION_LOGDIR_NAME);
manager = new ReplicationSourceManager(admin.getReplicationZk(),
conf, null, FileSystem.get(conf), replicating, logDir, oldLogDir);
}
/**
* Simple testing of adding and removing peers, basically shows that
* all interactions with ZK work
* @throws Exception
*/
@Test
public void testAddRemovePeer() throws Exception {
assertEquals(0, manager.getSources().size());
// Add a valid peer
admin.addPeer(ID_ONE, KEY_ONE);
// Try adding the same peer again (should fail)
try {
admin.addPeer(ID_ONE, KEY_ONE);
fail();
} catch (IllegalArgumentException iae) {
// OK!
}
assertEquals(1, admin.getPeersCount());
// Try to remove a non-existent peer
try {
admin.removePeer(ID_SECOND);
fail();
} catch (IllegalArgumentException iae) {
// OK!
}
assertEquals(1, admin.getPeersCount());
// Adding a second peer fails since multi-slave isn't supported yet
try {
admin.addPeer(ID_SECOND, KEY_SECOND);
fail();
} catch (IllegalStateException ise) {
// OK!
}
assertEquals(1, admin.getPeersCount());
// Remove the first peer we added
admin.removePeer(ID_ONE);
assertEquals(0, admin.getPeersCount());
}
}
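For context, here is a short, hypothetical driver for the ReplicationAdmin API exercised above. The calls (addPeer, getPeersCount, setReplicating, removePeer) and the quorum:clientPort:znodeParent cluster key format come from this patch; the host name and znode parent are placeholders.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;

public class ReplicationAdminExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
    ReplicationAdmin admin = new ReplicationAdmin(conf);
    // Cluster key format: quorum:clientPort:znodeParent.
    // "slave-zk" and "/hbase-slave" are placeholder values.
    admin.addPeer("1", "slave-zk:2181:/hbase-slave");
    System.out.println("Peers: " + admin.getPeersCount());
    // Toggle shipping of edits cluster-wide.
    admin.setReplicating(true);
    // Peers can now also be removed at runtime:
    admin.removePeer("1");
  }
}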

View File

@ -63,7 +63,12 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
}
@Override
public void terminate() {
public void terminate(String reason) {
}
@Override
public void terminate(String reason, Exception e) {
}
@ -71,4 +76,15 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
public String getPeerClusterZnode() {
return peerClusterId;
}
@Override
public String getPeerClusterId() {
return peerClusterId;
}
@Override
public void setSourceEnabled(boolean status) {
}
}

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@ -59,6 +60,9 @@ public class TestReplication {
private static ZooKeeperWatcher zkw1;
private static ZooKeeperWatcher zkw2;
private static ReplicationAdmin admin;
private static String slaveClusterKey;
private static HTable htable1;
private static HTable htable2;
@ -92,16 +96,12 @@ public class TestReplication {
conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
conf1.setBoolean("dfs.support.append", true);
conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
utility1 = new HBaseTestingUtility(conf1);
utility1.startMiniZKCluster();
MiniZooKeeperCluster miniZK = utility1.getZkCluster();
zkw1 = new ZooKeeperWatcher(conf1, "cluster1", null);
ZKUtil.createWithParents(zkw1, "/1/replication/master");
ZKUtil.createWithParents(zkw1, "/1/replication/state");
ZKUtil.setData(zkw1, "/1/replication/master", Bytes.toBytes(
conf1.get(HConstants.ZOOKEEPER_QUORUM)+":" +
conf1.get("hbase.zookeeper.property.clientPort")+":/1"));
setIsReplication(true);
admin = new ReplicationAdmin(conf1);
LOG.info("Setup first Zk");
conf2 = HBaseConfiguration.create();
@ -114,12 +114,11 @@ public class TestReplication {
utility2 = new HBaseTestingUtility(conf2);
utility2.setZkCluster(miniZK);
zkw2 = new ZooKeeperWatcher(conf2, "cluster2", null);
ZKUtil.createWithParents(zkw2, "/2/replication");
ZKUtil.createWithParents(zkw1, "/1/replication/peers/2");
ZKUtil.setData(zkw1, "/1/replication/peers/2", Bytes.toBytes(
conf2.get(HConstants.ZOOKEEPER_QUORUM)+":" +
conf2.get("hbase.zookeeper.property.clientPort")+":/2"));
slaveClusterKey = conf2.get(HConstants.ZOOKEEPER_QUORUM)+":" +
conf2.get("hbase.zookeeper.property.clientPort")+":/2";
admin.addPeer("2", slaveClusterKey);
setIsReplication(true);
LOG.info("Setup second Zk");
@ -145,9 +144,7 @@ public class TestReplication {
private static void setIsReplication(boolean rep) throws Exception {
LOG.info("Set rep " + rep);
ZKUtil.setData(zkw1,"/1/replication/state",
Bytes.toBytes(Boolean.toString(rep)));
// Takes some ms for ZK to fire the watcher
admin.setReplicating(rep);
Thread.sleep(SLEEP_TIME);
}
@ -156,12 +153,31 @@ public class TestReplication {
*/
@Before
public void setUp() throws Exception {
setIsReplication(false);
utility1.truncateTable(tableName);
utility2.truncateTable(tableName);
// If the test is flaky, increase this sleep
Thread.sleep(SLEEP_TIME*8);
setIsReplication(true);
// Truncating the table sends one Delete per row to the slave cluster
// in an async fashion, which is why we cannot just call truncateTable on
// utility2: late writes could still make it to the slave. Instead, we
// truncate the first table and wait for all the Deletes to reach the
// slave.
Scan scan = new Scan();
int lastCount = 0;
for (int i = 0; i < NB_RETRIES; i++) {
if (i==NB_RETRIES-1) {
fail("Waited too much time for truncate");
}
ResultScanner scanner = htable2.getScanner(scan);
Result[] res = scanner.next(NB_ROWS_IN_BATCH);
scanner.close();
if (res.length != 0) {
if (lastCount < res.length) {
i--; // Don't consume a retry if we're making progress
}
LOG.info("Still got " + res.length + " rows");
Thread.sleep(SLEEP_TIME);
} else {
break;
}
}
}
/**
@ -186,13 +202,12 @@ public class TestReplication {
htable1 = new HTable(conf1, tableName);
htable1.put(put);
HTable table2 = new HTable(conf2, tableName);
Get get = new Get(row);
for (int i = 0; i < NB_RETRIES; i++) {
if (i==NB_RETRIES-1) {
fail("Waited too much time for put replication");
}
Result res = table2.get(get);
Result res = htable2.get(get);
if (res.size() == 0) {
LOG.info("Row not available");
Thread.sleep(SLEEP_TIME);
@ -205,13 +220,12 @@ public class TestReplication {
Delete del = new Delete(row);
htable1.delete(del);
table2 = new HTable(conf2, tableName);
get = new Get(row);
for (int i = 0; i < NB_RETRIES; i++) {
if (i==NB_RETRIES-1) {
fail("Waited too much time for del replication");
}
Result res = table2.get(get);
Result res = htable2.get(get);
if (res.size() >= 1) {
LOG.info("Row not deleted");
Thread.sleep(SLEEP_TIME);
@ -333,6 +347,59 @@ public class TestReplication {
}
/**
* Integration test counterpart of TestReplicationAdmin: removes and re-adds a peer
* cluster
* @throws Exception
*/
@Test
public void testAddAndRemoveClusters() throws Exception {
LOG.info("testAddAndRemoveClusters");
admin.removePeer("2");
Thread.sleep(SLEEP_TIME);
byte[] rowKey = Bytes.toBytes("Won't be replicated");
Put put = new Put(rowKey);
put.add(famName, row, row);
htable1.put(put);
Get get = new Get(rowKey);
for (int i = 0; i < NB_RETRIES; i++) {
if (i == NB_RETRIES-1) {
break;
}
Result res = htable2.get(get);
if (res.size() >= 1) {
fail("Not supposed to be replicated");
} else {
LOG.info("Row not replicated, let's wait a bit more...");
Thread.sleep(SLEEP_TIME);
}
}
admin.addPeer("2", slaveClusterKey);
Thread.sleep(SLEEP_TIME);
rowKey = Bytes.toBytes("do rep");
put = new Put(rowKey);
put.add(famName, row, row);
LOG.info("Adding new row");
htable1.put(put);
get = new Get(rowKey);
for (int i = 0; i < NB_RETRIES; i++) {
if (i==NB_RETRIES-1) {
fail("Waited too much time for put replication");
}
Result res = htable2.get(get);
if (res.size() == 0) {
LOG.info("Row not available");
Thread.sleep(SLEEP_TIME*i);
} else {
assertArrayEquals(row, res.value());
break;
}
}
}
/**
* Do a more intense version of testSmallBatch, one that will trigger
* HLog rolling and other non-trivial code paths

View File

@ -64,8 +64,6 @@ public class TestReplicationSourceManager {
private static HBaseTestingUtility utility;
private static final AtomicBoolean REPLICATING = new AtomicBoolean(false);
private static Replication replication;
private static ReplicationSourceManager manager;
@ -105,14 +103,12 @@ public class TestReplicationSourceManager {
zkw = new ZooKeeperWatcher(conf, "test", null);
ZKUtil.createWithParents(zkw, "/hbase/replication");
ZKUtil.createWithParents(zkw, "/hbase/replication/master");
ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1");
ZKUtil.setData(zkw, "/hbase/replication/master",
Bytes.toBytes(conf.get(HConstants.ZOOKEEPER_QUORUM)+":" +
conf.get("hbase.zookeeper.property.clientPort")+":/1"));
ZKUtil.setData(zkw, "/hbase/replication/peers/1",Bytes.toBytes(
conf.get(HConstants.ZOOKEEPER_QUORUM)+":" +
conf.get("hbase.zookeeper.property.clientPort")+":/1"));
ZKUtil.createWithParents(zkw, "/hbase/replication/state");
ZKUtil.setData(zkw, "/hbase/replication/state", Bytes.toBytes("true"));
replication = new Replication(new DummyServer(), fs, logDir, oldLogDir);
manager = replication.getReplicationManager();