HBASE-20739 Add priority for SCP
This commit is contained in:
parent
d23a517b61
commit
4cb70ea9f5
|
@ -38,8 +38,10 @@ public class FairQueue<T extends Comparable<T>> {
|
|||
return;
|
||||
}
|
||||
// Find the one which priority is less than us
|
||||
// For now only TableQueue has priority, and there are only a small number of tables which
|
||||
// have higher priority so this will not be an expensive operation.
|
||||
// For now only TableQueue and ServerQueue has priority. For TableQueue there are only a small
|
||||
// number of tables which have higher priority, and for ServerQueue there is only one server
|
||||
// which could carry meta which leads to a higher priority, so this will not be an expensive
|
||||
// operation.
|
||||
Queue<T> base = queueHead;
|
||||
do {
|
||||
if (base.getPriority() < queue.getPriority()) {
|
||||
|
|
|
@ -127,7 +127,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
} else if (isTableProcedure(proc)) {
|
||||
doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront);
|
||||
} else if (isServerProcedure(proc)) {
|
||||
doAdd(serverRunQueue, getServerQueue(getServerName(proc)), proc, addFront);
|
||||
ServerProcedureInterface spi = (ServerProcedureInterface) proc;
|
||||
doAdd(serverRunQueue, getServerQueue(spi.getServerName(), spi), proc, addFront);
|
||||
} else if (isPeerProcedure(proc)) {
|
||||
doAdd(peerRunQueue, getPeerQueue(getPeerId(proc)), proc, addFront);
|
||||
} else {
|
||||
|
@ -316,10 +317,11 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
return;
|
||||
}
|
||||
} else if (proc instanceof PeerProcedureInterface) {
|
||||
PeerProcedureInterface iProcPeer = (PeerProcedureInterface) proc;
|
||||
tryCleanupPeerQueue(iProcPeer.getPeerId(), proc);
|
||||
tryCleanupPeerQueue(getPeerId(proc), proc);
|
||||
} else if (proc instanceof ServerProcedureInterface) {
|
||||
tryCleanupServerQueue(getServerName(proc), proc);
|
||||
} else {
|
||||
// No cleanup for ServerProcedureInterface types, yet.
|
||||
// No cleanup for other procedure types, yet.
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
@ -366,16 +368,52 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
// ============================================================================
|
||||
// Server Queue Lookup Helpers
|
||||
// ============================================================================
|
||||
private ServerQueue getServerQueue(ServerName serverName) {
|
||||
private ServerQueue getServerQueue(ServerName serverName, ServerProcedureInterface proc) {
|
||||
final int index = getBucketIndex(serverBuckets, serverName.hashCode());
|
||||
ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
|
||||
if (node != null) return node;
|
||||
|
||||
node = new ServerQueue(serverName, locking.getServerLock(serverName));
|
||||
if (node != null) {
|
||||
return node;
|
||||
}
|
||||
int priority;
|
||||
if (proc != null) {
|
||||
priority = MasterProcedureUtil.getServerPriority(proc);
|
||||
} else {
|
||||
LOG.warn("Usually this should not happen as proc can only be null when calling from " +
|
||||
"wait/wake lock, which means at least we should have one procedure in the queue which " +
|
||||
"wants to acquire the lock or just released the lock.");
|
||||
priority = 1;
|
||||
}
|
||||
node = new ServerQueue(serverName, priority, locking.getServerLock(serverName));
|
||||
serverBuckets[index] = AvlTree.insert(serverBuckets[index], node);
|
||||
return node;
|
||||
}
|
||||
|
||||
private void removeServerQueue(ServerName serverName) {
|
||||
int index = getBucketIndex(serverBuckets, serverName.hashCode());
|
||||
serverBuckets[index] =
|
||||
AvlTree.remove(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
|
||||
locking.removeServerLock(serverName);
|
||||
}
|
||||
|
||||
private void tryCleanupServerQueue(ServerName serverName, Procedure<?> proc) {
|
||||
schedLock();
|
||||
try {
|
||||
int index = getBucketIndex(serverBuckets, serverName.hashCode());
|
||||
ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
|
||||
if (node == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
LockAndQueue lock = locking.getServerLock(serverName);
|
||||
if (node.isEmpty() && lock.tryExclusiveLock(proc)) {
|
||||
removeFromRunQueue(serverRunQueue, node);
|
||||
removeServerQueue(serverName);
|
||||
}
|
||||
} finally {
|
||||
schedUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
private static int getBucketIndex(Object[] buckets, int hashCode) {
|
||||
return Math.abs(hashCode) % buckets.length;
|
||||
}
|
||||
|
@ -809,7 +847,9 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
try {
|
||||
final LockAndQueue lock = locking.getServerLock(serverName);
|
||||
if (lock.tryExclusiveLock(procedure)) {
|
||||
removeFromRunQueue(serverRunQueue, getServerQueue(serverName));
|
||||
// We do not need to create a new queue so just pass null, as in tests we may pass
|
||||
// procedures other than ServerProcedureInterface
|
||||
removeFromRunQueue(serverRunQueue, getServerQueue(serverName, null));
|
||||
return false;
|
||||
}
|
||||
waitProcedure(lock, procedure);
|
||||
|
@ -831,7 +871,9 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
try {
|
||||
final LockAndQueue lock = locking.getServerLock(serverName);
|
||||
lock.releaseExclusiveLock(procedure);
|
||||
addToRunQueue(serverRunQueue, getServerQueue(serverName));
|
||||
// We do not need to create a new queue so just pass null, as in tests we may pass procedures
|
||||
// other than ServerProcedureInterface
|
||||
addToRunQueue(serverRunQueue, getServerQueue(serverName, null));
|
||||
int waitingCount = wakeWaitingProcedures(lock);
|
||||
wakePollIfNeeded(waitingCount);
|
||||
} finally {
|
||||
|
|
|
@ -170,11 +170,10 @@ public final class MasterProcedureUtil {
|
|||
}
|
||||
|
||||
/**
|
||||
* Return the total levels of table priority. Now we have 3 levels, for meta table, other system
|
||||
* tables and user tables. Notice that the actual value of priority should be decreased from this
|
||||
* value down to 1.
|
||||
* Return the priority for the given procedure. For now we only have two priorities, 100 for
|
||||
* server carrying meta, and 1 for others.
|
||||
*/
|
||||
public static int getTablePriorityLevels() {
|
||||
return 3;
|
||||
public static int getServerPriority(ServerProcedureInterface proc) {
|
||||
return proc.hasMetaTableRegion() ? 100 : 1;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -95,6 +95,10 @@ class SchemaLocking {
|
|||
return getLock(serverLocks, serverName);
|
||||
}
|
||||
|
||||
LockAndQueue removeServerLock(ServerName serverName) {
|
||||
return serverLocks.remove(serverName);
|
||||
}
|
||||
|
||||
LockAndQueue getPeerLock(String peerId) {
|
||||
return getLock(peerLocks, peerId);
|
||||
}
|
||||
|
|
|
@ -25,8 +25,8 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
@InterfaceAudience.Private
|
||||
class ServerQueue extends Queue<ServerName> {
|
||||
|
||||
public ServerQueue(ServerName serverName, LockStatus serverLock) {
|
||||
super(serverName, serverLock);
|
||||
public ServerQueue(ServerName serverName, int priority, LockStatus serverLock) {
|
||||
super(serverName, priority, serverLock);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue