HBASE-20739 Add priority for SCP

This commit is contained in:
zhangduo 2018-06-19 16:29:01 +08:00
parent 8aabe36a49
commit 5c2cb15e0b
5 changed files with 66 additions and 19 deletions

View File

@ -38,8 +38,10 @@ public class FairQueue<T extends Comparable<T>> {
return; return;
} }
// Find the one which priority is less than us // Find the one which priority is less than us
// For now only TableQueue has priority, and there are only a small number of tables which // For now only TableQueue and ServerQueue has priority. For TableQueue there are only a small
// have higher priority so this will not be an expensive operation. // number of tables which have higher priority, and for ServerQueue there is only one server
// which could carry meta which leads to a higher priority, so this will not be an expensive
// operation.
Queue<T> base = queueHead; Queue<T> base = queueHead;
do { do {
if (base.getPriority() < queue.getPriority()) { if (base.getPriority() < queue.getPriority()) {

View File

@ -128,7 +128,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
} else if (isTableProcedure(proc)) { } else if (isTableProcedure(proc)) {
doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront); doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront);
} else if (isServerProcedure(proc)) { } else if (isServerProcedure(proc)) {
doAdd(serverRunQueue, getServerQueue(getServerName(proc)), proc, addFront); ServerProcedureInterface spi = (ServerProcedureInterface) proc;
doAdd(serverRunQueue, getServerQueue(spi.getServerName(), spi), proc, addFront);
} else if (isPeerProcedure(proc)) { } else if (isPeerProcedure(proc)) {
doAdd(peerRunQueue, getPeerQueue(getPeerId(proc)), proc, addFront); doAdd(peerRunQueue, getPeerQueue(getPeerId(proc)), proc, addFront);
} else { } else {
@ -317,10 +318,11 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
return; return;
} }
} else if (proc instanceof PeerProcedureInterface) { } else if (proc instanceof PeerProcedureInterface) {
PeerProcedureInterface iProcPeer = (PeerProcedureInterface) proc; tryCleanupPeerQueue(getPeerId(proc), proc);
tryCleanupPeerQueue(iProcPeer.getPeerId(), proc); } else if (proc instanceof ServerProcedureInterface) {
tryCleanupServerQueue(getServerName(proc), proc);
} else { } else {
// No cleanup for ServerProcedureInterface types, yet. // No cleanup for other procedure types, yet.
return; return;
} }
} }
@ -367,16 +369,52 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
// ============================================================================ // ============================================================================
// Server Queue Lookup Helpers // Server Queue Lookup Helpers
// ============================================================================ // ============================================================================
private ServerQueue getServerQueue(ServerName serverName) { private ServerQueue getServerQueue(ServerName serverName, ServerProcedureInterface proc) {
final int index = getBucketIndex(serverBuckets, serverName.hashCode()); final int index = getBucketIndex(serverBuckets, serverName.hashCode());
ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR); ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
if (node != null) return node; if (node != null) {
return node;
node = new ServerQueue(serverName, locking.getServerLock(serverName)); }
int priority;
if (proc != null) {
priority = MasterProcedureUtil.getServerPriority(proc);
} else {
LOG.warn("Usually this should not happen as proc can only be null when calling from " +
"wait/wake lock, which means at least we should have one procedure in the queue which " +
"wants to acquire the lock or just released the lock.");
priority = 1;
}
node = new ServerQueue(serverName, priority, locking.getServerLock(serverName));
serverBuckets[index] = AvlTree.insert(serverBuckets[index], node); serverBuckets[index] = AvlTree.insert(serverBuckets[index], node);
return node; return node;
} }
private void removeServerQueue(ServerName serverName) {
int index = getBucketIndex(serverBuckets, serverName.hashCode());
serverBuckets[index] =
AvlTree.remove(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
locking.removeServerLock(serverName);
}
private void tryCleanupServerQueue(ServerName serverName, Procedure<?> proc) {
schedLock();
try {
int index = getBucketIndex(serverBuckets, serverName.hashCode());
ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
if (node == null) {
return;
}
LockAndQueue lock = locking.getServerLock(serverName);
if (node.isEmpty() && lock.tryExclusiveLock(proc)) {
removeFromRunQueue(serverRunQueue, node);
removeServerQueue(serverName);
}
} finally {
schedUnlock();
}
}
private static int getBucketIndex(Object[] buckets, int hashCode) { private static int getBucketIndex(Object[] buckets, int hashCode) {
return Math.abs(hashCode) % buckets.length; return Math.abs(hashCode) % buckets.length;
} }
@ -810,7 +848,9 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
try { try {
final LockAndQueue lock = locking.getServerLock(serverName); final LockAndQueue lock = locking.getServerLock(serverName);
if (lock.tryExclusiveLock(procedure)) { if (lock.tryExclusiveLock(procedure)) {
removeFromRunQueue(serverRunQueue, getServerQueue(serverName)); // We do not need to create a new queue so just pass null, as in tests we may pass
// procedures other than ServerProcedureInterface
removeFromRunQueue(serverRunQueue, getServerQueue(serverName, null));
return false; return false;
} }
waitProcedure(lock, procedure); waitProcedure(lock, procedure);
@ -832,7 +872,9 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
try { try {
final LockAndQueue lock = locking.getServerLock(serverName); final LockAndQueue lock = locking.getServerLock(serverName);
lock.releaseExclusiveLock(procedure); lock.releaseExclusiveLock(procedure);
addToRunQueue(serverRunQueue, getServerQueue(serverName)); // We do not need to create a new queue so just pass null, as in tests we may pass procedures
// other than ServerProcedureInterface
addToRunQueue(serverRunQueue, getServerQueue(serverName, null));
int waitingCount = wakeWaitingProcedures(lock); int waitingCount = wakeWaitingProcedures(lock);
wakePollIfNeeded(waitingCount); wakePollIfNeeded(waitingCount);
} finally { } finally {

View File

@ -170,11 +170,10 @@ public final class MasterProcedureUtil {
} }
/** /**
* Return the total levels of table priority. Now we have 3 levels, for meta table, other system * Return the priority for the given procedure. For now we only have two priorities, 100 for
* tables and user tables. Notice that the actual value of priority should be decreased from this * server carrying meta, and 1 for others.
* value down to 1.
*/ */
public static int getTablePriorityLevels() { public static int getServerPriority(ServerProcedureInterface proc) {
return 3; return proc.hasMetaTableRegion() ? 100 : 1;
} }
} }

View File

@ -95,6 +95,10 @@ class SchemaLocking {
return getLock(serverLocks, serverName); return getLock(serverLocks, serverName);
} }
LockAndQueue removeServerLock(ServerName serverName) {
return serverLocks.remove(serverName);
}
LockAndQueue getPeerLock(String peerId) { LockAndQueue getPeerLock(String peerId) {
return getLock(peerLocks, peerId); return getLock(peerLocks, peerId);
} }

View File

@ -25,8 +25,8 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private @InterfaceAudience.Private
class ServerQueue extends Queue<ServerName> { class ServerQueue extends Queue<ServerName> {
public ServerQueue(ServerName serverName, LockStatus serverLock) { public ServerQueue(ServerName serverName, int priority, LockStatus serverLock) {
super(serverName, serverLock); super(serverName, priority, serverLock);
} }
@Override @Override