HBASE-17077 Don't copy the replication queue belonging to the peer which has been deleted (Guanghao Zhang)
This commit is contained in:
parent
3f919dd6c3
commit
dba7ec1b69
|
@ -251,9 +251,10 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
|
|||
}
|
||||
|
||||
/**
|
||||
* It "atomically" copies all the wals queues from another region server and returns them all
|
||||
* sorted per peer cluster (appended with the dead server's znode).
|
||||
* It "atomically" copies one peer's wals queue from another dead region server and returns them
|
||||
* all sorted. The new peer id is equal to the old peer id appended with the dead server's znode.
|
||||
* @param znode pertaining to the region server to copy the queues from
|
||||
* @peerId peerId pertaining to the queue need to be copied
|
||||
*/
|
||||
private Pair<String, SortedSet<String>> moveQueueUsingMulti(String znode, String peerId) {
|
||||
try {
|
||||
|
@ -261,18 +262,25 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
|
|||
String deadRSZnodePath = ZKUtil.joinZNode(this.queuesZNode, znode);
|
||||
List<ZKUtilOp> listOfOps = new ArrayList<>();
|
||||
ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
|
||||
if (!peerExists(replicationQueueInfo.getPeerId())) {
|
||||
// the orphaned queues must be moved, otherwise the delete op of dead rs will fail,
|
||||
// this will cause the whole multi op fail.
|
||||
// NodeFailoverWorker will skip the orphaned queues.
|
||||
LOG.warn("Peer " + peerId +
|
||||
" didn't exist, will move its queue to avoid the failure of multi op");
|
||||
}
|
||||
|
||||
String newPeerId = peerId + "-" + znode;
|
||||
String newPeerZnode = ZKUtil.joinZNode(this.myQueuesZnode, newPeerId);
|
||||
// check the logs queue for the old peer cluster
|
||||
String oldClusterZnode = ZKUtil.joinZNode(deadRSZnodePath, peerId);
|
||||
List<String> wals = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode);
|
||||
|
||||
if (!peerExists(replicationQueueInfo.getPeerId())) {
|
||||
LOG.warn("Peer " + replicationQueueInfo.getPeerId() +
|
||||
" didn't exist, will move its queue to avoid the failure of multi op");
|
||||
for (String wal : wals) {
|
||||
String oldWalZnode = ZKUtil.joinZNode(oldClusterZnode, wal);
|
||||
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalZnode));
|
||||
}
|
||||
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
|
||||
ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
|
||||
return null;
|
||||
}
|
||||
|
||||
SortedSet<String> logQueue = new TreeSet<>();
|
||||
if (wals == null || wals.size() == 0) {
|
||||
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
|
||||
|
@ -297,8 +305,8 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
|
|||
LOG.trace(" The multi list size is: " + listOfOps.size());
|
||||
}
|
||||
ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
|
||||
if (LOG.isTraceEnabled())
|
||||
LOG.trace("Atomically moved the dead regionserver logs. ");
|
||||
|
||||
LOG.info("Atomically moved " + znode + "/" + peerId + "'s WALs to my queue");
|
||||
return new Pair<>(newPeerId, logQueue);
|
||||
} catch (KeeperException e) {
|
||||
// Multi call failed; it looks like some other regionserver took away the logs.
|
||||
|
|
|
@ -543,7 +543,9 @@ public abstract class TestReplicationSourceManager {
|
|||
List<String> queues = rq.getUnClaimedQueueIds(deadRsZnode);
|
||||
for(String queue:queues){
|
||||
Pair<String, SortedSet<String>> pair = rq.claimQueue(deadRsZnode, queue);
|
||||
logZnodesMap.put(pair.getFirst(), pair.getSecond());
|
||||
if (pair != null) {
|
||||
logZnodesMap.put(pair.getFirst(), pair.getSecond());
|
||||
}
|
||||
}
|
||||
server.abort("Done with testing", null);
|
||||
} catch (Exception e) {
|
||||
|
|
Loading…
Reference in New Issue