HBASE-2611 Handle RS that fails while processing the failure of another one (Himanshu)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1439744 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1d330c9478
commit
657c26acb8
|
@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
|
|||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.apache.zookeeper.KeeperException.ConnectionLossException;
|
||||
import org.apache.zookeeper.KeeperException.NodeExistsException;
|
||||
|
@ -790,6 +791,58 @@ public class ReplicationZookeeper implements Closeable {
|
|||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* It "atomically" copies all the hlogs queues from another region server and returns them all
|
||||
* sorted per peer cluster (appended with the dead server's znode).
|
||||
* @param znode
|
||||
* @return HLog queues sorted per peer cluster
|
||||
*/
|
||||
public SortedMap<String, SortedSet<String>> copyQueuesFromRSUsingMulti(String znode) {
|
||||
SortedMap<String, SortedSet<String>> queues = new TreeMap<String, SortedSet<String>>();
|
||||
String deadRSZnodePath = ZKUtil.joinZNode(rsZNode, znode);// hbase/replication/rs/deadrs
|
||||
List<String> peerIdsToProcess = null;
|
||||
List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
|
||||
try {
|
||||
peerIdsToProcess = ZKUtil.listChildrenNoWatch(this.zookeeper, deadRSZnodePath);
|
||||
if (peerIdsToProcess == null) return null; // node already processed
|
||||
for (String peerId : peerIdsToProcess) {
|
||||
String newPeerId = peerId + "-" + znode;
|
||||
String newPeerZnode = ZKUtil.joinZNode(this.rsServerNameZnode, newPeerId);
|
||||
// check the logs queue for the old peer cluster
|
||||
String oldClusterZnode = ZKUtil.joinZNode(deadRSZnodePath, peerId);
|
||||
List<String> hlogs = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode);
|
||||
if (hlogs == null || hlogs.size() == 0) continue; // empty log queue.
|
||||
// create the new cluster znode
|
||||
SortedSet<String> logQueue = new TreeSet<String>();
|
||||
queues.put(newPeerId, logQueue);
|
||||
ZKUtilOp op = ZKUtilOp.createAndFailSilent(newPeerZnode, HConstants.EMPTY_BYTE_ARRAY);
|
||||
listOfOps.add(op);
|
||||
// get the offset of the logs and set it to new znodes
|
||||
for (String hlog : hlogs) {
|
||||
String oldHlogZnode = ZKUtil.joinZNode(oldClusterZnode, hlog);
|
||||
byte[] logOffset = ZKUtil.getData(this.zookeeper, oldHlogZnode);
|
||||
LOG.debug("Creating " + hlog + " with data " + Bytes.toString(logOffset));
|
||||
String newLogZnode = ZKUtil.joinZNode(newPeerZnode, hlog);
|
||||
listOfOps.add(ZKUtilOp.createAndFailSilent(newLogZnode, logOffset));
|
||||
// add ops for deleting
|
||||
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldHlogZnode));
|
||||
logQueue.add(hlog);
|
||||
}
|
||||
// add delete op for peer
|
||||
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
|
||||
}
|
||||
// add delete op for dead rs
|
||||
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(deadRSZnodePath));
|
||||
LOG.debug(" The multi list size is: " + listOfOps.size());
|
||||
ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
|
||||
LOG.info("Atomically moved the dead regionserver logs. ");
|
||||
} catch (KeeperException e) {
|
||||
// Multi call failed; it looks like some other regionserver took away the logs.
|
||||
LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
|
||||
}
|
||||
return queues;
|
||||
}
|
||||
|
||||
/**
|
||||
* This methods copies all the hlogs queues from another region server
|
||||
* and returns them all sorted per peer cluster (appended with the dead
|
||||
|
|
|
@ -40,6 +40,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.Stoppable;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
|
||||
|
@ -581,14 +582,22 @@ public class ReplicationSourceManager {
|
|||
LOG.info("Not transferring queue since we are shutting down");
|
||||
return;
|
||||
}
|
||||
if (!zkHelper.lockOtherRS(rsZnode)) {
|
||||
return;
|
||||
SortedMap<String, SortedSet<String>> newQueues = null;
|
||||
|
||||
// check whether there is multi support. If yes, use it.
|
||||
if (conf.getBoolean(HConstants.ZOOKEEPER_USEMULTI, true)) {
|
||||
LOG.info("Atomically moving " + rsZnode + "'s hlogs to my queue");
|
||||
newQueues = zkHelper.copyQueuesFromRSUsingMulti(rsZnode);
|
||||
} else {
|
||||
LOG.info("Moving " + rsZnode + "'s hlogs to my queue");
|
||||
if (!zkHelper.lockOtherRS(rsZnode)) {
|
||||
return;
|
||||
}
|
||||
newQueues = zkHelper.copyQueuesFromRS(rsZnode);
|
||||
zkHelper.deleteRsQueues(rsZnode);
|
||||
}
|
||||
LOG.info("Moving " + rsZnode + "'s hlogs to my queue");
|
||||
SortedMap<String, SortedSet<String>> newQueues =
|
||||
zkHelper.copyQueuesFromRS(rsZnode);
|
||||
zkHelper.deleteRsQueues(rsZnode);
|
||||
if (newQueues == null || newQueues.size() == 0) {
|
||||
// process of copying over the failed queue is completed.
|
||||
if (newQueues.size() == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue