From 5c2cb15e0b41d8cd1bb6b9014934c078978f929d Mon Sep 17 00:00:00 2001 From: zhangduo Date: Tue, 19 Jun 2018 16:29:01 +0800 Subject: [PATCH] HBASE-20739 Add priority for SCP --- .../hbase/master/procedure/FairQueue.java | 6 +- .../procedure/MasterProcedureScheduler.java | 62 ++++++++++++++++--- .../master/procedure/MasterProcedureUtil.java | 9 ++- .../hbase/master/procedure/SchemaLocking.java | 4 ++ .../hbase/master/procedure/ServerQueue.java | 4 +- 5 files changed, 66 insertions(+), 19 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FairQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FairQueue.java index ac8e5775866..59a5ef8c5c2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FairQueue.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FairQueue.java @@ -38,8 +38,10 @@ public class FairQueue> { return; } // Find the one which priority is less than us - // For now only TableQueue has priority, and there are only a small number of tables which - // have higher priority so this will not be an expensive operation. + // For now only TableQueue and ServerQueue has priority. For TableQueue there are only a small + // number of tables which have higher priority, and for ServerQueue there is only one server + // which could carry meta which leads to a higher priority, so this will not be an expensive + // operation. Queue base = queueHead; do { if (base.getPriority() < queue.getPriority()) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java index 5a29a097a73..4e2b3e35779 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java @@ -128,7 +128,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { } else if (isTableProcedure(proc)) { doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront); } else if (isServerProcedure(proc)) { - doAdd(serverRunQueue, getServerQueue(getServerName(proc)), proc, addFront); + ServerProcedureInterface spi = (ServerProcedureInterface) proc; + doAdd(serverRunQueue, getServerQueue(spi.getServerName(), spi), proc, addFront); } else if (isPeerProcedure(proc)) { doAdd(peerRunQueue, getPeerQueue(getPeerId(proc)), proc, addFront); } else { @@ -317,10 +318,11 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { return; } } else if (proc instanceof PeerProcedureInterface) { - PeerProcedureInterface iProcPeer = (PeerProcedureInterface) proc; - tryCleanupPeerQueue(iProcPeer.getPeerId(), proc); + tryCleanupPeerQueue(getPeerId(proc), proc); + } else if (proc instanceof ServerProcedureInterface) { + tryCleanupServerQueue(getServerName(proc), proc); } else { - // No cleanup for ServerProcedureInterface types, yet. + // No cleanup for other procedure types, yet. return; } } @@ -367,16 +369,52 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { // ============================================================================ // Server Queue Lookup Helpers // ============================================================================ - private ServerQueue getServerQueue(ServerName serverName) { + private ServerQueue getServerQueue(ServerName serverName, ServerProcedureInterface proc) { final int index = getBucketIndex(serverBuckets, serverName.hashCode()); ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR); - if (node != null) return node; - - node = new ServerQueue(serverName, locking.getServerLock(serverName)); + if (node != null) { + return node; + } + int priority; + if (proc != null) { + priority = MasterProcedureUtil.getServerPriority(proc); + } else { + LOG.warn("Usually this should not happen as proc can only be null when calling from " + + "wait/wake lock, which means at least we should have one procedure in the queue which " + + "wants to acquire the lock or just released the lock."); + priority = 1; + } + node = new ServerQueue(serverName, priority, locking.getServerLock(serverName)); serverBuckets[index] = AvlTree.insert(serverBuckets[index], node); return node; } + private void removeServerQueue(ServerName serverName) { + int index = getBucketIndex(serverBuckets, serverName.hashCode()); + serverBuckets[index] = + AvlTree.remove(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR); + locking.removeServerLock(serverName); + } + + private void tryCleanupServerQueue(ServerName serverName, Procedure proc) { + schedLock(); + try { + int index = getBucketIndex(serverBuckets, serverName.hashCode()); + ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR); + if (node == null) { + return; + } + + LockAndQueue lock = locking.getServerLock(serverName); + if (node.isEmpty() && lock.tryExclusiveLock(proc)) { + removeFromRunQueue(serverRunQueue, node); + removeServerQueue(serverName); + } + } finally { + schedUnlock(); + } + } + private static int getBucketIndex(Object[] buckets, int hashCode) { return Math.abs(hashCode) % buckets.length; } @@ -810,7 +848,9 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { try { final LockAndQueue lock = locking.getServerLock(serverName); if (lock.tryExclusiveLock(procedure)) { - removeFromRunQueue(serverRunQueue, getServerQueue(serverName)); + // We do not need to create a new queue so just pass null, as in tests we may pass + // procedures other than ServerProcedureInterface + removeFromRunQueue(serverRunQueue, getServerQueue(serverName, null)); return false; } waitProcedure(lock, procedure); @@ -832,7 +872,9 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { try { final LockAndQueue lock = locking.getServerLock(serverName); lock.releaseExclusiveLock(procedure); - addToRunQueue(serverRunQueue, getServerQueue(serverName)); + // We do not need to create a new queue so just pass null, as in tests we may pass procedures + // other than ServerProcedureInterface + addToRunQueue(serverRunQueue, getServerQueue(serverName, null)); int waitingCount = wakeWaitingProcedures(lock); wakePollIfNeeded(waitingCount); } finally { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java index 51e24523b3b..587cc82d522 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java @@ -170,11 +170,10 @@ public final class MasterProcedureUtil { } /** - * Return the total levels of table priority. Now we have 3 levels, for meta table, other system - * tables and user tables. Notice that the actual value of priority should be decreased from this - * value down to 1. + * Return the priority for the given procedure. For now we only have two priorities, 100 for + * server carrying meta, and 1 for others. */ - public static int getTablePriorityLevels() { - return 3; + public static int getServerPriority(ServerProcedureInterface proc) { + return proc.hasMetaTableRegion() ? 100 : 1; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java index 30be04ca014..de1d0ecd9ee 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java @@ -95,6 +95,10 @@ class SchemaLocking { return getLock(serverLocks, serverName); } + LockAndQueue removeServerLock(ServerName serverName) { + return serverLocks.remove(serverName); + } + LockAndQueue getPeerLock(String peerId) { return getLock(peerLocks, peerId); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java index 5526f3b5d0f..3a1b3c4cd6d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java @@ -25,8 +25,8 @@ import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private class ServerQueue extends Queue { - public ServerQueue(ServerName serverName, LockStatus serverLock) { - super(serverName, serverLock); + public ServerQueue(ServerName serverName, int priority, LockStatus serverLock) { + super(serverName, priority, serverLock); } @Override