HBASE-3060 [replication] Reenable replication on trunk with unit tests

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1021447 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jean-Daniel Cryans 2010-10-11 18:00:48 +00:00
parent 999cb8af7c
commit 5ccf27683b
12 changed files with 298 additions and 240 deletions

View File

@ -580,7 +580,7 @@ Release 0.21.0 - Unreleased
HBASE-3080 TestAdmin hanging on hudson
HBASE-3063 TestThriftServer failing in TRUNK
HBASE-3094 Fixes for miscellaneous broken tests
HBASE-3060 [replication] Reenable replication on trunk with unit tests
IMPROVEMENTS

View File

@ -39,11 +39,11 @@ import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
/**
* This class serves as a helper for all things related to zookeeper
@ -81,31 +81,39 @@ public class ReplicationZookeeper {
// Our handle on zookeeper
private final ZooKeeperWatcher zookeeper;
// Map of addresses of peer clusters with their ZKW
private final Map<String, ReplicationZookeeper> peerClusters;
private Map<String, ZooKeeperWatcher> peerClusters;
// Path to the root replication znode
private final String replicationZNode;
private String replicationZNode;
// Path to the peer clusters znode
private final String peersZNode;
private String peersZNode;
// Path to the znode that contains all RSs that replicate
private final String rsZNode;
private String rsZNode;
// Path to this region server's name under rsZNode
private final String rsServerNameZnode;
private String rsServerNameZnode;
// Name of the replicationState znode
private final String replicationStateNodeName;
private String replicationStateNodeName;
// If this RS is part of a master cluster
private final boolean replicationMaster;
private boolean replicationMaster;
private final Configuration conf;
// Is this cluster replicating at the moment?
private final AtomicBoolean replicating;
private AtomicBoolean replicating;
// Byte (stored as string here) that identifies this cluster
private final String clusterId;
private String clusterId;
// Abortable
private final Abortable abortable;
private Abortable abortable;
public ReplicationZookeeper(final Configuration conf, final ZooKeeperWatcher zk)
throws KeeperException {
this.conf = conf;
this.zookeeper = zk;
setZNodes();
}
/**
* Constructor used by region servers, connects to the peer cluster right away.
*
* @param zookeeper
* @param server
* @param replicating atomic boolean to start/stop replication
* @throws IOException
* @throws KeeperException
@ -115,6 +123,27 @@ public class ReplicationZookeeper {
this.abortable = server;
this.zookeeper = server.getZooKeeper();
this.conf = server.getConfiguration();
setZNodes();
this.peerClusters = new HashMap<String, ZooKeeperWatcher>();
this.replicating = replicating;
setReplicating();
this.rsServerNameZnode = ZKUtil.joinZNode(rsZNode, server.getServerName());
ZKUtil.createWithParents(this.zookeeper, this.rsServerNameZnode);
// Set a tracker on replicationStateNode
ReplicationStatusTracker tracker =
new ReplicationStatusTracker(this.zookeeper, server);
tracker.start();
List<String> znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
if (znodes != null) {
for (String z : znodes) {
connectToPeer(z);
}
}
}
private void setZNodes() throws KeeperException {
String replicationZNodeName =
conf.get("zookeeper.znode.replication", "replication");
String peersZNodeName =
@ -130,15 +159,11 @@ public class ReplicationZookeeper {
String thisCluster = this.conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" +
this.conf.get("hbase.zookeeper.property.clientPort") + ":" +
this.conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT);
this.peerClusters = new HashMap<String, ReplicationZookeeper>();
this.replicationZNode =
ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
this.rsZNode = ZKUtil.joinZNode(replicationZNode, rsZNodeName);
this.replicating = replicating;
setReplicating();
String znode = ZKUtil.joinZNode(this.replicationZNode, clusterIdZNodeName);
byte [] data = ZKUtil.getData(this.zookeeper, znode);
String idResult = Bytes.toString(data);
@ -152,24 +177,6 @@ public class ReplicationZookeeper {
LOG.info("This cluster (" + thisCluster + ") is a " +
(this.replicationMaster ? "master" : "slave") + " for replication" +
", compared with (" + address + ")");
if (server.getServerName() != null) {
this.rsServerNameZnode = ZKUtil.joinZNode(rsZNode, server.getServerName());
// Set a tracker on replicationStateNode
ReplicationStatusTracker tracker =
new ReplicationStatusTracker(this.zookeeper, getRepStateNode(), server);
tracker.start();
List<String> znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
if (znodes != null) {
for (String z : znodes) {
connectToPeer(z);
}
}
} else {
this.rsServerNameZnode = null;
}
}
/**
@ -178,47 +185,16 @@ public class ReplicationZookeeper {
* @param peerClusterId (byte) the cluster to interrogate
* @return addresses of all region servers
*/
public List<HServerAddress> getPeersAddresses(String peerClusterId) {
public List<HServerAddress> getPeersAddresses(String peerClusterId)
throws KeeperException {
if (this.peerClusters.size() == 0) {
return new ArrayList<HServerAddress>(0);
}
ReplicationZookeeper zkw = this.peerClusters.get(peerClusterId);
ZooKeeperWatcher zkw = this.peerClusters.get(peerClusterId);
return zkw == null?
new ArrayList<HServerAddress>(0):
zkw.scanAddressDirectory(this.zookeeper.rsZNode);
}
/**
* Scan a directory of address data.
* @param znode The parent node
* @return The directory contents as HServerAddresses
*/
public List<HServerAddress> scanAddressDirectory(String znode) {
List<HServerAddress> list = new ArrayList<HServerAddress>();
List<String> nodes = null;
try {
nodes = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
} catch (KeeperException e) {
this.abortable.abort("Scanning " + znode, e);
}
if (nodes == null) {
return list;
}
for (String node : nodes) {
String path = ZKUtil.joinZNode(znode, node);
list.add(readAddress(path));
}
return list;
}
private HServerAddress readAddress(String znode) {
byte [] data = null;
try {
data = ZKUtil.getData(this.zookeeper, znode);
} catch (KeeperException e) {
this.abortable.abort("Getting address", e);
}
return new HServerAddress(Bytes.toString(data));
ZKUtil.listChildrenAndGetAsAddresses(zkw, zkw.rsZNode);
}
/**
@ -239,15 +215,11 @@ public class ReplicationZookeeper {
otherConf.set(HConstants.ZOOKEEPER_QUORUM, ensemble[0]);
otherConf.set("hbase.zookeeper.property.clientPort", ensemble[1]);
otherConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, ensemble[2]);
// REENABLE -- FIX!!!!
/*
ZooKeeperWrapper zkw = ZooKeeperWrapper.createInstance(otherConf,
"connection to cluster: " + peerId);
zkw.registerListener(new ReplicationStatusWatcher());
ZooKeeperWatcher zkw = new ZooKeeperWatcher(otherConf,
"connection to cluster: " + peerId, this.abortable);
this.peerClusters.put(peerId, zkw);
this.zookeeperWrapper.ensureExists(this.zookeeperWrapper.getZNode(
ZKUtil.createWithParents(this.zookeeper, ZKUtil.joinZNode(
this.rsServerNameZnode, peerId));
*/
LOG.info("Added new peer cluster " + StringUtils.arrayToString(ensemble));
}
@ -282,7 +254,7 @@ public class ReplicationZookeeper {
try {
String znode = ZKUtil.joinZNode(this.rsServerNameZnode, clusterId);
znode = ZKUtil.joinZNode(znode, filename);
ZKUtil.createAndWatch(this.zookeeper, znode, Bytes.toBytes(""));
ZKUtil.createWithParents(this.zookeeper, znode);
} catch (KeeperException e) {
this.abortable.abort("Failed add log to list", e);
}
@ -297,7 +269,7 @@ public class ReplicationZookeeper {
try {
String znode = ZKUtil.joinZNode(rsServerNameZnode, clusterId);
znode = ZKUtil.joinZNode(znode, filename);
ZKUtil.deleteChildrenRecursively(this.zookeeper, znode);
ZKUtil.deleteNode(this.zookeeper, znode);
} catch (KeeperException e) {
this.abortable.abort("Failed remove from list", e);
}
@ -316,7 +288,7 @@ public class ReplicationZookeeper {
String znode = ZKUtil.joinZNode(this.rsServerNameZnode, clusterId);
znode = ZKUtil.joinZNode(znode, filename);
// Why serialize String of Long and not Long as bytes?
ZKUtil.createAndWatch(this.zookeeper, znode,
ZKUtil.setData(this.zookeeper, znode,
Bytes.toBytes(Long.toString(position)));
} catch (KeeperException e) {
this.abortable.abort("Writing replication status", e);
@ -326,15 +298,18 @@ public class ReplicationZookeeper {
/**
* Get a list of all the other region servers in this cluster
* and set a watch
* @param watch the watch to set
* @return a list of server names
*/
public List<String> getRegisteredRegionServers(Watcher watch) {
public List<String> getRegisteredRegionServers() {
List<String> result = null;
try {
// TODO: This is rsZNode from zk which is like getListOfReplicators
// but maybe these are from different zk instances?
result = ZKUtil.listChildrenNoWatch(this.zookeeper, rsZNode);
List<ZKUtil.NodeAndData> nads =
ZKUtil.watchAndGetNewChildren(this.zookeeper, this.zookeeper.rsZNode);
result = new ArrayList<String>(nads.size());
for (ZKUtil.NodeAndData nad : nads) {
String[] fullPath = nad.getNode().split("/");
result.add(fullPath[fullPath.length - 1]);
}
} catch (KeeperException e) {
this.abortable.abort("Get list of registered region servers", e);
}
@ -344,7 +319,6 @@ public class ReplicationZookeeper {
/**
* Get the list of the replicators that have queues, they can be alive, dead
* or simply from a previous run
* @param watch the watch to set
* @return a list of server names
*/
public List<String> getListOfReplicators() {
@ -360,7 +334,6 @@ public class ReplicationZookeeper {
/**
* Get the list of peer clusters for the specified server names
* @param rs server names of the rs
* @param watch the watch to set
* @return a list of peer cluster
*/
public List<String> getListPeersForRS(String rs) {
@ -378,7 +351,6 @@ public class ReplicationZookeeper {
* Get the list of hlogs for the specified region server and peer cluster
* @param rs server names of the rs
* @param id peer cluster
* @param watch the watch to set
* @return a list of hlogs
*/
public List<String> getListHLogsForPeerForRS(String rs, String id) {
@ -401,10 +373,14 @@ public class ReplicationZookeeper {
public boolean lockOtherRS(String znode) {
try {
String parent = ZKUtil.joinZNode(this.rsZNode, znode);
if (parent.equals(rsServerNameZnode)) {
LOG.warn("Won't lock because this is us, we're dead!");
return false;
}
String p = ZKUtil.joinZNode(parent, RS_LOCK_ZNODE);
ZKUtil.createAndWatch(this.zookeeper, p, Bytes.toBytes(rsServerNameZnode));
} catch (KeeperException e) {
this.abortable.abort("Failed lock other rs", e);
LOG.info("Failed lock other rs", e);
}
return true;
}
@ -468,7 +444,7 @@ public class ReplicationZookeeper {
*/
public void deleteSource(String peerZnode) {
try {
ZKUtil.deleteChildrenRecursively(this.zookeeper,
ZKUtil.deleteNodeRecursively(this.zookeeper,
ZKUtil.joinZNode(rsServerNameZnode, peerZnode));
} catch (KeeperException e) {
this.abortable.abort("Failed delete of " + peerZnode, e);
@ -481,7 +457,7 @@ public class ReplicationZookeeper {
*/
public void deleteRsQueues(String znode) {
try {
ZKUtil.deleteChildrenRecursively(this.zookeeper,
ZKUtil.deleteNodeRecursively(this.zookeeper,
ZKUtil.joinZNode(rsZNode, znode));
} catch (KeeperException e) {
this.abortable.abort("Failed delete of " + znode, e);
@ -492,7 +468,12 @@ public class ReplicationZookeeper {
* Delete this cluster's queues
*/
public void deleteOwnRSZNode() {
deleteRsQueues(this.rsServerNameZnode);
try {
ZKUtil.deleteNodeRecursively(this.zookeeper,
this.rsServerNameZnode);
} catch (KeeperException e) {
this.abortable.abort("Failed delete of " + this.rsServerNameZnode, e);
}
}
/**
@ -510,13 +491,8 @@ public class ReplicationZookeeper {
return data == null || data.length() == 0 ? 0 : Long.parseLong(data);
}
/**
* Tells if this cluster replicates or not
*
* @return if this is a master
*/
public boolean isReplicationMaster() {
return this.replicationMaster;
public void registerRegionServerListener(ZooKeeperListener listener) {
this.zookeeper.registerListener(listener);
}
/**
@ -532,17 +508,29 @@ public class ReplicationZookeeper {
* Get a map of all peer clusters
* @return map of peer cluster, zk address to ZKW
*/
public Map<String, ReplicationZookeeper> getPeerClusters() {
public Map<String, ZooKeeperWatcher> getPeerClusters() {
return this.peerClusters;
}
public String getRSZNode() {
return rsZNode;
}
/**
* @return the ZooKeeperWatcher used by this helper
*/
public ZooKeeperWatcher getZookeeperWatcher() {
return this.zookeeper;
}
/**
* Tracker for status of the replication
*/
public class ReplicationStatusTracker extends ZooKeeperNodeTracker {
public ReplicationStatusTracker(ZooKeeperWatcher watcher, String node,
public ReplicationStatusTracker(ZooKeeperWatcher watcher,
Abortable abortable) {
super(watcher, node, abortable);
super(watcher, getRepStateNode(), abortable);
}
@Override

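The state-znode plumbing above is easier to follow with the flag read spelled out. A minimal sketch, not the committed body of setReplicating(): it assumes the state znode stores the boolean as a string (which is what the updated TestReplication writes via ZKUtil.setData below) and uses only ZKUtil calls already present in this file.

// Hedged sketch: reading the replicating flag from the state znode.
// setReplicating()'s real body is not shown in this diff; the string encoding
// of the flag is an assumption taken from the tests in this commit.
private void readReplicationState() throws KeeperException {
  String stateZnode = ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName);
  byte [] data = ZKUtil.getData(this.zookeeper, stateZnode);
  // a missing or empty znode is treated as "replication off"
  boolean enabled = data != null && Boolean.parseBoolean(Bytes.toString(data));
  this.replicating.set(enabled);
}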
View File

@ -23,24 +23,21 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.LogCleanerDelegate;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
// REENABLE import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Implementation of a log cleaner that checks if a log is still scheduled for
* replication before deleting it when its TTL is over.
*/
public class ReplicationLogCleaner implements LogCleanerDelegate, Watcher {
public class ReplicationLogCleaner implements LogCleanerDelegate {
private static final Log LOG =
LogFactory.getLog(ReplicationLogCleaner.class);
@ -78,31 +75,30 @@ public class ReplicationLogCleaner implements LogCleanerDelegate, Watcher {
private boolean refreshHLogsAndSearch(String searchedLog) {
this.hlogs.clear();
final boolean lookForLog = searchedLog != null;
// REENABLE
// List<String> rss = zkHelper.getListOfReplicators(this);
// if (rss == null) {
// LOG.debug("Didn't find any region server that replicates, deleting: " +
// searchedLog);
// return false;
// }
// for (String rs: rss) {
// List<String> listOfPeers = zkHelper.getListPeersForRS(rs, this);
// // if rs just died, this will be null
// if (listOfPeers == null) {
// continue;
// }
// for (String id : listOfPeers) {
// List<String> peersHlogs = zkHelper.getListHLogsForPeerForRS(rs, id, this);
// if (peersHlogs != null) {
// this.hlogs.addAll(peersHlogs);
// }
// // early exit if we found the log
// if(lookForLog && this.hlogs.contains(searchedLog)) {
// LOG.debug("Found log in ZK, keeping: " + searchedLog);
// return true;
// }
// }
// }
List<String> rss = zkHelper.getListOfReplicators();
if (rss == null) {
LOG.debug("Didn't find any region server that replicates, deleting: " +
searchedLog);
return false;
}
for (String rs: rss) {
List<String> listOfPeers = zkHelper.getListPeersForRS(rs);
// if rs just died, this will be null
if (listOfPeers == null) {
continue;
}
for (String id : listOfPeers) {
List<String> peersHlogs = zkHelper.getListHLogsForPeerForRS(rs, id);
if (peersHlogs != null) {
this.hlogs.addAll(peersHlogs);
}
// early exit if we found the log
if(lookForLog && this.hlogs.contains(searchedLog)) {
LOG.debug("Found log in ZK, keeping: " + searchedLog);
return true;
}
}
}
LOG.debug("Didn't find this log in ZK, deleting: " + searchedLog);
return false;
}
@ -110,15 +106,15 @@ public class ReplicationLogCleaner implements LogCleanerDelegate, Watcher {
@Override
public void setConf(Configuration conf) {
this.conf = conf;
// try {
// REENABLE
// this.zkHelper = new ReplicationZookeeperWrapper(
// ZooKeeperWrapper.createInstance(this.conf,
// HMaster.class.getName()),
// this.conf, new AtomicBoolean(true), null);
// } catch (IOException e) {
// LOG.error(e);
// }
try {
ZooKeeperWatcher zkw =
new ZooKeeperWatcher(conf, this.getClass().getName(), null);
this.zkHelper = new ReplicationZookeeper(conf, zkw);
} catch (KeeperException e) {
LOG.error("Error while configuring " + this.getClass().getName(), e);
} catch (IOException e) {
LOG.error("Error while configuring " + this.getClass().getName(), e);
}
refreshHLogsAndSearch(null);
}
@ -126,7 +122,4 @@ public class ReplicationLogCleaner implements LogCleanerDelegate, Watcher {
public Configuration getConf() {
return conf;
}
@Override
public void process(WatchedEvent watchedEvent) {}
}
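For context, this is roughly how the cleaner consults the cache that refreshHLogsAndSearch fills. A sketch only: isLogDeletable sits outside the hunks shown here, and its exact signature is assumed from the LogCleanerDelegate interface this class implements.

// Hedged sketch of the deletion decision built on refreshHLogsAndSearch above;
// not part of this diff, and the method signature is an assumption.
public boolean isLogDeletable(Path filePath) {
  String log = filePath.getName();
  if (this.hlogs.contains(log)) {
    return false;                // still referenced by a replication queue
  }
  // cache miss: refresh from ZooKeeper and search again before allowing deletion
  return !refreshHLogsAndSearch(log);
}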

View File

@ -45,7 +45,6 @@ import org.apache.zookeeper.KeeperException;
public class Replication implements WALObserver {
private final boolean replication;
private final ReplicationSourceManager replicationManager;
private boolean replicationMaster;
private final AtomicBoolean replicating = new AtomicBoolean(true);
private final ReplicationZookeeper zkHelper;
private final Configuration conf;
@ -70,10 +69,8 @@ public class Replication implements WALObserver {
this.replication = isReplication(this.conf);
if (replication) {
this.zkHelper = new ReplicationZookeeper(server, this.replicating);
this.replicationMaster = zkHelper.isReplicationMaster();
this.replicationManager = this.replicationMaster ?
new ReplicationSourceManager(zkHelper, conf, this.server,
fs, this.replicating, logDir, oldLogDir) : null;
this.replicationManager = new ReplicationSourceManager(zkHelper, conf,
this.server, fs, this.replicating, logDir, oldLogDir) ;
} else {
this.replicationManager = null;
this.zkHelper = null;
@ -93,10 +90,8 @@ public class Replication implements WALObserver {
*/
public void join() {
if (this.replication) {
if (this.replicationMaster) {
this.replicationManager.join();
}
this.zkHelper.deleteOwnRSZNode();
this.replicationManager.join();
this.zkHelper.deleteOwnRSZNode();
}
}
@ -106,7 +101,7 @@ public class Replication implements WALObserver {
* @throws IOException
*/
public void replicateLogEntries(HLog.Entry[] entries) throws IOException {
if (this.replication && !this.replicationMaster) {
if (this.replication) {
this.replicationSink.replicateEntries(entries);
}
}
@ -118,11 +113,8 @@ public class Replication implements WALObserver {
*/
public void startReplicationServices() throws IOException {
if (this.replication) {
if (this.replicationMaster) {
this.replicationManager.init();
} else {
this.replicationSink = new ReplicationSink(this.conf, this.server);
}
this.replicationManager.init();
this.replicationSink = new ReplicationSink(this.conf, this.server);
}
}
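A hypothetical caller-side view of the lifecycle after this change, since the region server wiring is not part of this diff: with the master/slave split removed, every region server now runs both a source manager and a sink.

// Hypothetical usage sketch; only the public methods changed above are used.
Replication replication = new Replication(server, fs, logDir, oldLogDir);
replication.startReplicationServices();   // starts the sources and the sink
// edits shipped from a peer cluster are applied through the sink:
replication.replicateLogEntries(entries);
// on shutdown: stop the sources and delete this RS's replication znode
replication.join();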

View File

@ -38,7 +38,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* This class is responsible for replicating the edits coming

View File

@ -54,6 +54,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.zookeeper.KeeperException;
/**
* Class that handles the source of a replication stream.
@ -195,7 +196,7 @@ public class ReplicationSource extends Thread
/**
* Select a number of peers at random using the ratio. Minimum 1.
*/
private void chooseSinks() {
private void chooseSinks() throws KeeperException {
this.currentPeers.clear();
List<HServerAddress> addresses =
this.zkHelper.getPeersAddresses(peerClusterId);
@ -231,8 +232,14 @@ public class ReplicationSource extends Thread
// If this is recovered, the queue is already full and the first log
// normally has a position (unless the RS failed between 2 logs)
if (this.queueRecovered) {
// this.position = this.zkHelper.getHLogRepPosition(
// this.peerClusterZnode, this.queue.peek().getName());
try {
this.position = this.zkHelper.getHLogRepPosition(
this.peerClusterZnode, this.queue.peek().getName());
} catch (KeeperException e) {
LOG.error("Couldn't get the position of this recovered queue " +
peerClusterZnode, e);
this.abort();
}
}
int sleepMultiplier = 1;
// Loop until we close down
@ -380,6 +387,8 @@ public class ReplicationSource extends Thread
Thread.sleep(this.sleepForRetries);
} catch (InterruptedException e) {
LOG.error("Interrupted while trying to connect to sinks", e);
} catch (KeeperException e) {
LOG.error("Error talking to zookeeper, retrying", e);
}
}
}
@ -553,6 +562,8 @@ public class ReplicationSource extends Thread
} while (!this.stopper.isStopped() && down);
} catch (InterruptedException e) {
LOG.debug("Interrupted while trying to contact the peer cluster");
} catch (KeeperException e) {
LOG.error("Error talking to zookeeper, retrying", e);
}
}
@ -589,7 +600,7 @@ public class ReplicationSource extends Thread
}
};
Threads.setDaemonThreadRunning(
this, n + ".replicationSource," + clusterId, handler);
this, n + ".replicationSource," + peerClusterZnode, handler);
}
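The recovered-queue path above hinges on how the log position is stored in ZooKeeper; here is that round-trip sketched with calls already visible in this commit (variable names such as hlogName are illustrative, not fields of ReplicationSource).

// Hedged sketch of the position round-trip behind getHLogRepPosition:
// writeReplicationStatus stores the offset as a stringified long, and the read
// side parses it back, treating a missing or empty znode as offset 0.
String znode = ZKUtil.joinZNode(ZKUtil.joinZNode(rsServerNameZnode, peerClusterZnode), hlogName);
byte [] data = ZKUtil.getData(zookeeper, znode);   // null if the znode is absent
String parsed = Bytes.toString(data);
long position = (parsed == null || parsed.length() == 0) ? 0 : Long.parseLong(parsed);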
/**

View File

@ -36,8 +36,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
/**
* This class is responsible to manage all the replication
@ -104,8 +104,10 @@ public class ReplicationSourceManager {
this.fs = fs;
this.logDir = logDir;
this.oldLogDir = oldLogDir;
this.zkHelper.registerRegionServerListener(
new OtherRegionServerWatcher(this.zkHelper.getZookeeperWatcher()));
List<String> otherRSs =
this.zkHelper.getRegisteredRegionServers(new OtherRegionServerWatcher());
this.zkHelper.getRegisteredRegionServers();
this.otherRegionServers = otherRSs == null ? new ArrayList<String>() : otherRSs;
}
@ -145,7 +147,10 @@ public class ReplicationSourceManager {
ReplicationSourceInterface src = addSource(id);
src.startup();
}
List<String> currentReplicators = this.zkHelper.getListOfReplicators();
List<String> currentReplicators = this.zkHelper.getRegisteredRegionServers();
if (currentReplicators == null || currentReplicators.size() == 0) {
return;
}
synchronized (otherRegionServers) {
LOG.info("Current list of replicators: " + currentReplicators
+ " other RSs: " + otherRegionServers);
@ -322,29 +327,59 @@ public class ReplicationSourceManager {
* in the local cluster. It initiates the process to transfer the queues
* if it is able to grab the lock.
*/
public class OtherRegionServerWatcher implements Watcher {
@Override
public void process(WatchedEvent watchedEvent) {
LOG.info(" event " + watchedEvent);
if (watchedEvent.getType().equals(Event.KeeperState.Expired) ||
watchedEvent.getType().equals(Event.KeeperState.Disconnected)) {
public class OtherRegionServerWatcher extends ZooKeeperListener {
/**
* Construct a ZooKeeper event listener.
*/
public OtherRegionServerWatcher(ZooKeeperWatcher watcher) {
super(watcher);
}
/**
* Called when a new node has been created.
* @param path full path of the new node
*/
public void nodeCreated(String path) {
refreshRegionServersList(path);
}
/**
* Called when a node has been deleted
* @param path full path of the deleted node
*/
public void nodeDeleted(String path) {
boolean cont = refreshRegionServersList(path);
if (!cont) {
return;
}
LOG.info(path + " znode expired, trying to lock it");
String[] rsZnodeParts = path.split("/");
transferQueues(rsZnodeParts[rsZnodeParts.length-1]);
}
List<String> newRsList = (zkHelper.getRegisteredRegionServers(this));
/**
* Called when an existing node has a child node added or removed.
* @param path full path of the node whose children have changed
*/
public void nodeChildrenChanged(String path) {
refreshRegionServersList(path);
}
private boolean refreshRegionServersList(String path) {
if (!path.startsWith(zkHelper.getZookeeperWatcher().rsZNode)) {
return false;
}
List<String> newRsList = (zkHelper.getRegisteredRegionServers());
if (newRsList == null) {
return;
return false;
} else {
synchronized (otherRegionServers) {
otherRegionServers.clear();
otherRegionServers.addAll(newRsList);
}
}
if (watchedEvent.getType().equals(Event.EventType.NodeDeleted)) {
LOG.info(watchedEvent.getPath() + " znode expired, trying to lock it");
String[] rsZnodeParts = watchedEvent.getPath().split("/");
transferQueues(rsZnodeParts[rsZnodeParts.length-1]);
}
return true;
}
}
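What nodeDeleted() above sets in motion, sketched: transferQueues' body is not in these hunks, so everything past the lock call is an assumption, included only to connect the listener to the ReplicationZookeeper methods earlier in this commit.

// Hedged sketch; transferQueues is not shown in this diff, so the steps after
// lockOtherRS are assumptions meant to illustrate the failover idea only.
private void onRegionServerGone(String deadRS) {
  if (!zkHelper.lockOtherRS(deadRS)) {
    return;                      // another RS won the lock, or deadRS is us
  }
  List<String> peers = zkHelper.getListPeersForRS(deadRS);
  if (peers == null) {
    return;                      // its queues are already gone
  }
  for (String peerId : peers) {
    List<String> hlogs = zkHelper.getListHLogsForPeerForRS(deadRS, peerId);
    // copy these queue entries under our own rs znode and start a recovered
    // ReplicationSource for them (elided)
  }
}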

View File

@ -916,7 +916,7 @@ public class ZKUtil {
public static void deleteChildrenRecursively(ZooKeeperWatcher zkw, String node)
throws KeeperException {
List<String> children = ZKUtil.listChildrenNoWatch(zkw, node);
if(!children.isEmpty()) {
if(children != null && !children.isEmpty()) {
for(String child : children) {
deleteNodeRecursively(zkw, joinZNode(node, child));
}
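Spelled out, the helper with the short-circuited guard reads as below; only the guard line changes in this hunk, and the early return here is a readability choice for the sketch, not a further change.

// Sketch of deleteChildrenRecursively with the corrected guard: a null child
// list (node absent) and an empty one both just return without touching ZK.
public static void deleteChildrenRecursively(ZooKeeperWatcher zkw, String node)
    throws KeeperException {
  List<String> children = ZKUtil.listChildrenNoWatch(zkw, node);
  if (children == null || children.isEmpty()) {
    return;
  }
  for (String child : children) {
    deleteNodeRecursively(zkw, joinZNode(node, child));
  }
}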

View File

@ -42,6 +42,8 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@ -54,10 +56,9 @@ public class TestReplication {
private static Configuration conf1;
private static Configuration conf2;
/*
private static ZooKeeperWrapper zkw1;
private static ZooKeeperWrapper zkw2;
*/
private static ZooKeeperWatcher zkw1;
private static ZooKeeperWatcher zkw2;
private static HTable htable1;
private static HTable htable2;
@ -92,15 +93,15 @@ public class TestReplication {
conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
conf1.setBoolean("dfs.support.append", true);
conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
/* REENABLE
utility1 = new HBaseTestingUtility(conf1);
utility1.startMiniZKCluster();
MiniZooKeeperCluster miniZK = utility1.getZkCluster();
zkw1 = ZooKeeperWrapper.createInstance(conf1, "cluster1");
zkw1.writeZNode("/1", "replication", "");
zkw1.writeZNode("/1/replication", "master",
zkw1 = new ZooKeeperWatcher(conf1, "cluster1", null);
ZKUtil.createWithParents(zkw1, "/1/replication/master");
ZKUtil.createWithParents(zkw1, "/1/replication/state");
ZKUtil.setData(zkw1, "/1/replication/master", Bytes.toBytes(
conf1.get(HConstants.ZOOKEEPER_QUORUM)+":" +
conf1.get("hbase.zookeeper.property.clientPort")+":/1");
conf1.get("hbase.zookeeper.property.clientPort")+":/1"));
setIsReplication(true);
LOG.info("Setup first Zk");
@ -113,15 +114,13 @@ public class TestReplication {
utility2 = new HBaseTestingUtility(conf2);
utility2.setZkCluster(miniZK);
zkw2 = ZooKeeperWrapper.createInstance(conf2, "cluster2");
zkw2.writeZNode("/2", "replication", "");
zkw2.writeZNode("/2/replication", "master",
conf1.get(HConstants.ZOOKEEPER_QUORUM)+":" +
conf1.get("hbase.zookeeper.property.clientPort")+":/1");
zkw2 = new ZooKeeperWatcher(conf2, "cluster2", null);
ZKUtil.createWithParents(zkw2, "/2/replication");
zkw1.writeZNode("/1/replication/peers", "1",
ZKUtil.createWithParents(zkw1, "/1/replication/peers/2");
ZKUtil.setData(zkw1, "/1/replication/peers/2", Bytes.toBytes(
conf2.get(HConstants.ZOOKEEPER_QUORUM)+":" +
conf2.get("hbase.zookeeper.property.clientPort")+":/2");
conf2.get("hbase.zookeeper.property.clientPort")+":/2"));
LOG.info("Setup second Zk");
@ -143,12 +142,12 @@ public class TestReplication {
htable1 = new HTable(conf1, tableName);
htable1.setWriteBufferSize(1024);
htable2 = new HTable(conf2, tableName);
*/
}
private static void setIsReplication(boolean rep) throws Exception {
LOG.info("Set rep " + rep);
// REENABLE zkw1.writeZNode("/1/replication", "state", Boolean.toString(rep));
ZKUtil.setData(zkw1,"/1/replication/state",
Bytes.toBytes(Boolean.toString(rep)));
// Takes some ms for ZK to fire the watcher
Thread.sleep(SLEEP_TIME);
}
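The peer entry the setup above writes under /1/replication/peers/2 uses the quorum:clientPort:parent format that connectToPeer consumes; a sketch of the read side follows, with the read and split themselves assumed since only the otherConf.set(...) calls appear in this diff.

// Hedged sketch: turning the peer znode back into a Configuration. Only the
// otherConf.set(...) lines are in the commit; the rest is for illustration.
byte [] data = ZKUtil.getData(zkw1, "/1/replication/peers/2");
String[] ensemble = Bytes.toString(data).split(":");   // quorum, clientPort, parent
Configuration otherConf = new Configuration(conf1);
otherConf.set(HConstants.ZOOKEEPER_QUORUM, ensemble[0]);
otherConf.set("hbase.zookeeper.property.clientPort", ensemble[1]);
otherConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, ensemble[2]);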
@ -181,7 +180,7 @@ public class TestReplication {
* Add a row, check it's replicated, delete it, check's gone
* @throws Exception
*/
@Ignore @Test
@Test
public void testSimplePutDelete() throws Exception {
LOG.info("testSimplePutDelete");
Put put = new Put(row);
@ -229,7 +228,7 @@ public class TestReplication {
* Try a small batch upload using the write buffer, check it's replicated
* @throws Exception
*/
@Ignore @Test
@Test
public void testSmallBatch() throws Exception {
LOG.info("testSmallBatch");
Put put;
@ -273,7 +272,7 @@ public class TestReplication {
* replicated, enable it, try replicating and it should work
* @throws Exception
*/
@Ignore @Test
@Test
public void testStartStop() throws Exception {
// Test stopping replication
@ -342,7 +341,7 @@ public class TestReplication {
* hlog rolling and other non-trivial code paths
* @throws Exception
*/
@Ignore @Test
@Test
public void loadTesting() throws Exception {
htable1.setWriteBufferSize(1024);
htable1.setAutoFlush(false);
@ -396,7 +395,7 @@ public class TestReplication {
* the upload. The failover happens internally.
* @throws Exception
*/
@Ignore @Test
@Test
public void queueFailover() throws Exception {
utility1.createMultiRegions(htable1, famName);

View File

@ -69,7 +69,7 @@ public class TestReplicationSource {
* time reading logs that are being archived.
* @throws Exception
*/
@Ignore @Test
@Test
public void testLogMoving() throws Exception{
Path logPath = new Path(logDir, "log");
HLog.Writer writer = HLog.createWriter(fs, logPath, conf);

View File

@ -47,10 +47,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class DISABLEDTestReplicationSink {
public class TestReplicationSink {
private static final Log LOG =
LogFactory.getLog(DISABLEDTestReplicationSink.class);
LogFactory.getLog(TestReplicationSink.class);
private static final int BATCH_SIZE = 10;
@ -128,7 +128,7 @@ public class DISABLEDTestReplicationSink {
* Insert a whole batch of entries
* @throws Exception
*/
@Ignore @Test
@Test
public void testBatchSink() throws Exception {
HLog.Entry[] entries = new HLog.Entry[BATCH_SIZE];
for(int i = 0; i < BATCH_SIZE; i++) {
@ -144,7 +144,7 @@ public class DISABLEDTestReplicationSink {
* Insert a mix of puts and deletes
* @throws Exception
*/
@Ignore @Test
@Test
public void testMixedPutDelete() throws Exception {
HLog.Entry[] entries = new HLog.Entry[BATCH_SIZE/2];
for(int i = 0; i < BATCH_SIZE/2; i++) {
@ -168,7 +168,7 @@ public class DISABLEDTestReplicationSink {
* Insert to 2 different tables
* @throws Exception
*/
@Ignore @Test
@Test
public void testMixedPutTables() throws Exception {
HLog.Entry[] entries = new HLog.Entry[BATCH_SIZE];
for(int i = 0; i < BATCH_SIZE; i++) {
@ -189,7 +189,7 @@ public class DISABLEDTestReplicationSink {
* Insert then do different types of deletes
* @throws Exception
*/
@Ignore @Test
@Test
public void testMixedDeletes() throws Exception {
HLog.Entry[] entries = new HLog.Entry[3];
for(int i = 0; i < 3; i++) {
@ -214,7 +214,7 @@ public class DISABLEDTestReplicationSink {
* before the actual Put that creates it.
* @throws Exception
*/
@Ignore @Test
@Test
public void testApplyDeleteBeforePut() throws Exception {
HLog.Entry[] entries = new HLog.Entry[5];
for(int i = 0; i < 2; i++) {

View File

@ -31,15 +31,16 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.regionserver.wal.WALObserver;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.util.Bytes;
// REENABLE import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@ -54,10 +55,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertEquals;
public class DISABLEDTestReplicationSourceManager {
public class TestReplicationSourceManager {
private static final Log LOG =
LogFactory.getLog(DISABLEDTestReplicationSourceManager.class);
LogFactory.getLog(TestReplicationSourceManager.class);
private static Configuration conf;
@ -65,9 +66,11 @@ public class DISABLEDTestReplicationSourceManager {
private static final AtomicBoolean REPLICATING = new AtomicBoolean(false);
private static Replication replication;
private static ReplicationSourceManager manager;
// REENABLE private static ZooKeeperWrapper zkw;
private static ZooKeeperWatcher zkw;
private static HTableDescriptor htd;
@ -100,26 +103,25 @@ public class DISABLEDTestReplicationSourceManager {
utility = new HBaseTestingUtility(conf);
utility.startMiniZKCluster();
// REENABLE
// zkw = ZooKeeperWrapper.createInstance(conf, "test");
// zkw.writeZNode("/hbase", "replication", "");
// zkw.writeZNode("/hbase/replication", "master",
// conf.get(HConstants.ZOOKEEPER_QUORUM)+":" +
// conf.get("hbase.zookeeper.property.clientPort")+":/1");
// zkw.writeZNode("/hbase/replication/peers", "1",
// conf.get(HConstants.ZOOKEEPER_QUORUM)+":" +
// conf.get("hbase.zookeeper.property.clientPort")+":/1");
zkw = new ZooKeeperWatcher(conf, "test", null);
ZKUtil.createWithParents(zkw, "/hbase/replication");
ZKUtil.createWithParents(zkw, "/hbase/replication/master");
ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1");
ZKUtil.setData(zkw, "/hbase/replication/master",
Bytes.toBytes(conf.get(HConstants.ZOOKEEPER_QUORUM)+":" +
conf.get("hbase.zookeeper.property.clientPort")+":/1"));
ZKUtil.setData(zkw, "/hbase/replication/peers/1",Bytes.toBytes(
conf.get(HConstants.ZOOKEEPER_QUORUM)+":" +
conf.get("hbase.zookeeper.property.clientPort")+":/1"));
HRegionServer server = new HRegionServer(conf);
ReplicationZookeeper helper = new ReplicationZookeeper(server, REPLICATING);
replication = new Replication(new DummyServer(), fs, logDir, oldLogDir);
manager = replication.getReplicationManager();
fs = FileSystem.get(conf);
oldLogDir = new Path(utility.getTestDir(),
HConstants.HREGION_OLDLOGDIR_NAME);
logDir = new Path(utility.getTestDir(),
HConstants.HREGION_LOGDIR_NAME);
manager = new ReplicationSourceManager(helper,
conf, server, fs, REPLICATING, logDir, oldLogDir);
manager.addSource("1");
htd = new HTableDescriptor(test);
@ -137,7 +139,7 @@ public class DISABLEDTestReplicationSourceManager {
@AfterClass
public static void tearDownAfterClass() throws Exception {
// REENABLE manager.join();
manager.join();
utility.shutdownMiniCluster();
}
@ -152,7 +154,7 @@ public class DISABLEDTestReplicationSourceManager {
setUp();
}
@Ignore @Test
@Test
public void testLogRoll() throws Exception {
long seq = 0;
long baseline = 1000;
@ -160,8 +162,9 @@ public class DISABLEDTestReplicationSourceManager {
KeyValue kv = new KeyValue(r1, f1, r1);
WALEdit edit = new WALEdit();
edit.add(kv);
List<WALObserver> listeners = new ArrayList<WALObserver>();
// REENABLE listeners.add(manager);
listeners.add(replication);
HLog hlog = new HLog(fs, logDir, oldLogDir, conf, listeners,
URLEncoder.encode("regionserver:60020", "UTF8"));
@ -195,17 +198,55 @@ public class DISABLEDTestReplicationSourceManager {
hlog.rollWriter();
// REENABLE manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(),
// REENABLE "1", 0, false);
manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(),
"1", 0, false);
HLogKey key = new HLogKey(hri.getRegionName(),
test, seq++, System.currentTimeMillis());
hlog.append(hri, key, edit);
// REENABLE assertEquals(1, manager.getHLogs().size());
assertEquals(1, manager.getHLogs().size());
// TODO Need a case with only 2 HLogs and we only want to delete the first one
}
static class DummyServer implements Server {
@Override
public Configuration getConfiguration() {
return conf;
}
@Override
public ZooKeeperWatcher getZooKeeper() {
return zkw;
}
@Override
public CatalogTracker getCatalogTracker() {
return null; // not needed by this test
}
@Override
public String getServerName() {
return null; // not needed by this test
}
@Override
public void abort(String why, Throwable e) {
// no-op for this test
}
@Override
public void stop(String why) {
// no-op for this test
}
@Override
public boolean isStopped() {
return false; // the dummy server is never stopped
}
}
}