HBASE-17077 Don't copy the replication queue belonging to the peer which has been deleted (Guanghao Zhang)
This commit is contained in:
parent
469462c850
commit
b77bfe9d3b
|
@ -370,9 +370,10 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* It "atomically" copies all the wals queues from another region server and returns them all
|
* It "atomically" copies one peer's wals queue from another dead region server and returns them
|
||||||
* sorted per peer cluster (appended with the dead server's znode).
|
* all sorted. The new peer id is equal to the old peer id appended with the dead server's znode.
|
||||||
* @param znode pertaining to the region server to copy the queues from
|
* @param znode pertaining to the region server to copy the queues from
|
||||||
|
* @peerId peerId pertaining to the queue need to be copied
|
||||||
*/
|
*/
|
||||||
private Pair<String, SortedSet<String>> moveQueueUsingMulti(String znode, String peerId) {
|
private Pair<String, SortedSet<String>> moveQueueUsingMulti(String znode, String peerId) {
|
||||||
try {
|
try {
|
||||||
|
@ -380,18 +381,25 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
|
||||||
String deadRSZnodePath = ZKUtil.joinZNode(this.queuesZNode, znode);
|
String deadRSZnodePath = ZKUtil.joinZNode(this.queuesZNode, znode);
|
||||||
List<ZKUtilOp> listOfOps = new ArrayList<>();
|
List<ZKUtilOp> listOfOps = new ArrayList<>();
|
||||||
ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
|
ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
|
||||||
if (!peerExists(replicationQueueInfo.getPeerId())) {
|
|
||||||
// the orphaned queues must be moved, otherwise the delete op of dead rs will fail,
|
|
||||||
// this will cause the whole multi op fail.
|
|
||||||
// NodeFailoverWorker will skip the orphaned queues.
|
|
||||||
LOG.warn("Peer " + peerId +
|
|
||||||
" didn't exist, will move its queue to avoid the failure of multi op");
|
|
||||||
}
|
|
||||||
String newPeerId = peerId + "-" + znode;
|
String newPeerId = peerId + "-" + znode;
|
||||||
String newPeerZnode = ZKUtil.joinZNode(this.myQueuesZnode, newPeerId);
|
String newPeerZnode = ZKUtil.joinZNode(this.myQueuesZnode, newPeerId);
|
||||||
// check the logs queue for the old peer cluster
|
// check the logs queue for the old peer cluster
|
||||||
String oldClusterZnode = ZKUtil.joinZNode(deadRSZnodePath, peerId);
|
String oldClusterZnode = ZKUtil.joinZNode(deadRSZnodePath, peerId);
|
||||||
List<String> wals = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode);
|
List<String> wals = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode);
|
||||||
|
|
||||||
|
if (!peerExists(replicationQueueInfo.getPeerId())) {
|
||||||
|
LOG.warn("Peer " + replicationQueueInfo.getPeerId() +
|
||||||
|
" didn't exist, will move its queue to avoid the failure of multi op");
|
||||||
|
for (String wal : wals) {
|
||||||
|
String oldWalZnode = ZKUtil.joinZNode(oldClusterZnode, wal);
|
||||||
|
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalZnode));
|
||||||
|
}
|
||||||
|
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
|
||||||
|
ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
SortedSet<String> logQueue = new TreeSet<>();
|
SortedSet<String> logQueue = new TreeSet<>();
|
||||||
if (wals == null || wals.size() == 0) {
|
if (wals == null || wals.size() == 0) {
|
||||||
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
|
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
|
||||||
|
@ -416,8 +424,8 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
|
||||||
LOG.trace(" The multi list size is: " + listOfOps.size());
|
LOG.trace(" The multi list size is: " + listOfOps.size());
|
||||||
}
|
}
|
||||||
ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
|
ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
|
||||||
if (LOG.isTraceEnabled())
|
|
||||||
LOG.trace("Atomically moved the dead regionserver logs. ");
|
LOG.info("Atomically moved " + znode + "/" + peerId + "'s WALs to my queue");
|
||||||
return new Pair<>(newPeerId, logQueue);
|
return new Pair<>(newPeerId, logQueue);
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
// Multi call failed; it looks like some other regionserver took away the logs.
|
// Multi call failed; it looks like some other regionserver took away the logs.
|
||||||
|
|
|
@ -629,7 +629,9 @@ public class TestReplicationSourceManager {
|
||||||
List<String> queues = rq.getUnClaimedQueueIds(deadRsZnode);
|
List<String> queues = rq.getUnClaimedQueueIds(deadRsZnode);
|
||||||
for(String queue:queues){
|
for(String queue:queues){
|
||||||
Pair<String, SortedSet<String>> pair = rq.claimQueue(deadRsZnode, queue);
|
Pair<String, SortedSet<String>> pair = rq.claimQueue(deadRsZnode, queue);
|
||||||
logZnodesMap.put(pair.getFirst(), pair.getSecond());
|
if (pair != null) {
|
||||||
|
logZnodesMap.put(pair.getFirst(), pair.getSecond());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
server.abort("Done with testing", null);
|
server.abort("Done with testing", null);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
|
Loading…
Reference in New Issue