HBASE-19711 TestReplicationAdmin.testConcurrentPeerOperations hangs

Signed-off-by: zhangduo <zhangduo@apache.org>
This commit is contained in:
Guanghao Zhang 2018-01-05 15:39:06 +08:00 committed by zhangduo
parent ec364d0d6c
commit 2d5267331e
1 changed files with 19 additions and 4 deletions

View File

@ -402,7 +402,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
@Override
public void completionCleanup(final Procedure proc) {
if (proc instanceof TableProcedureInterface) {
TableProcedureInterface iProcTable = (TableProcedureInterface)proc;
TableProcedureInterface iProcTable = (TableProcedureInterface) proc;
boolean tableDeleted;
if (proc.hasException()) {
Exception procEx = proc.getException().unwrapRemoteException();
@ -423,9 +423,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
}
} else if (proc instanceof PeerProcedureInterface) {
PeerProcedureInterface iProcPeer = (PeerProcedureInterface) proc;
if (iProcPeer.getPeerOperationType() == PeerOperationType.REMOVE) {
removePeerQueue(iProcPeer.getPeerId());
}
tryCleanupPeerQueue(iProcPeer.getPeerId(), proc);
} else {
// No cleanup for ServerProcedureInterface types, yet.
return;
@ -514,6 +512,23 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
locking.removePeerLock(peerId);
}
private void tryCleanupPeerQueue(String peerId, Procedure procedure) {
schedLock();
try {
PeerQueue queue = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
if (queue == null) {
return;
}
final LockAndQueue lock = locking.getPeerLock(peerId);
if (queue.isEmpty() && lock.tryExclusiveLock(procedure)) {
removeFromRunQueue(peerRunQueue, queue);
removePeerQueue(peerId);
}
} finally {
schedUnlock();
}
}
private static boolean isPeerProcedure(Procedure<?> proc) {
return proc instanceof PeerProcedureInterface;