diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java index cd888670f12..440fa4237db 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.ConnectionLossException; import org.apache.zookeeper.KeeperException.NodeExistsException; @@ -790,6 +791,58 @@ public class ReplicationZookeeper implements Closeable { return true; } + /** + * It "atomically" copies all the hlogs queues from another region server and returns them all + * sorted per peer cluster (appended with the dead server's znode). + * @param znode + * @return HLog queues sorted per peer cluster + */ + public SortedMap> copyQueuesFromRSUsingMulti(String znode) { + SortedMap> queues = new TreeMap>(); + String deadRSZnodePath = ZKUtil.joinZNode(rsZNode, znode);// hbase/replication/rs/deadrs + List peerIdsToProcess = null; + List listOfOps = new ArrayList(); + try { + peerIdsToProcess = ZKUtil.listChildrenNoWatch(this.zookeeper, deadRSZnodePath); + if (peerIdsToProcess == null) return null; // node already processed + for (String peerId : peerIdsToProcess) { + String newPeerId = peerId + "-" + znode; + String newPeerZnode = ZKUtil.joinZNode(this.rsServerNameZnode, newPeerId); + // check the logs queue for the old peer cluster + String oldClusterZnode = ZKUtil.joinZNode(deadRSZnodePath, peerId); + List hlogs = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode); + if (hlogs == null || hlogs.size() == 0) continue; // empty log queue. + // create the new cluster znode + SortedSet logQueue = new TreeSet(); + queues.put(newPeerId, logQueue); + ZKUtilOp op = ZKUtilOp.createAndFailSilent(newPeerZnode, HConstants.EMPTY_BYTE_ARRAY); + listOfOps.add(op); + // get the offset of the logs and set it to new znodes + for (String hlog : hlogs) { + String oldHlogZnode = ZKUtil.joinZNode(oldClusterZnode, hlog); + byte[] logOffset = ZKUtil.getData(this.zookeeper, oldHlogZnode); + LOG.debug("Creating " + hlog + " with data " + Bytes.toString(logOffset)); + String newLogZnode = ZKUtil.joinZNode(newPeerZnode, hlog); + listOfOps.add(ZKUtilOp.createAndFailSilent(newLogZnode, logOffset)); + // add ops for deleting + listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldHlogZnode)); + logQueue.add(hlog); + } + // add delete op for peer + listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode)); + } + // add delete op for dead rs + listOfOps.add(ZKUtilOp.deleteNodeFailSilent(deadRSZnodePath)); + LOG.debug(" The multi list size is: " + listOfOps.size()); + ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false); + LOG.info("Atomically moved the dead regionserver logs. "); + } catch (KeeperException e) { + // Multi call failed; it looks like some other regionserver took away the logs. + LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e); + } + return queues; + } + /** * This methods copies all the hlogs queues from another region server * and returns them all sorted per peer cluster (appended with the dead diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 5ef544d7a28..42564caec42 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -40,6 +40,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.replication.ReplicationZookeeper; import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; @@ -581,14 +582,22 @@ public class ReplicationSourceManager { LOG.info("Not transferring queue since we are shutting down"); return; } - if (!zkHelper.lockOtherRS(rsZnode)) { - return; + SortedMap> newQueues = null; + + // check whether there is multi support. If yes, use it. + if (conf.getBoolean(HConstants.ZOOKEEPER_USEMULTI, true)) { + LOG.info("Atomically moving " + rsZnode + "'s hlogs to my queue"); + newQueues = zkHelper.copyQueuesFromRSUsingMulti(rsZnode); + } else { + LOG.info("Moving " + rsZnode + "'s hlogs to my queue"); + if (!zkHelper.lockOtherRS(rsZnode)) { + return; + } + newQueues = zkHelper.copyQueuesFromRS(rsZnode); + zkHelper.deleteRsQueues(rsZnode); } - LOG.info("Moving " + rsZnode + "'s hlogs to my queue"); - SortedMap> newQueues = - zkHelper.copyQueuesFromRS(rsZnode); - zkHelper.deleteRsQueues(rsZnode); - if (newQueues == null || newQueues.size() == 0) { + // process of copying over the failed queue is completed. + if (newQueues.size() == 0) { return; }