HBASE-21376 Add some verbose log to MasterProcedureScheduler
This commit is contained in:
parent
8135285506
commit
6786aca666
|
@ -21,6 +21,7 @@ import java.io.IOException;
|
|||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableExistsException;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
|
@ -155,9 +156,18 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
// which means it can be executed immediately.
|
||||
// 2. The exclusive lock for this queue has not been held.
|
||||
// 3. The given procedure has the exclusive lock permission for this queue.
|
||||
if (proc.hasLock() || proc.isLockedWhenLoading() || !queue.getLockStatus().hasExclusiveLock() ||
|
||||
queue.getLockStatus().hasLockAccess(proc)) {
|
||||
addToRunQueue(fairq, queue);
|
||||
Supplier<String> reason = null;
|
||||
if (proc.hasLock()) {
|
||||
reason = () -> proc + " has lock";
|
||||
} else if (proc.isLockedWhenLoading()) {
|
||||
reason = () -> proc + " restores lock when restarting";
|
||||
} else if (!queue.getLockStatus().hasExclusiveLock()) {
|
||||
reason = () -> "the exclusive lock is not held by anyone when adding " + proc;
|
||||
} else if (queue.getLockStatus().hasLockAccess(proc)) {
|
||||
reason = () -> proc + " has the excusive lock access";
|
||||
}
|
||||
if (reason != null) {
|
||||
addToRunQueue(fairq, queue, reason);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -210,7 +220,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
if (isLockReady(proc, rq)) {
|
||||
// the queue is empty, remove from run queue
|
||||
if (rq.isEmpty()) {
|
||||
removeFromRunQueue(fairq, rq);
|
||||
removeFromRunQueue(fairq, rq, () -> "queue is empty after polling out " + proc);
|
||||
}
|
||||
return proc;
|
||||
}
|
||||
|
@ -218,7 +228,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
rq.add(proc, false);
|
||||
}
|
||||
// no procedure is ready for execution, remove from run queue
|
||||
removeFromRunQueue(fairq, rq);
|
||||
removeFromRunQueue(fairq, rq, () -> "no procedure can be executed");
|
||||
return null;
|
||||
}
|
||||
|
||||
|
@ -272,11 +282,13 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
}
|
||||
|
||||
private <T extends Comparable<T>, TNode extends Queue<T>> void clear(TNode treeMap,
|
||||
final FairQueue<T> fairq, final AvlKeyComparator<TNode> comparator) {
|
||||
FairQueue<T> fairq, AvlKeyComparator<TNode> comparator) {
|
||||
while (treeMap != null) {
|
||||
Queue<T> node = AvlTree.getFirst(treeMap);
|
||||
treeMap = AvlTree.remove(treeMap, node.getKey(), comparator);
|
||||
if (fairq != null) removeFromRunQueue(fairq, node);
|
||||
if (fairq != null) {
|
||||
removeFromRunQueue(fairq, node, () -> "clear all queues");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -333,14 +345,21 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
}
|
||||
}
|
||||
|
||||
private static <T extends Comparable<T>> void addToRunQueue(FairQueue<T> fairq, Queue<T> queue) {
|
||||
private static <T extends Comparable<T>> void addToRunQueue(FairQueue<T> fairq, Queue<T> queue,
|
||||
Supplier<String> reason) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Add {} to run queue because: {}", queue, reason.get());
|
||||
}
|
||||
if (!AvlIterableList.isLinked(queue) && !queue.isEmpty()) {
|
||||
fairq.add(queue);
|
||||
}
|
||||
}
|
||||
|
||||
private static <T extends Comparable<T>> void removeFromRunQueue(
|
||||
FairQueue<T> fairq, Queue<T> queue) {
|
||||
private static <T extends Comparable<T>> void removeFromRunQueue(FairQueue<T> fairq,
|
||||
Queue<T> queue, Supplier<String> reason) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Remove {} from run queue because: {}", queue, reason.get());
|
||||
}
|
||||
if (AvlIterableList.isLinked(queue)) {
|
||||
fairq.remove(queue);
|
||||
}
|
||||
|
@ -410,7 +429,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
|
||||
LockAndQueue lock = locking.getServerLock(serverName);
|
||||
if (node.isEmpty() && lock.tryExclusiveLock(proc)) {
|
||||
removeFromRunQueue(serverRunQueue, node);
|
||||
removeFromRunQueue(serverRunQueue, node,
|
||||
() -> "clean up server queue after " + proc + " completed");
|
||||
removeServerQueue(serverName);
|
||||
}
|
||||
} finally {
|
||||
|
@ -458,7 +478,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
|
||||
final LockAndQueue lock = locking.getPeerLock(peerId);
|
||||
if (queue.isEmpty() && lock.tryExclusiveLock(procedure)) {
|
||||
removeFromRunQueue(peerRunQueue, queue);
|
||||
removeFromRunQueue(peerRunQueue, queue,
|
||||
() -> "clean up peer queue after " + procedure + " completed");
|
||||
removePeerQueue(peerId);
|
||||
}
|
||||
} finally {
|
||||
|
@ -538,7 +559,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
logLockedResource(LockedResourceType.TABLE, table.getNameAsString());
|
||||
return true;
|
||||
}
|
||||
removeFromRunQueue(tableRunQueue, getTableQueue(table));
|
||||
removeFromRunQueue(tableRunQueue, getTableQueue(table),
|
||||
() -> procedure + " held the exclusive lock");
|
||||
return false;
|
||||
} finally {
|
||||
schedUnlock();
|
||||
|
@ -562,7 +584,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
if (namespaceLock.releaseSharedLock()) {
|
||||
waitingCount += wakeWaitingProcedures(namespaceLock);
|
||||
}
|
||||
addToRunQueue(tableRunQueue, getTableQueue(table));
|
||||
addToRunQueue(tableRunQueue, getTableQueue(table),
|
||||
() -> procedure + " released the exclusive lock");
|
||||
wakePollIfNeeded(waitingCount);
|
||||
} finally {
|
||||
schedUnlock();
|
||||
|
@ -614,7 +637,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
final LockAndQueue tableLock = locking.getTableLock(table);
|
||||
int waitingCount = 0;
|
||||
if (tableLock.releaseSharedLock()) {
|
||||
addToRunQueue(tableRunQueue, getTableQueue(table));
|
||||
addToRunQueue(tableRunQueue, getTableQueue(table),
|
||||
() -> procedure + " released the shared lock");
|
||||
waitingCount += wakeWaitingProcedures(tableLock);
|
||||
}
|
||||
if (namespaceLock.releaseSharedLock()) {
|
||||
|
@ -822,7 +846,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
waitingCount += wakeWaitingProcedures(namespaceLock);
|
||||
}
|
||||
if (systemNamespaceTableLock.releaseSharedLock()) {
|
||||
addToRunQueue(tableRunQueue, getTableQueue(TableName.NAMESPACE_TABLE_NAME));
|
||||
addToRunQueue(tableRunQueue, getTableQueue(TableName.NAMESPACE_TABLE_NAME),
|
||||
() -> procedure + " released namespace exclusive lock");
|
||||
waitingCount += wakeWaitingProcedures(systemNamespaceTableLock);
|
||||
}
|
||||
wakePollIfNeeded(waitingCount);
|
||||
|
@ -852,7 +877,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
removeFromRunQueue(serverRunQueue,
|
||||
getServerQueue(serverName,
|
||||
procedure instanceof ServerProcedureInterface ? (ServerProcedureInterface) procedure
|
||||
: null));
|
||||
: null),
|
||||
() -> procedure + " held exclusive lock");
|
||||
return false;
|
||||
}
|
||||
waitProcedure(lock, procedure);
|
||||
|
@ -880,7 +906,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
addToRunQueue(serverRunQueue,
|
||||
getServerQueue(serverName,
|
||||
procedure instanceof ServerProcedureInterface ? (ServerProcedureInterface) procedure
|
||||
: null));
|
||||
: null), () -> procedure + " released exclusive lock");
|
||||
int waitingCount = wakeWaitingProcedures(lock);
|
||||
wakePollIfNeeded(waitingCount);
|
||||
} finally {
|
||||
|
@ -903,7 +929,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
try {
|
||||
final LockAndQueue lock = locking.getPeerLock(peerId);
|
||||
if (lock.tryExclusiveLock(procedure)) {
|
||||
removeFromRunQueue(peerRunQueue, getPeerQueue(peerId));
|
||||
removeFromRunQueue(peerRunQueue, getPeerQueue(peerId),
|
||||
() -> procedure + " held exclusive lock");
|
||||
return false;
|
||||
}
|
||||
waitProcedure(lock, procedure);
|
||||
|
@ -925,7 +952,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
try {
|
||||
final LockAndQueue lock = locking.getPeerLock(peerId);
|
||||
if (lock.releaseExclusiveLock(procedure)) {
|
||||
addToRunQueue(peerRunQueue, getPeerQueue(peerId));
|
||||
addToRunQueue(peerRunQueue, getPeerQueue(peerId),
|
||||
() -> procedure + " released exclusive lock");
|
||||
int waitingCount = wakeWaitingProcedures(lock);
|
||||
wakePollIfNeeded(waitingCount);
|
||||
}
|
||||
|
@ -951,7 +979,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
try {
|
||||
final LockAndQueue lock = locking.getMetaLock();
|
||||
if (lock.tryExclusiveLock(procedure)) {
|
||||
removeFromRunQueue(metaRunQueue, getMetaQueue());
|
||||
removeFromRunQueue(metaRunQueue, getMetaQueue(), () -> procedure + " held exclusive lock");
|
||||
return false;
|
||||
}
|
||||
waitProcedure(lock, procedure);
|
||||
|
@ -975,7 +1003,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
try {
|
||||
final LockAndQueue lock = locking.getMetaLock();
|
||||
lock.releaseExclusiveLock(procedure);
|
||||
addToRunQueue(metaRunQueue, getMetaQueue());
|
||||
addToRunQueue(metaRunQueue, getMetaQueue(), () -> procedure + " released exclusive lock");
|
||||
int waitingCount = wakeWaitingProcedures(lock);
|
||||
wakePollIfNeeded(waitingCount);
|
||||
} finally {
|
||||
|
|
Loading…
Reference in New Issue