HBASE-21376 Add some verbose log to MasterProcedureScheduler
This commit is contained in:
parent
ee19f2b36f
commit
f1d9f59bfe
|
@ -21,6 +21,7 @@ import java.io.IOException;
|
|||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableExistsException;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
|
@ -156,9 +157,18 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
// which means it can be executed immediately.
|
||||
// 2. The exclusive lock for this queue has not been held.
|
||||
// 3. The given procedure has the exclusive lock permission for this queue.
|
||||
if (proc.hasLock() || proc.isLockedWhenLoading() || !queue.getLockStatus().hasExclusiveLock() ||
|
||||
queue.getLockStatus().hasLockAccess(proc)) {
|
||||
addToRunQueue(fairq, queue);
|
||||
Supplier<String> reason = null;
|
||||
if (proc.hasLock()) {
|
||||
reason = () -> proc + " has lock";
|
||||
} else if (proc.isLockedWhenLoading()) {
|
||||
reason = () -> proc + " restores lock when restarting";
|
||||
} else if (!queue.getLockStatus().hasExclusiveLock()) {
|
||||
reason = () -> "the exclusive lock is not held by anyone when adding " + proc;
|
||||
} else if (queue.getLockStatus().hasLockAccess(proc)) {
|
||||
reason = () -> proc + " has the excusive lock access";
|
||||
}
|
||||
if (reason != null) {
|
||||
addToRunQueue(fairq, queue, reason);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -211,7 +221,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
if (isLockReady(proc, rq)) {
|
||||
// the queue is empty, remove from run queue
|
||||
if (rq.isEmpty()) {
|
||||
removeFromRunQueue(fairq, rq);
|
||||
removeFromRunQueue(fairq, rq, () -> "queue is empty after polling out " + proc);
|
||||
}
|
||||
return proc;
|
||||
}
|
||||
|
@ -219,7 +229,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
rq.add(proc, false);
|
||||
}
|
||||
// no procedure is ready for execution, remove from run queue
|
||||
removeFromRunQueue(fairq, rq);
|
||||
removeFromRunQueue(fairq, rq, () -> "no procedure can be executed");
|
||||
return null;
|
||||
}
|
||||
|
||||
|
@ -273,11 +283,13 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
}
|
||||
|
||||
private <T extends Comparable<T>, TNode extends Queue<T>> void clear(TNode treeMap,
|
||||
final FairQueue<T> fairq, final AvlKeyComparator<TNode> comparator) {
|
||||
FairQueue<T> fairq, AvlKeyComparator<TNode> comparator) {
|
||||
while (treeMap != null) {
|
||||
Queue<T> node = AvlTree.getFirst(treeMap);
|
||||
treeMap = AvlTree.remove(treeMap, node.getKey(), comparator);
|
||||
if (fairq != null) removeFromRunQueue(fairq, node);
|
||||
if (fairq != null) {
|
||||
removeFromRunQueue(fairq, node, () -> "clear all queues");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -334,14 +346,21 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
}
|
||||
}
|
||||
|
||||
private static <T extends Comparable<T>> void addToRunQueue(FairQueue<T> fairq, Queue<T> queue) {
|
||||
private static <T extends Comparable<T>> void addToRunQueue(FairQueue<T> fairq, Queue<T> queue,
|
||||
Supplier<String> reason) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Add {} to run queue because: {}", queue, reason.get());
|
||||
}
|
||||
if (!AvlIterableList.isLinked(queue) && !queue.isEmpty()) {
|
||||
fairq.add(queue);
|
||||
}
|
||||
}
|
||||
|
||||
private static <T extends Comparable<T>> void removeFromRunQueue(
|
||||
FairQueue<T> fairq, Queue<T> queue) {
|
||||
private static <T extends Comparable<T>> void removeFromRunQueue(FairQueue<T> fairq,
|
||||
Queue<T> queue, Supplier<String> reason) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Remove {} from run queue because: {}", queue, reason.get());
|
||||
}
|
||||
if (AvlIterableList.isLinked(queue)) {
|
||||
fairq.remove(queue);
|
||||
}
|
||||
|
@ -411,7 +430,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
|
||||
LockAndQueue lock = locking.getServerLock(serverName);
|
||||
if (node.isEmpty() && lock.tryExclusiveLock(proc)) {
|
||||
removeFromRunQueue(serverRunQueue, node);
|
||||
removeFromRunQueue(serverRunQueue, node,
|
||||
() -> "clean up server queue after " + proc + " completed");
|
||||
removeServerQueue(serverName);
|
||||
}
|
||||
} finally {
|
||||
|
@ -459,7 +479,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
|
||||
final LockAndQueue lock = locking.getPeerLock(peerId);
|
||||
if (queue.isEmpty() && lock.tryExclusiveLock(procedure)) {
|
||||
removeFromRunQueue(peerRunQueue, queue);
|
||||
removeFromRunQueue(peerRunQueue, queue,
|
||||
() -> "clean up peer queue after " + procedure + " completed");
|
||||
removePeerQueue(peerId);
|
||||
}
|
||||
} finally {
|
||||
|
@ -539,7 +560,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
logLockedResource(LockedResourceType.TABLE, table.getNameAsString());
|
||||
return true;
|
||||
}
|
||||
removeFromRunQueue(tableRunQueue, getTableQueue(table));
|
||||
removeFromRunQueue(tableRunQueue, getTableQueue(table),
|
||||
() -> procedure + " held the exclusive lock");
|
||||
return false;
|
||||
} finally {
|
||||
schedUnlock();
|
||||
|
@ -563,7 +585,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
if (namespaceLock.releaseSharedLock()) {
|
||||
waitingCount += wakeWaitingProcedures(namespaceLock);
|
||||
}
|
||||
addToRunQueue(tableRunQueue, getTableQueue(table));
|
||||
addToRunQueue(tableRunQueue, getTableQueue(table),
|
||||
() -> procedure + " released the exclusive lock");
|
||||
wakePollIfNeeded(waitingCount);
|
||||
} finally {
|
||||
schedUnlock();
|
||||
|
@ -615,7 +638,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
final LockAndQueue tableLock = locking.getTableLock(table);
|
||||
int waitingCount = 0;
|
||||
if (tableLock.releaseSharedLock()) {
|
||||
addToRunQueue(tableRunQueue, getTableQueue(table));
|
||||
addToRunQueue(tableRunQueue, getTableQueue(table),
|
||||
() -> procedure + " released the shared lock");
|
||||
waitingCount += wakeWaitingProcedures(tableLock);
|
||||
}
|
||||
if (namespaceLock.releaseSharedLock()) {
|
||||
|
@ -823,7 +847,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
waitingCount += wakeWaitingProcedures(namespaceLock);
|
||||
}
|
||||
if (systemNamespaceTableLock.releaseSharedLock()) {
|
||||
addToRunQueue(tableRunQueue, getTableQueue(TableName.NAMESPACE_TABLE_NAME));
|
||||
addToRunQueue(tableRunQueue, getTableQueue(TableName.NAMESPACE_TABLE_NAME),
|
||||
() -> procedure + " released namespace exclusive lock");
|
||||
waitingCount += wakeWaitingProcedures(systemNamespaceTableLock);
|
||||
}
|
||||
wakePollIfNeeded(waitingCount);
|
||||
|
@ -853,7 +878,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
removeFromRunQueue(serverRunQueue,
|
||||
getServerQueue(serverName,
|
||||
procedure instanceof ServerProcedureInterface ? (ServerProcedureInterface) procedure
|
||||
: null));
|
||||
: null),
|
||||
() -> procedure + " held exclusive lock");
|
||||
return false;
|
||||
}
|
||||
waitProcedure(lock, procedure);
|
||||
|
@ -881,7 +907,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
addToRunQueue(serverRunQueue,
|
||||
getServerQueue(serverName,
|
||||
procedure instanceof ServerProcedureInterface ? (ServerProcedureInterface) procedure
|
||||
: null));
|
||||
: null), () -> procedure + " released exclusive lock");
|
||||
int waitingCount = wakeWaitingProcedures(lock);
|
||||
wakePollIfNeeded(waitingCount);
|
||||
} finally {
|
||||
|
@ -908,7 +934,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
try {
|
||||
final LockAndQueue lock = locking.getPeerLock(peerId);
|
||||
if (lock.tryExclusiveLock(procedure)) {
|
||||
removeFromRunQueue(peerRunQueue, getPeerQueue(peerId));
|
||||
removeFromRunQueue(peerRunQueue, getPeerQueue(peerId),
|
||||
() -> procedure + " held exclusive lock");
|
||||
return false;
|
||||
}
|
||||
waitProcedure(lock, procedure);
|
||||
|
@ -930,7 +957,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
try {
|
||||
final LockAndQueue lock = locking.getPeerLock(peerId);
|
||||
if (lock.releaseExclusiveLock(procedure)) {
|
||||
addToRunQueue(peerRunQueue, getPeerQueue(peerId));
|
||||
addToRunQueue(peerRunQueue, getPeerQueue(peerId),
|
||||
() -> procedure + " released exclusive lock");
|
||||
int waitingCount = wakeWaitingProcedures(lock);
|
||||
wakePollIfNeeded(waitingCount);
|
||||
}
|
||||
|
@ -956,7 +984,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
try {
|
||||
final LockAndQueue lock = locking.getMetaLock();
|
||||
if (lock.tryExclusiveLock(procedure)) {
|
||||
removeFromRunQueue(metaRunQueue, getMetaQueue());
|
||||
removeFromRunQueue(metaRunQueue, getMetaQueue(), () -> procedure + " held exclusive lock");
|
||||
return false;
|
||||
}
|
||||
waitProcedure(lock, procedure);
|
||||
|
@ -980,7 +1008,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
try {
|
||||
final LockAndQueue lock = locking.getMetaLock();
|
||||
lock.releaseExclusiveLock(procedure);
|
||||
addToRunQueue(metaRunQueue, getMetaQueue());
|
||||
addToRunQueue(metaRunQueue, getMetaQueue(), () -> procedure + " released exclusive lock");
|
||||
int waitingCount = wakeWaitingProcedures(lock);
|
||||
wakePollIfNeeded(waitingCount);
|
||||
} finally {
|
||||
|
|
Loading…
Reference in New Issue