HBASE-19711 TestReplicationAdmin.testConcurrentPeerOperations hangs

Signed-off-by: zhangduo <zhangduo@apache.org>
This commit is contained in:
Guanghao Zhang 2018-01-05 15:39:06 +08:00 committed by zhangduo
parent 0165455d34
commit 1e69d7fa24
1 changed files with 19 additions and 4 deletions

View File

@ -290,7 +290,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
@Override
public void completionCleanup(final Procedure proc) {
if (proc instanceof TableProcedureInterface) {
TableProcedureInterface iProcTable = (TableProcedureInterface)proc;
TableProcedureInterface iProcTable = (TableProcedureInterface) proc;
boolean tableDeleted;
if (proc.hasException()) {
Exception procEx = proc.getException().unwrapRemoteException();
@ -311,9 +311,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
}
} else if (proc instanceof PeerProcedureInterface) {
PeerProcedureInterface iProcPeer = (PeerProcedureInterface) proc;
if (iProcPeer.getPeerOperationType() == PeerOperationType.REMOVE) {
removePeerQueue(iProcPeer.getPeerId());
}
tryCleanupPeerQueue(iProcPeer.getPeerId(), proc);
} else {
// No cleanup for ServerProcedureInterface types, yet.
return;
@ -402,6 +400,23 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
locking.removePeerLock(peerId);
}
private void tryCleanupPeerQueue(String peerId, Procedure procedure) {
schedLock();
try {
PeerQueue queue = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
if (queue == null) {
return;
}
final LockAndQueue lock = locking.getPeerLock(peerId);
if (queue.isEmpty() && lock.tryExclusiveLock(procedure)) {
removeFromRunQueue(peerRunQueue, queue);
removePeerQueue(peerId);
}
} finally {
schedUnlock();
}
}
private static boolean isPeerProcedure(Procedure<?> proc) {
return proc instanceof PeerProcedureInterface;