From 5ccf27683b6411dab04eabf02b135ac1e03d4f1b Mon Sep 17 00:00:00 2001 From: Jean-Daniel Cryans Date: Mon, 11 Oct 2010 18:00:48 +0000 Subject: [PATCH] HBASE-3060 [replication] Reenable replication on trunk with unit tests git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1021447 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 2 +- .../replication/ReplicationZookeeper.java | 192 ++++++++---------- .../master/ReplicationLogCleaner.java | 79 ++++--- .../replication/regionserver/Replication.java | 22 +- .../regionserver/ReplicationSink.java | 1 - .../regionserver/ReplicationSource.java | 19 +- .../ReplicationSourceManager.java | 69 +++++-- .../apache/hadoop/hbase/zookeeper/ZKUtil.java | 2 +- .../hbase/replication/TestReplication.java | 45 ++-- .../replication/TestReplicationSource.java | 2 +- ...tionSink.java => TestReplicationSink.java} | 14 +- ...java => TestReplicationSourceManager.java} | 91 ++++++--- 12 files changed, 298 insertions(+), 240 deletions(-) rename src/test/java/org/apache/hadoop/hbase/replication/regionserver/{DISABLEDTestReplicationSink.java => TestReplicationSink.java} (97%) rename src/test/java/org/apache/hadoop/hbase/replication/regionserver/{DISABLEDTestReplicationSourceManager.java => TestReplicationSourceManager.java} (70%) diff --git a/CHANGES.txt b/CHANGES.txt index 28d42fdb06e..fb33f33ee11 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -580,7 +580,7 @@ Release 0.21.0 - Unreleased HBASE-3080 TestAdmin hanging on hudson HBASE-3063 TestThriftServer failing in TRUNK HBASE-3094 Fixes for miscellaneous broken tests - + HBASE-3060 [replication] Reenable replication on trunk with unit tests IMPROVEMENTS diff --git a/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java b/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java index 6e21afd8f37..6b8b788f4cf 100644 --- a/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java +++ b/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java @@ -39,11 +39,11 @@ import org.apache.hadoop.hbase.HServerAddress; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.util.StringUtils; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.Watcher; /** * This class serves as a helper for all things related to zookeeper @@ -81,31 +81,39 @@ public class ReplicationZookeeper { // Our handle on zookeeper private final ZooKeeperWatcher zookeeper; // Map of addresses of peer clusters with their ZKW - private final Map peerClusters; + private Map peerClusters; // Path to the root replication znode - private final String replicationZNode; + private String replicationZNode; // Path to the peer clusters znode - private final String peersZNode; + private String peersZNode; // Path to the znode that contains all RS that replicates - private final String rsZNode; + private String rsZNode; // Path to this region server's name under rsZNode - private final String rsServerNameZnode; + private String rsServerNameZnode; // Name node if the replicationState znode - private final String replicationStateNodeName; + private String replicationStateNodeName; // If this RS is part of a master cluster - private final boolean replicationMaster; + private boolean 
replicationMaster; private final Configuration conf; // Is this cluster replicating at the moment? - private final AtomicBoolean replicating; + private AtomicBoolean replicating; // Byte (stored as string here) that identifies this cluster - private final String clusterId; + private String clusterId; // Abortable - private final Abortable abortable; + private Abortable abortable; + + public ReplicationZookeeper(final Configuration conf, final ZooKeeperWatcher zk) + throws KeeperException { + + this.conf = conf; + this.zookeeper = zk; + setZNodes(); + } /** * Constructor used by region servers, connects to the peer cluster right away. * - * @param zookeeper + * @param server * @param replicating atomic boolean to start/stop replication * @throws IOException * @throws KeeperException @@ -115,6 +123,27 @@ public class ReplicationZookeeper { this.abortable = server; this.zookeeper = server.getZooKeeper(); this.conf = server.getConfiguration(); + setZNodes(); + + this.peerClusters = new HashMap(); + this.replicating = replicating; + setReplicating(); + this.rsServerNameZnode = ZKUtil.joinZNode(rsZNode, server.getServerName()); + ZKUtil.createWithParents(this.zookeeper, this.rsServerNameZnode); + // Set a tracker on replicationStateNodeNode + ReplicationStatusTracker tracker = + new ReplicationStatusTracker(this.zookeeper, server); + tracker.start(); + + List znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode); + if (znodes != null) { + for (String z : znodes) { + connectToPeer(z); + } + } + } + + private void setZNodes() throws KeeperException { String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication"); String peersZNodeName = @@ -130,15 +159,11 @@ public class ReplicationZookeeper { String thisCluster = this.conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + this.conf.get("hbase.zookeeper.property.clientPort") + ":" + this.conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT); - - this.peerClusters = new HashMap(); this.replicationZNode = ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName); this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName); this.rsZNode = ZKUtil.joinZNode(replicationZNode, rsZNodeName); - this.replicating = replicating; - setReplicating(); String znode = ZKUtil.joinZNode(this.replicationZNode, clusterIdZNodeName); byte [] data = ZKUtil.getData(this.zookeeper, znode); String idResult = Bytes.toString(data); @@ -152,24 +177,6 @@ public class ReplicationZookeeper { LOG.info("This cluster (" + thisCluster + ") is a " + (this.replicationMaster ? 
"master" : "slave") + " for replication" + ", compared with (" + address + ")"); - - if (server.getServerName() != null) { - this.rsServerNameZnode = ZKUtil.joinZNode(rsZNode, server.getServerName()); - // Set a tracker on replicationStateNodeNode - ReplicationStatusTracker tracker = - new ReplicationStatusTracker(this.zookeeper, getRepStateNode(), server); - tracker.start(); - - List znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode); - if (znodes != null) { - for (String z : znodes) { - connectToPeer(z); - } - } - } else { - this.rsServerNameZnode = null; - } - } /** @@ -178,47 +185,16 @@ public class ReplicationZookeeper { * @param peerClusterId (byte) the cluster to interrogate * @return addresses of all region servers */ - public List getPeersAddresses(String peerClusterId) { + public List getPeersAddresses(String peerClusterId) + throws KeeperException { if (this.peerClusters.size() == 0) { return new ArrayList(0); } - ReplicationZookeeper zkw = this.peerClusters.get(peerClusterId); + ZooKeeperWatcher zkw = this.peerClusters.get(peerClusterId); + return zkw == null? new ArrayList(0): - zkw.scanAddressDirectory(this.zookeeper.rsZNode); - } - - /** - * Scan a directory of address data. - * @param znode The parent node - * @return The directory contents as HServerAddresses - */ - public List scanAddressDirectory(String znode) { - List list = new ArrayList(); - List nodes = null; - try { - nodes = ZKUtil.listChildrenNoWatch(this.zookeeper, znode); - } catch (KeeperException e) { - this.abortable.abort("Scanning " + znode, e); - } - if (nodes == null) { - return list; - } - for (String node : nodes) { - String path = ZKUtil.joinZNode(znode, node); - list.add(readAddress(path)); - } - return list; - } - - private HServerAddress readAddress(String znode) { - byte [] data = null; - try { - data = ZKUtil.getData(this.zookeeper, znode); - } catch (KeeperException e) { - this.abortable.abort("Getting address", e); - } - return new HServerAddress(Bytes.toString(data)); + ZKUtil.listChildrenAndGetAsAddresses(zkw, zkw.rsZNode); } /** @@ -239,15 +215,11 @@ public class ReplicationZookeeper { otherConf.set(HConstants.ZOOKEEPER_QUORUM, ensemble[0]); otherConf.set("hbase.zookeeper.property.clientPort", ensemble[1]); otherConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, ensemble[2]); - // REENABLE -- FIX!!!! 
- /* - ZooKeeperWrapper zkw = ZooKeeperWrapper.createInstance(otherConf, - "connection to cluster: " + peerId); - zkw.registerListener(new ReplicationStatusWatcher()); + ZooKeeperWatcher zkw = new ZooKeeperWatcher(otherConf, + "connection to cluster: " + peerId, this.abortable); this.peerClusters.put(peerId, zkw); - this.zookeeperWrapper.ensureExists(this.zookeeperWrapper.getZNode( + ZKUtil.createWithParents(this.zookeeper, ZKUtil.joinZNode( this.rsServerNameZnode, peerId)); - */ LOG.info("Added new peer cluster " + StringUtils.arrayToString(ensemble)); } @@ -282,7 +254,7 @@ public class ReplicationZookeeper { try { String znode = ZKUtil.joinZNode(this.rsServerNameZnode, clusterId); znode = ZKUtil.joinZNode(znode, filename); - ZKUtil.createAndWatch(this.zookeeper, znode, Bytes.toBytes("")); + ZKUtil.createWithParents(this.zookeeper, znode); } catch (KeeperException e) { this.abortable.abort("Failed add log to list", e); } @@ -297,7 +269,7 @@ public class ReplicationZookeeper { try { String znode = ZKUtil.joinZNode(rsServerNameZnode, clusterId); znode = ZKUtil.joinZNode(znode, filename); - ZKUtil.deleteChildrenRecursively(this.zookeeper, znode); + ZKUtil.deleteNode(this.zookeeper, znode); } catch (KeeperException e) { this.abortable.abort("Failed remove from list", e); } @@ -316,7 +288,7 @@ public class ReplicationZookeeper { String znode = ZKUtil.joinZNode(this.rsServerNameZnode, clusterId); znode = ZKUtil.joinZNode(znode, filename); // Why serialize String of Long and note Long as bytes? - ZKUtil.createAndWatch(this.zookeeper, znode, + ZKUtil.setData(this.zookeeper, znode, Bytes.toBytes(Long.toString(position))); } catch (KeeperException e) { this.abortable.abort("Writing replication status", e); @@ -326,15 +298,18 @@ public class ReplicationZookeeper { /** * Get a list of all the other region servers in this cluster * and set a watch - * @param watch the watch to set * @return a list of server nanes */ - public List getRegisteredRegionServers(Watcher watch) { + public List getRegisteredRegionServers() { List result = null; try { - // TODO: This is rsZNode from zk which is like getListOfReplicators - // but maybe these are from different zk instances? 
- result = ZKUtil.listChildrenNoWatch(this.zookeeper, rsZNode); + List nads = + ZKUtil.watchAndGetNewChildren(this.zookeeper, this.zookeeper.rsZNode); + result = new ArrayList(nads.size()); + for (ZKUtil.NodeAndData nad : nads) { + String[] fullPath = nad.getNode().split("/"); + result.add(fullPath[fullPath.length - 1]); + } } catch (KeeperException e) { this.abortable.abort("Get list of registered region servers", e); } @@ -344,7 +319,6 @@ public class ReplicationZookeeper { /** * Get the list of the replicators that have queues, they can be alive, dead * or simply from a previous run - * @param watch the watche to set * @return a list of server names */ public List getListOfReplicators() { @@ -360,7 +334,6 @@ public class ReplicationZookeeper { /** * Get the list of peer clusters for the specified server names * @param rs server names of the rs - * @param watch the watch to set * @return a list of peer cluster */ public List getListPeersForRS(String rs) { @@ -378,7 +351,6 @@ public class ReplicationZookeeper { * Get the list of hlogs for the specified region server and peer cluster * @param rs server names of the rs * @param id peer cluster - * @param watch the watch to set * @return a list of hlogs */ public List getListHLogsForPeerForRS(String rs, String id) { @@ -401,10 +373,14 @@ public class ReplicationZookeeper { public boolean lockOtherRS(String znode) { try { String parent = ZKUtil.joinZNode(this.rsZNode, znode); + if (parent.equals(rsServerNameZnode)) { + LOG.warn("Won't lock because this is us, we're dead!"); + return false; + } String p = ZKUtil.joinZNode(parent, RS_LOCK_ZNODE); ZKUtil.createAndWatch(this.zookeeper, p, Bytes.toBytes(rsServerNameZnode)); } catch (KeeperException e) { - this.abortable.abort("Failed lock other rs", e); + LOG.info("Failed lock other rs", e); } return true; } @@ -468,7 +444,7 @@ public class ReplicationZookeeper { */ public void deleteSource(String peerZnode) { try { - ZKUtil.deleteChildrenRecursively(this.zookeeper, + ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(rsServerNameZnode, peerZnode)); } catch (KeeperException e) { this.abortable.abort("Failed delete of " + peerZnode, e); @@ -481,7 +457,7 @@ public class ReplicationZookeeper { */ public void deleteRsQueues(String znode) { try { - ZKUtil.deleteChildrenRecursively(this.zookeeper, + ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(rsZNode, znode)); } catch (KeeperException e) { this.abortable.abort("Failed delete of " + znode, e); @@ -492,7 +468,12 @@ public class ReplicationZookeeper { * Delete this cluster's queues */ public void deleteOwnRSZNode() { - deleteRsQueues(this.rsServerNameZnode); + try { + ZKUtil.deleteNodeRecursively(this.zookeeper, + this.rsServerNameZnode); + } catch (KeeperException e) { + this.abortable.abort("Failed delete of " + this.rsServerNameZnode, e); + } } /** @@ -510,13 +491,8 @@ public class ReplicationZookeeper { return data == null || data.length() == 0 ? 
0 : Long.parseLong(data); } - /** - * Tells if this cluster replicates or not - * - * @return if this is a master - */ - public boolean isReplicationMaster() { - return this.replicationMaster; + public void registerRegionServerListener(ZooKeeperListener listener) { + this.zookeeper.registerListener(listener); } /** @@ -532,17 +508,29 @@ public class ReplicationZookeeper { * Get a map of all peer clusters * @return map of peer cluster, zk address to ZKW */ - public Map getPeerClusters() { + public Map getPeerClusters() { return this.peerClusters; } + public String getRSZNode() { + return rsZNode; + } + + /** + * + * @return + */ + public ZooKeeperWatcher getZookeeperWatcher() { + return this.zookeeper; + } + /** * Tracker for status of the replication */ public class ReplicationStatusTracker extends ZooKeeperNodeTracker { - public ReplicationStatusTracker(ZooKeeperWatcher watcher, String node, + public ReplicationStatusTracker(ZooKeeperWatcher watcher, Abortable abortable) { - super(watcher, node, abortable); + super(watcher, getRepStateNode(), abortable); } @Override diff --git a/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java index b4c881497e1..ef35320a706 100644 --- a/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java +++ b/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java @@ -23,24 +23,21 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.LogCleanerDelegate; import org.apache.hadoop.hbase.replication.ReplicationZookeeper; -// REENALBE import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; import java.io.IOException; import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; /** * Implementation of a log cleaner that checks if a log is still scheduled for * replication before deleting it when its TTL is over. 
*/ -public class ReplicationLogCleaner implements LogCleanerDelegate, Watcher { +public class ReplicationLogCleaner implements LogCleanerDelegate { private static final Log LOG = LogFactory.getLog(ReplicationLogCleaner.class); @@ -78,31 +75,30 @@ public class ReplicationLogCleaner implements LogCleanerDelegate, Watcher { private boolean refreshHLogsAndSearch(String searchedLog) { this.hlogs.clear(); final boolean lookForLog = searchedLog != null; -// REENALBE -// List rss = zkHelper.getListOfReplicators(this); -// if (rss == null) { -// LOG.debug("Didn't find any region server that replicates, deleting: " + -// searchedLog); -// return false; -// } -// for (String rs: rss) { -// List listOfPeers = zkHelper.getListPeersForRS(rs, this); -// // if rs just died, this will be null -// if (listOfPeers == null) { -// continue; -// } -// for (String id : listOfPeers) { -// List peersHlogs = zkHelper.getListHLogsForPeerForRS(rs, id, this); -// if (peersHlogs != null) { -// this.hlogs.addAll(peersHlogs); -// } -// // early exit if we found the log -// if(lookForLog && this.hlogs.contains(searchedLog)) { -// LOG.debug("Found log in ZK, keeping: " + searchedLog); -// return true; -// } -// } -// } + List rss = zkHelper.getListOfReplicators(); + if (rss == null) { + LOG.debug("Didn't find any region server that replicates, deleting: " + + searchedLog); + return false; + } + for (String rs: rss) { + List listOfPeers = zkHelper.getListPeersForRS(rs); + // if rs just died, this will be null + if (listOfPeers == null) { + continue; + } + for (String id : listOfPeers) { + List peersHlogs = zkHelper.getListHLogsForPeerForRS(rs, id); + if (peersHlogs != null) { + this.hlogs.addAll(peersHlogs); + } + // early exit if we found the log + if(lookForLog && this.hlogs.contains(searchedLog)) { + LOG.debug("Found log in ZK, keeping: " + searchedLog); + return true; + } + } + } LOG.debug("Didn't find this log in ZK, deleting: " + searchedLog); return false; } @@ -110,15 +106,15 @@ public class ReplicationLogCleaner implements LogCleanerDelegate, Watcher { @Override public void setConf(Configuration conf) { this.conf = conf; -// try { - // REENABLE -// this.zkHelper = new ReplicationZookeeperWrapper( -// ZooKeeperWrapper.createInstance(this.conf, -// HMaster.class.getName()), -// this.conf, new AtomicBoolean(true), null); -// } catch (IOException e) { -// LOG.error(e); -// } + try { + ZooKeeperWatcher zkw = + new ZooKeeperWatcher(conf, this.getClass().getName(), null); + this.zkHelper = new ReplicationZookeeper(conf, zkw); + } catch (KeeperException e) { + LOG.error("Error while configuring " + this.getClass().getName(), e); + } catch (IOException e) { + LOG.error("Error while configuring " + this.getClass().getName(), e); + } refreshHLogsAndSearch(null); } @@ -126,7 +122,4 @@ public class ReplicationLogCleaner implements LogCleanerDelegate, Watcher { public Configuration getConf() { return conf; } - - @Override - public void process(WatchedEvent watchedEvent) {} } diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index b2c37da6aea..00beab5c818 100644 --- a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -45,7 +45,6 @@ import org.apache.zookeeper.KeeperException; public class Replication implements WALObserver { private final boolean replication; private final 
ReplicationSourceManager replicationManager; - private boolean replicationMaster; private final AtomicBoolean replicating = new AtomicBoolean(true); private final ReplicationZookeeper zkHelper; private final Configuration conf; @@ -70,10 +69,8 @@ public class Replication implements WALObserver { this.replication = isReplication(this.conf); if (replication) { this.zkHelper = new ReplicationZookeeper(server, this.replicating); - this.replicationMaster = zkHelper.isReplicationMaster(); - this.replicationManager = this.replicationMaster ? - new ReplicationSourceManager(zkHelper, conf, this.server, - fs, this.replicating, logDir, oldLogDir) : null; + this.replicationManager = new ReplicationSourceManager(zkHelper, conf, + this.server, fs, this.replicating, logDir, oldLogDir) ; } else { this.replicationManager = null; this.zkHelper = null; @@ -93,10 +90,8 @@ public class Replication implements WALObserver { */ public void join() { if (this.replication) { - if (this.replicationMaster) { - this.replicationManager.join(); - } - this.zkHelper.deleteOwnRSZNode(); + this.replicationManager.join(); + this.zkHelper.deleteOwnRSZNode(); } } @@ -106,7 +101,7 @@ public class Replication implements WALObserver { * @throws IOException */ public void replicateLogEntries(HLog.Entry[] entries) throws IOException { - if (this.replication && !this.replicationMaster) { + if (this.replication) { this.replicationSink.replicateEntries(entries); } } @@ -118,11 +113,8 @@ public class Replication implements WALObserver { */ public void startReplicationServices() throws IOException { if (this.replication) { - if (this.replicationMaster) { - this.replicationManager.init(); - } else { - this.replicationSink = new ReplicationSink(this.conf, this.server); - } + this.replicationManager.init(); + this.replicationSink = new ReplicationSink(this.conf, this.server); } } diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java index cf6c71e6bc0..88baff12081 100644 --- a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java +++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java @@ -38,7 +38,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.TreeMap; -import java.util.concurrent.atomic.AtomicBoolean; /** * This class is responsible for replicating the edits coming diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 8f35d1b2487..8ebe22c2638 100644 --- a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -54,6 +54,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.replication.ReplicationZookeeper; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; +import org.apache.zookeeper.KeeperException; /** * Class that handles the source of a replication stream. @@ -195,7 +196,7 @@ public class ReplicationSource extends Thread /** * Select a number of peers at random using the ratio. Mininum 1. 
*/ - private void chooseSinks() { + private void chooseSinks() throws KeeperException { this.currentPeers.clear(); List addresses = this.zkHelper.getPeersAddresses(peerClusterId); @@ -231,8 +232,14 @@ public class ReplicationSource extends Thread // If this is recovered, the queue is already full and the first log // normally has a position (unless the RS failed between 2 logs) if (this.queueRecovered) { -// this.position = this.zkHelper.getHLogRepPosition( -// this.peerClusterZnode, this.queue.peek().getName()); + try { + this.position = this.zkHelper.getHLogRepPosition( + this.peerClusterZnode, this.queue.peek().getName()); + } catch (KeeperException e) { + LOG.error("Couldn't get the position of this recovered queue " + + peerClusterZnode, e); + this.abort(); + } } int sleepMultiplier = 1; // Loop until we close down @@ -380,6 +387,8 @@ public class ReplicationSource extends Thread Thread.sleep(this.sleepForRetries); } catch (InterruptedException e) { LOG.error("Interrupted while trying to connect to sinks", e); + } catch (KeeperException e) { + LOG.error("Error talking to zookeeper, retrying", e); } } } @@ -553,6 +562,8 @@ public class ReplicationSource extends Thread } while (!this.stopper.isStopped() && down); } catch (InterruptedException e) { LOG.debug("Interrupted while trying to contact the peer cluster"); + } catch (KeeperException e) { + LOG.error("Error talking to zookeeper, retrying", e); } } @@ -589,7 +600,7 @@ public class ReplicationSource extends Thread } }; Threads.setDaemonThreadRunning( - this, n + ".replicationSource," + clusterId, handler); + this, n + ".replicationSource," + peerClusterZnode, handler); } /** diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 991cefadeb6..7553a70ab3a 100644 --- a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -36,8 +36,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.replication.ReplicationZookeeper; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; /** * This class is responsible to manage all the replication @@ -104,8 +104,10 @@ public class ReplicationSourceManager { this.fs = fs; this.logDir = logDir; this.oldLogDir = oldLogDir; + this.zkHelper.registerRegionServerListener( + new OtherRegionServerWatcher(this.zkHelper.getZookeeperWatcher())); List otherRSs = - this.zkHelper.getRegisteredRegionServers(new OtherRegionServerWatcher()); + this.zkHelper.getRegisteredRegionServers(); this.otherRegionServers = otherRSs == null ? 
new ArrayList() : otherRSs; } @@ -145,7 +147,10 @@ public class ReplicationSourceManager { ReplicationSourceInterface src = addSource(id); src.startup(); } - List currentReplicators = this.zkHelper.getListOfReplicators(); + List currentReplicators = this.zkHelper.getRegisteredRegionServers(); + if (currentReplicators == null || currentReplicators.size() == 0) { + return; + } synchronized (otherRegionServers) { LOG.info("Current list of replicators: " + currentReplicators + " other RSs: " + otherRegionServers); @@ -322,29 +327,59 @@ public class ReplicationSourceManager { * in the local cluster. It initiates the process to transfer the queues * if it is able to grab the lock. */ - public class OtherRegionServerWatcher implements Watcher { - @Override - public void process(WatchedEvent watchedEvent) { - LOG.info(" event " + watchedEvent); - if (watchedEvent.getType().equals(Event.KeeperState.Expired) || - watchedEvent.getType().equals(Event.KeeperState.Disconnected)) { + public class OtherRegionServerWatcher extends ZooKeeperListener { + + /** + * Construct a ZooKeeper event listener. + */ + public OtherRegionServerWatcher(ZooKeeperWatcher watcher) { + super(watcher); + } + + /** + * Called when a new node has been created. + * @param path full path of the new node + */ + public void nodeCreated(String path) { + refreshRegionServersList(path); + } + + /** + * Called when a node has been deleted + * @param path full path of the deleted node + */ + public void nodeDeleted(String path) { + boolean cont = refreshRegionServersList(path); + if (!cont) { return; } + LOG.info(path + " znode expired, trying to lock it"); + String[] rsZnodeParts = path.split("/"); + transferQueues(rsZnodeParts[rsZnodeParts.length-1]); + } - List newRsList = (zkHelper.getRegisteredRegionServers(this)); + /** + * Called when an existing node has a child node added or removed. 
+ * @param path full path of the node whose children have changed + */ + public void nodeChildrenChanged(String path) { + refreshRegionServersList(path); + } + + private boolean refreshRegionServersList(String path) { + if (!path.startsWith(zkHelper.getZookeeperWatcher().rsZNode)) { + return false; + } + List newRsList = (zkHelper.getRegisteredRegionServers()); if (newRsList == null) { - return; + return false; } else { synchronized (otherRegionServers) { otherRegionServers.clear(); otherRegionServers.addAll(newRsList); } } - if (watchedEvent.getType().equals(Event.EventType.NodeDeleted)) { - LOG.info(watchedEvent.getPath() + " znode expired, trying to lock it"); - String[] rsZnodeParts = watchedEvent.getPath().split("/"); - transferQueues(rsZnodeParts[rsZnodeParts.length-1]); - } + return true; } } diff --git a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java index 0a9f79ff710..7495c88d816 100644 --- a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java +++ b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java @@ -916,7 +916,7 @@ public class ZKUtil { public static void deleteChildrenRecursively(ZooKeeperWatcher zkw, String node) throws KeeperException { List children = ZKUtil.listChildrenNoWatch(zkw, node); - if(!children.isEmpty()) { + if(children != null || !children.isEmpty()) { for(String child : children) { deleteNodeRecursively(zkw, joinZNode(node, child)); } diff --git a/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java b/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java index 37fdddd29be..528301efcb4 100644 --- a/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java +++ b/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java @@ -42,6 +42,8 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -54,10 +56,9 @@ public class TestReplication { private static Configuration conf1; private static Configuration conf2; -/* - private static ZooKeeperWrapper zkw1; - private static ZooKeeperWrapper zkw2; - */ + + private static ZooKeeperWatcher zkw1; + private static ZooKeeperWatcher zkw2; private static HTable htable1; private static HTable htable2; @@ -92,15 +93,15 @@ public class TestReplication { conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true); conf1.setBoolean("dfs.support.append", true); conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); - /* REENALBE utility1 = new HBaseTestingUtility(conf1); utility1.startMiniZKCluster(); MiniZooKeeperCluster miniZK = utility1.getZkCluster(); - zkw1 = ZooKeeperWrapper.createInstance(conf1, "cluster1"); - zkw1.writeZNode("/1", "replication", ""); - zkw1.writeZNode("/1/replication", "master", + zkw1 = new ZooKeeperWatcher(conf1, "cluster1", null); + ZKUtil.createWithParents(zkw1, "/1/replication/master"); + ZKUtil.createWithParents(zkw1, "/1/replication/state"); + ZKUtil.setData(zkw1, "/1/replication/master", Bytes.toBytes( conf1.get(HConstants.ZOOKEEPER_QUORUM)+":" + - conf1.get("hbase.zookeeper.property.clientPort")+":/1"); + conf1.get("hbase.zookeeper.property.clientPort")+":/1")); setIsReplication(true); LOG.info("Setup first Zk"); 
@@ -113,15 +114,13 @@ public class TestReplication { utility2 = new HBaseTestingUtility(conf2); utility2.setZkCluster(miniZK); - zkw2 = ZooKeeperWrapper.createInstance(conf2, "cluster2"); - zkw2.writeZNode("/2", "replication", ""); - zkw2.writeZNode("/2/replication", "master", - conf1.get(HConstants.ZOOKEEPER_QUORUM)+":" + - conf1.get("hbase.zookeeper.property.clientPort")+":/1"); + zkw2 = new ZooKeeperWatcher(conf2, "cluster2", null); + ZKUtil.createWithParents(zkw2, "/2/replication"); - zkw1.writeZNode("/1/replication/peers", "1", + ZKUtil.createWithParents(zkw1, "/1/replication/peers/2"); + ZKUtil.setData(zkw1, "/1/replication/peers/2", Bytes.toBytes( conf2.get(HConstants.ZOOKEEPER_QUORUM)+":" + - conf2.get("hbase.zookeeper.property.clientPort")+":/2"); + conf2.get("hbase.zookeeper.property.clientPort")+":/2")); LOG.info("Setup second Zk"); @@ -143,12 +142,12 @@ public class TestReplication { htable1 = new HTable(conf1, tableName); htable1.setWriteBufferSize(1024); htable2 = new HTable(conf2, tableName); - */ } private static void setIsReplication(boolean rep) throws Exception { LOG.info("Set rep " + rep); - // REENALBE zkw1.writeZNode("/1/replication", "state", Boolean.toString(rep)); + ZKUtil.setData(zkw1,"/1/replication/state", + Bytes.toBytes(Boolean.toString(rep))); // Takes some ms for ZK to fire the watcher Thread.sleep(SLEEP_TIME); } @@ -181,7 +180,7 @@ public class TestReplication { * Add a row, check it's replicated, delete it, check's gone * @throws Exception */ - @Ignore @Test + @Test public void testSimplePutDelete() throws Exception { LOG.info("testSimplePutDelete"); Put put = new Put(row); @@ -229,7 +228,7 @@ public class TestReplication { * Try a small batch upload using the write buffer, check it's replicated * @throws Exception */ - @Ignore @Test + @Test public void testSmallBatch() throws Exception { LOG.info("testSmallBatch"); Put put; @@ -273,7 +272,7 @@ public class TestReplication { * replicated, enable it, try replicating and it should work * @throws Exception */ - @Ignore @Test + @Test public void testStartStop() throws Exception { // Test stopping replication @@ -342,7 +341,7 @@ public class TestReplication { * hlog rolling and other non-trivial code paths * @throws Exception */ - @Ignore @Test + @Test public void loadTesting() throws Exception { htable1.setWriteBufferSize(1024); htable1.setAutoFlush(false); @@ -396,7 +395,7 @@ public class TestReplication { * the upload. The failover happens internally. * @throws Exception */ - @Ignore @Test + @Test public void queueFailover() throws Exception { utility1.createMultiRegions(htable1, famName); diff --git a/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java index aeb0ff90f2b..f019c93e75a 100644 --- a/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java +++ b/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java @@ -69,7 +69,7 @@ public class TestReplicationSource { * time reading logs that are being archived. 
* @throws Exception */ - @Ignore @Test + @Test public void testLogMoving() throws Exception{ Path logPath = new Path(logDir, "log"); HLog.Writer writer = HLog.createWriter(fs, logPath, conf); diff --git a/src/test/java/org/apache/hadoop/hbase/replication/regionserver/DISABLEDTestReplicationSink.java b/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java similarity index 97% rename from src/test/java/org/apache/hadoop/hbase/replication/regionserver/DISABLEDTestReplicationSink.java rename to src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java index dae62a95f30..1761eff4b7d 100644 --- a/src/test/java/org/apache/hadoop/hbase/replication/regionserver/DISABLEDTestReplicationSink.java +++ b/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java @@ -47,10 +47,10 @@ import java.util.concurrent.atomic.AtomicBoolean; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -public class DISABLEDTestReplicationSink { +public class TestReplicationSink { private static final Log LOG = - LogFactory.getLog(DISABLEDTestReplicationSink.class); + LogFactory.getLog(TestReplicationSink.class); private static final int BATCH_SIZE = 10; @@ -128,7 +128,7 @@ public class DISABLEDTestReplicationSink { * Insert a whole batch of entries * @throws Exception */ - @Ignore @Test + @Test public void testBatchSink() throws Exception { HLog.Entry[] entries = new HLog.Entry[BATCH_SIZE]; for(int i = 0; i < BATCH_SIZE; i++) { @@ -144,7 +144,7 @@ public class DISABLEDTestReplicationSink { * Insert a mix of puts and deletes * @throws Exception */ - @Ignore @Test + @Test public void testMixedPutDelete() throws Exception { HLog.Entry[] entries = new HLog.Entry[BATCH_SIZE/2]; for(int i = 0; i < BATCH_SIZE/2; i++) { @@ -168,7 +168,7 @@ public class DISABLEDTestReplicationSink { * Insert to 2 different tables * @throws Exception */ - @Ignore @Test + @Test public void testMixedPutTables() throws Exception { HLog.Entry[] entries = new HLog.Entry[BATCH_SIZE]; for(int i = 0; i < BATCH_SIZE; i++) { @@ -189,7 +189,7 @@ public class DISABLEDTestReplicationSink { * Insert then do different types of deletes * @throws Exception */ - @Ignore @Test + @Test public void testMixedDeletes() throws Exception { HLog.Entry[] entries = new HLog.Entry[3]; for(int i = 0; i < 3; i++) { @@ -214,7 +214,7 @@ public class DISABLEDTestReplicationSink { * before the actual Put that creates it. 
* @throws Exception */ - @Ignore @Test + @Test public void testApplyDeleteBeforePut() throws Exception { HLog.Entry[] entries = new HLog.Entry[5]; for(int i = 0; i < 2; i++) { diff --git a/src/test/java/org/apache/hadoop/hbase/replication/regionserver/DISABLEDTestReplicationSourceManager.java b/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java similarity index 70% rename from src/test/java/org/apache/hadoop/hbase/replication/regionserver/DISABLEDTestReplicationSourceManager.java rename to src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 9936634f18c..dd266e15dc4 100644 --- a/src/test/java/org/apache/hadoop/hbase/replication/regionserver/DISABLEDTestReplicationSourceManager.java +++ b/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -31,15 +31,16 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.regionserver.wal.WALObserver; import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; -import org.apache.hadoop.hbase.replication.ReplicationZookeeper; import org.apache.hadoop.hbase.util.Bytes; -// REENABLE import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -54,10 +55,10 @@ import java.util.concurrent.atomic.AtomicBoolean; import static org.junit.Assert.assertEquals; -public class DISABLEDTestReplicationSourceManager { +public class TestReplicationSourceManager { private static final Log LOG = - LogFactory.getLog(DISABLEDTestReplicationSourceManager.class); + LogFactory.getLog(TestReplicationSourceManager.class); private static Configuration conf; @@ -65,9 +66,11 @@ public class DISABLEDTestReplicationSourceManager { private static final AtomicBoolean REPLICATING = new AtomicBoolean(false); + private static Replication replication; + private static ReplicationSourceManager manager; - // REENALBE private static ZooKeeperWrapper zkw; + private static ZooKeeperWatcher zkw; private static HTableDescriptor htd; @@ -100,26 +103,25 @@ public class DISABLEDTestReplicationSourceManager { utility = new HBaseTestingUtility(conf); utility.startMiniZKCluster(); - // REENABLE -// zkw = ZooKeeperWrapper.createInstance(conf, "test"); -// zkw.writeZNode("/hbase", "replication", ""); -// zkw.writeZNode("/hbase/replication", "master", -// conf.get(HConstants.ZOOKEEPER_QUORUM)+":" + -// conf.get("hbase.zookeeper.property.clientPort")+":/1"); -// zkw.writeZNode("/hbase/replication/peers", "1", -// conf.get(HConstants.ZOOKEEPER_QUORUM)+":" + -// conf.get("hbase.zookeeper.property.clientPort")+":/1"); + zkw = new ZooKeeperWatcher(conf, "test", null); + ZKUtil.createWithParents(zkw, "/hbase/replication"); + ZKUtil.createWithParents(zkw, "/hbase/replication/master"); + ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1"); + ZKUtil.setData(zkw, "/hbase/replication/master", + 
Bytes.toBytes(conf.get(HConstants.ZOOKEEPER_QUORUM)+":" + + conf.get("hbase.zookeeper.property.clientPort")+":/1")); + ZKUtil.setData(zkw, "/hbase/replication/peers/1",Bytes.toBytes( + conf.get(HConstants.ZOOKEEPER_QUORUM)+":" + + conf.get("hbase.zookeeper.property.clientPort")+":/1")); - HRegionServer server = new HRegionServer(conf); - ReplicationZookeeper helper = new ReplicationZookeeper(server, REPLICATING); + replication = new Replication(new DummyServer(), fs, logDir, oldLogDir); + manager = replication.getReplicationManager(); fs = FileSystem.get(conf); oldLogDir = new Path(utility.getTestDir(), HConstants.HREGION_OLDLOGDIR_NAME); logDir = new Path(utility.getTestDir(), HConstants.HREGION_LOGDIR_NAME); - manager = new ReplicationSourceManager(helper, - conf, server, fs, REPLICATING, logDir, oldLogDir); manager.addSource("1"); htd = new HTableDescriptor(test); @@ -137,7 +139,7 @@ public class DISABLEDTestReplicationSourceManager { @AfterClass public static void tearDownAfterClass() throws Exception { -// REENABLE manager.join(); + manager.join(); utility.shutdownMiniCluster(); } @@ -152,7 +154,7 @@ public class DISABLEDTestReplicationSourceManager { setUp(); } - @Ignore @Test + @Test public void testLogRoll() throws Exception { long seq = 0; long baseline = 1000; @@ -160,8 +162,9 @@ public class DISABLEDTestReplicationSourceManager { KeyValue kv = new KeyValue(r1, f1, r1); WALEdit edit = new WALEdit(); edit.add(kv); + List listeners = new ArrayList(); -// REENABLE listeners.add(manager); + listeners.add(replication); HLog hlog = new HLog(fs, logDir, oldLogDir, conf, listeners, URLEncoder.encode("regionserver:60020", "UTF8")); @@ -195,17 +198,55 @@ public class DISABLEDTestReplicationSourceManager { hlog.rollWriter(); - // REENABLE manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(), - // REENABLE "1", 0, false); + manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(), + "1", 0, false); HLogKey key = new HLogKey(hri.getRegionName(), test, seq++, System.currentTimeMillis()); hlog.append(hri, key, edit); - // REENABLE assertEquals(1, manager.getHLogs().size()); + assertEquals(1, manager.getHLogs().size()); // TODO Need a case with only 2 HLogs and we only want to delete the first one } + static class DummyServer implements Server { + + @Override + public Configuration getConfiguration() { + return conf; + } + + @Override + public ZooKeeperWatcher getZooKeeper() { + return zkw; + } + + @Override + public CatalogTracker getCatalogTracker() { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + public String getServerName() { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + public void abort(String why, Throwable e) { + //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + public void stop(String why) { + //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + public boolean isStopped() { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + } + }
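
For context on the znode layout the re-enabled tests exercise: TestReplication's setup registers the master cluster's own address, the replication on/off state, and the slave ensemble's address as a numbered peer, all through the ZKUtil calls this patch switches to. Below is a minimal standalone sketch of that bootstrap, assuming two clusters rooted at the "/1" and "/2" parent znodes as in the test; the class name and the configuration wiring are illustrative only and not part of the patch.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.hbase.zookeeper.ZKUtil;
    import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;

    // Sketch only: mirrors the znode bootstrap done in TestReplication.setUpBeforeClass().
    public class ReplicationZNodeBootstrapSketch {
      public static void main(String[] args) throws Exception {
        // Assumption: conf1 points at the master ensemble ("/1"), conf2 at the slave ("/2").
        Configuration conf1 = HBaseConfiguration.create();
        Configuration conf2 = HBaseConfiguration.create();
        conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
        conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");

        ZooKeeperWatcher zkw1 = new ZooKeeperWatcher(conf1, "cluster1", null);

        // Master cluster side: record its own address and the replication on/off switch.
        ZKUtil.createWithParents(zkw1, "/1/replication/master");
        ZKUtil.createWithParents(zkw1, "/1/replication/state");
        ZKUtil.setData(zkw1, "/1/replication/master", Bytes.toBytes(
            conf1.get(HConstants.ZOOKEEPER_QUORUM) + ":" +
            conf1.get("hbase.zookeeper.property.clientPort") + ":/1"));
        ZKUtil.setData(zkw1, "/1/replication/state",
            Bytes.toBytes(Boolean.toString(true)));

        // Register the slave ensemble as peer "2"; ReplicationZookeeper.connectToPeer()
        // reads this address and opens a ZooKeeperWatcher against it.
        ZKUtil.createWithParents(zkw1, "/1/replication/peers/2");
        ZKUtil.setData(zkw1, "/1/replication/peers/2", Bytes.toBytes(
            conf2.get(HConstants.ZOOKEEPER_QUORUM) + ":" +
            conf2.get("hbase.zookeeper.property.clientPort") + ":/2"));
      }
    }

Once the peer is registered, each region server tracks its queues under the region-servers znode as <rsZNode>/<server name>/<peer id>/<hlog name> (see addLogToList() above); that is the layout ReplicationLogCleaner scans before deleting logs and that the ReplicationSourceManager failover path walks when claiming a dead server's queues.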