From 826b9436fbba39ce78650680c71425bc8547cd39 Mon Sep 17 00:00:00 2001 From: Apekshit Sharma Date: Tue, 24 Jan 2017 14:31:28 -0800 Subject: [PATCH] HBASE-17605 Changes - Moved locks out of MasterProcedureScheduler#Queue. One Queue object is used for each namespace/table, which aren't more than 100. So we don't need complexity arising from all functionalities being in one place. SchemaLocking now owns locks and locking implementaion has been moved to procedure2 package. - Removed NamespaceQueue because it wasn't being used as Queue (add,peek,poll,etc functions threw UnsupportedOperationException). It's was only used for locks on namespaces. Now that locks have been moved out of Queue class, it's not needed anymore. - Remoed RegionEvent which was there only for locking on regions. Tables/namespaces used locking from Queue class and regions couldn't (there are no separate proc queue at region level), hence the redundance. Now that locking is separate, we can use the same for regions too. - Removed QueueInterface class. No declarations, except one implementaion, which makes the point of having an interface moot. - Removed QueueImpl, which was the only concrete implementation of abstract Queue class. Moved functions to Queue class itself to avoid unnecessary level in inheritance hierarchy. - Removed ProcedureEventQueue class which was just a wrapper around ArrayDeque class. But we now have ProcedureWaitQueue as 'Type class'. - Encapsulated table priority related stuff in a single class. - Removed some unused functions. Change-Id: I6a60424cb41e280bc111703053aa179d9071ba17 --- .../AbstractProcedureScheduler.java | 35 +- .../hadoop/hbase/procedure2/LockAndQueue.java | 114 +++ .../hadoop/hbase/procedure2/LockStatus.java | 34 + .../hbase/procedure2/ProcedureDeque.java | 34 + .../hbase/procedure2/ProcedureEvent.java | 22 +- .../hbase/procedure2/ProcedureEventQueue.java | 85 --- .../procedure2/SimpleProcedureScheduler.java | 4 +- .../hbase/master/locking/LockProcedure.java | 9 +- .../master/procedure/MasterProcedureEnv.java | 8 - .../procedure/MasterProcedureScheduler.java | 721 +++++++----------- .../procedure/TestMasterProcedureEvents.java | 10 +- 11 files changed, 502 insertions(+), 574 deletions(-) create mode 100644 hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockAndQueue.java create mode 100644 hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockStatus.java create mode 100644 hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureDeque.java delete mode 100644 hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEventQueue.java diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java index ff8d97852a6..2a678c04d2b 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java @@ -232,7 +232,7 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler { if (event.isReady()) { return false; } - suspendProcedure(event, procedure); + waitProcedure(event.getSuspendedProcedures(), procedure); return true; } } @@ -266,7 +266,7 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler { if (isTraceEnabled) { LOG.trace("Wake event " + event); } - waitingCount += popEventWaitingObjects(event); + waitingCount += wakeWaitingProcedures(event.getSuspendedProcedures()); } } wakePollIfNeeded(waitingCount); @@ -275,21 +275,23 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler { } } - protected int popEventWaitingObjects(final ProcedureEvent event) { - return popEventWaitingProcedures(event); - } - - protected int popEventWaitingProcedures(final ProcedureEventQueue event) { - int count = 0; - while (event.hasWaitingProcedures()) { - wakeProcedure(event.popWaitingProcedure(false)); - count++; + /** + * Wakes up given waiting procedures by pushing them back into scheduler queues. + * @return size of given {@code waitQueue}. + */ + protected int wakeWaitingProcedures(final ProcedureDeque waitQueue) { + int count = waitQueue.size(); + // wakeProcedure adds to the front of queue, so we start from last in the + // waitQueue' queue, so that the procedure which was added first goes in the front for + // the scheduler queue. + while (!waitQueue.isEmpty()) { + wakeProcedure(waitQueue.removeLast()); } return count; } - protected void suspendProcedure(final ProcedureEventQueue event, final Procedure procedure) { - event.suspendProcedure(procedure); + protected void waitProcedure(final ProcedureDeque waitQueue, final Procedure proc) { + waitQueue.addLast(proc); } protected void wakeProcedure(final Procedure procedure) { @@ -308,10 +310,11 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler { } protected void wakePollIfNeeded(final int waitingCount) { - if (waitingCount > 1) { - schedWaitCond.signalAll(); - } else if (waitingCount > 0) { + if (waitingCount <= 0) return; + if (waitingCount == 1) { schedWaitCond.signal(); + } else { + schedWaitCond.signalAll(); } } } diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockAndQueue.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockAndQueue.java new file mode 100644 index 00000000000..19ba28cefa5 --- /dev/null +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockAndQueue.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.procedure2; + +/** + * Locking for mutual exclusion between procedures. Only by procedure framework internally. + * {@link LockAndQueue} has two purposes: + *
    + *
  1. Acquire/release exclusive/shared locks
  2. + *
  3. Maintain a list of procedures waiting for this lock
    + * To do so, {@link LockAndQueue} extends {@link ProcedureDeque} class. Using inheritance over + * composition for this need is unusual, but the choice is motivated by million regions + * assignment case as it will reduce memory footprint and number of objects to be GCed. + *
+ * + * NOT thread-safe. Needs external concurrency control. For eg. Uses in MasterProcedureScheduler are + * guarded by schedLock(). + *
+ * There is no need of 'volatile' keyword for member variables because of memory synchronization + * guarantees of locks (see 'Memory Synchronization', + * http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/Lock.html) + *
+ * We do not implement Lock interface because we need exclusive + shared locking, and also + * because try-lock functions require procedure id. + *
+ * We do not use ReentrantReadWriteLock directly because of its high memory overhead. + */ +public class LockAndQueue extends ProcedureDeque implements LockStatus { + private long exclusiveLockProcIdOwner = Long.MIN_VALUE; + private int sharedLock = 0; + + // ====================================================================== + // Lock Status + // ====================================================================== + + @Override + public boolean isLocked() { + return hasExclusiveLock() || sharedLock > 0; + } + + @Override + public boolean hasExclusiveLock() { + return this.exclusiveLockProcIdOwner != Long.MIN_VALUE; + } + + @Override + public boolean isLockOwner(long procId) { + return exclusiveLockProcIdOwner == procId; + } + + @Override + public boolean hasParentLock(final Procedure proc) { + return proc.hasParent() && (isLockOwner(proc.getParentProcId()) || isLockOwner(proc.getRootProcId())); + } + + @Override + public boolean hasLockAccess(final Procedure proc) { + return isLockOwner(proc.getProcId()) || hasParentLock(proc); + } + + @Override + public long getExclusiveLockProcIdOwner() { + return exclusiveLockProcIdOwner; + } + + @Override + public int getSharedLockCount() { + return sharedLock; + } + + // ====================================================================== + // try/release Shared/Exclusive lock + // ====================================================================== + + public boolean trySharedLock() { + if (hasExclusiveLock()) return false; + sharedLock++; + return true; + } + + public boolean releaseSharedLock() { + return --sharedLock == 0; + } + + public boolean tryExclusiveLock(final Procedure proc) { + if (isLocked()) return hasLockAccess(proc); + exclusiveLockProcIdOwner = proc.getProcId(); + return true; + } + + public boolean releaseExclusiveLock(final Procedure proc) { + if (isLockOwner(proc.getProcId())) { + exclusiveLockProcIdOwner = Long.MIN_VALUE; + return true; + } + return false; + } +} diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockStatus.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockStatus.java new file mode 100644 index 00000000000..9f2aae7a1e1 --- /dev/null +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockStatus.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.procedure2; + +/** + * Interface to get status of a Lock without getting access to acquire/release lock. + * Currently used in MasterProcedureScheduler where we want to give Queues access to lock's + * status for scheduling purposes, but not the ability to acquire/release it. + */ +public interface LockStatus { + boolean isLocked(); + boolean hasExclusiveLock(); + boolean isLockOwner(long procId); + boolean hasParentLock(final Procedure proc); + boolean hasLockAccess(final Procedure proc); + long getExclusiveLockProcIdOwner(); + int getSharedLockCount(); +} diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureDeque.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureDeque.java new file mode 100644 index 00000000000..975cbdbab85 --- /dev/null +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureDeque.java @@ -0,0 +1,34 @@ +/** + + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUTKey WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.procedure2; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.procedure2.Procedure; + +import java.util.ArrayDeque; + +/** + * Type class. + * For conceptual purpose only. Seeing ProcedureDeque as type instead of just ArrayDeque gives + * more understanding that it's a queue of waiting procedures. + */ +@InterfaceAudience.Private +public class ProcedureDeque extends ArrayDeque { +} diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java index 6335832508f..cb90ac0a8b2 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java @@ -22,34 +22,32 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; /** - * Basic ProcedureEvent that contains an "object", which can be a - * description or a reference to the resource to wait on, and a - * queue for suspended procedures. + * Basic ProcedureEvent that contains an "object", which can be a description or a reference to the + * resource to wait on, and a queue for suspended procedures. + * Access to suspended procedures queue is 'synchronized' on the event itself. */ @InterfaceAudience.Private -@InterfaceStability.Evolving -public class ProcedureEvent extends ProcedureEventQueue { +public class ProcedureEvent { private final T object; - private boolean ready = false; + private ProcedureDeque suspendedProcedures = new ProcedureDeque(); public ProcedureEvent(final T object) { this.object = object; } - public T getObject() { - return object; - } - public synchronized boolean isReady() { return ready; } - @InterfaceAudience.Private - protected synchronized void setReady(final boolean isReady) { + synchronized void setReady(final boolean isReady) { this.ready = isReady; } + public ProcedureDeque getSuspendedProcedures() { + return suspendedProcedures; + } + @Override public String toString() { return getClass().getSimpleName() + "(" + object + ")"; diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEventQueue.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEventQueue.java deleted file mode 100644 index a109e9e639d..00000000000 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEventQueue.java +++ /dev/null @@ -1,85 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.procedure2; - -import com.google.common.annotations.VisibleForTesting; - -import java.util.ArrayDeque; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; - -/** - * Basic queue to store suspended procedures. - */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public class ProcedureEventQueue { - private static final Log LOG = LogFactory.getLog(ProcedureEventQueue.class); - - private ArrayDeque waitingProcedures = null; - - public ProcedureEventQueue() { - } - - @InterfaceAudience.Private - public synchronized void suspendProcedure(final Procedure proc) { - if (waitingProcedures == null) { - waitingProcedures = new ArrayDeque(); - } - waitingProcedures.addLast(proc); - } - - @InterfaceAudience.Private - public synchronized void removeProcedure(final Procedure proc) { - if (waitingProcedures != null) { - waitingProcedures.remove(proc); - } - } - - @InterfaceAudience.Private - public synchronized boolean hasWaitingProcedures() { - return waitingProcedures != null; - } - - @InterfaceAudience.Private - public synchronized Procedure popWaitingProcedure(final boolean popFront) { - // it will be nice to use IterableList on a procedure and avoid allocations... - Procedure proc = popFront ? waitingProcedures.removeFirst() : waitingProcedures.removeLast(); - if (waitingProcedures.isEmpty()) { - waitingProcedures = null; - } - return proc; - } - - @VisibleForTesting - public synchronized void clear() { - waitingProcedures = null; - } - - @VisibleForTesting - public synchronized int size() { - if (waitingProcedures != null) { - return waitingProcedures.size(); - } - return 0; - } -} diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java index ffc8273a99b..1a6775ae15d 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java @@ -18,8 +18,6 @@ package org.apache.hadoop.hbase.procedure2; -import java.util.ArrayDeque; - import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; @@ -29,7 +27,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; @InterfaceAudience.Private @InterfaceStability.Evolving public class SimpleProcedureScheduler extends AbstractProcedureScheduler { - private final ArrayDeque runnables = new ArrayDeque(); + private final ProcedureDeque runnables = new ProcedureDeque(); @Override protected void enqueue(final Procedure procedure, final boolean addFront) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java index 20fc4922842..c4b49f0bd5a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java @@ -42,6 +42,13 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +/** + * Procedure to allow clients and external admin tools to take locks on table/namespace/regions. + * This procedure when scheduled, acquires specified locks, suspends itself and waits for : + * - call to unlock: if lock request came from the process itself, say master chore. + * - Timeout : if lock request came from RPC. On timeout, evaluates if it should continue holding + * the lock or not based on last heartbeat timestamp. + */ @InterfaceAudience.Private public final class LockProcedure extends Procedure implements TableProcedureInterface { @@ -71,7 +78,7 @@ public final class LockProcedure extends Procedure // this is for internal working private boolean hasLock; - private final ProcedureEvent event = new ProcedureEvent(this); + private final ProcedureEvent event = new ProcedureEvent<>(this); // True if this proc acquired relevant locks. This value is for client checks. private final AtomicBoolean locked = new AtomicBoolean(false); // Last system time (in ms) when client sent the heartbeat. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java index 87c79b69abe..2cd5b0839c7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java @@ -141,14 +141,6 @@ public class MasterProcedureEnv implements ConfigurationObserver { return procSched.waitEvent(((HMaster)master).getServerCrashProcessingEnabledEvent(), proc); } - public void wake(ProcedureEvent event) { - procSched.wakeEvent(event); - } - - public void suspend(ProcedureEvent event) { - procSched.suspendEvent(event); - } - public void setEventReady(ProcedureEvent event, boolean isReady) { if (isReady) { procSched.wakeEvent(event); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java index 8ec53a8504a..a3adf025414 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java @@ -20,9 +20,9 @@ package org.apache.hadoop.hbase.master.procedure; import com.google.common.annotations.VisibleForTesting; -import java.util.ArrayDeque; import java.util.Arrays; import java.util.HashMap; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -36,8 +36,10 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType; import org.apache.hadoop.hbase.procedure2.AbstractProcedureScheduler; +import org.apache.hadoop.hbase.procedure2.LockStatus; import org.apache.hadoop.hbase.procedure2.Procedure; -import org.apache.hadoop.hbase.procedure2.ProcedureEventQueue; +import org.apache.hadoop.hbase.procedure2.LockAndQueue; +import org.apache.hadoop.hbase.procedure2.ProcedureDeque; import org.apache.hadoop.hbase.util.AvlUtil.AvlKeyComparator; import org.apache.hadoop.hbase.util.AvlUtil.AvlIterableList; import org.apache.hadoop.hbase.util.AvlUtil.AvlLinkedNode; @@ -58,35 +60,87 @@ import org.apache.hadoop.hbase.util.AvlUtil.AvlTreeIterator; * each procedure will be pushed in its own queue, and based on the operation type * we may take smarter decision. e.g. we can abort all the operations preceding * a delete table, or similar. + * + *

Concurrency control

+ * Concurrent access to member variables (tableRunQueue, serverRunQueue, locking, tableMap, + * serverBuckets) is controlled by schedLock(). That mainly includes:
+ *
    + *
  • + * {@link #push(Procedure, boolean, boolean)} : A push will add a Queue back to run-queue + * when: + *
      + *
    1. queue was empty before push (so must have been out of run-queue)
    2. + *
    3. child procedure is added (which means parent procedure holds exclusive lock, and it + * must have moved Queue out of run-queue)
    4. + *
    + *
  • + *
  • + * {@link #poll(long)} : A poll will remove a Queue from run-queue when: + *
      + *
    1. queue becomes empty after poll
    2. + *
    3. exclusive lock is requested by polled procedure and lock is available (returns the + * procedure)
    4. + *
    5. exclusive lock is requested but lock is not available (returns null)
    6. + *
    7. Polled procedure is child of parent holding exclusive lock, and the next procedure is + * not a child
    8. + *
    + *
  • + *
  • + * namespace/table/region locks: Queue is added back to run-queue when lock being released is: + *
      + *
    1. exclusive lock
    2. + *
    3. last shared lock (in case queue was removed because next procedure in queue required + * exclusive lock)
    4. + *
    + *
  • + *
*/ @InterfaceAudience.Private @InterfaceStability.Evolving public class MasterProcedureScheduler extends AbstractProcedureScheduler { private static final Log LOG = LogFactory.getLog(MasterProcedureScheduler.class); - private final static NamespaceQueueKeyComparator NAMESPACE_QUEUE_KEY_COMPARATOR = - new NamespaceQueueKeyComparator(); private final static ServerQueueKeyComparator SERVER_QUEUE_KEY_COMPARATOR = new ServerQueueKeyComparator(); private final static TableQueueKeyComparator TABLE_QUEUE_KEY_COMPARATOR = new TableQueueKeyComparator(); - private final FairQueue serverRunQueue = new FairQueue(); - private final FairQueue tableRunQueue = new FairQueue(); + private final FairQueue serverRunQueue = new FairQueue<>(); + private final FairQueue tableRunQueue = new FairQueue<>(); private final ServerQueue[] serverBuckets = new ServerQueue[128]; - private NamespaceQueue namespaceMap = null; private TableQueue tableMap = null; + private final SchemaLocking locking = new SchemaLocking(); - private final int metaTablePriority; - private final int userTablePriority; - private final int sysTablePriority; + /** + * Table priority is used when scheduling procedures from {@link #tableRunQueue}. A TableQueue + * with priority 2 will get its procedures scheduled at twice the rate as compared to + * TableQueue with priority 1. + */ + private static class TablePriorities { + TablePriorities(Configuration conf) { + metaTablePriority = conf.getInt("hbase.master.procedure.queue.meta.table.priority", 3); + sysTablePriority = conf.getInt("hbase.master.procedure.queue.system.table.priority", 2); + userTablePriority = conf.getInt("hbase.master.procedure.queue.user.table.priority", 1); + } + + final int metaTablePriority; + final int userTablePriority; + final int sysTablePriority; + + int getPriority(TableName tableName) { + if (tableName.equals(TableName.META_TABLE_NAME)) { + return metaTablePriority; + } else if (tableName.isSystemTable()) { + return sysTablePriority; + } + return userTablePriority; + } + } + private final TablePriorities tablePriorities; public MasterProcedureScheduler(final Configuration conf) { - // TODO: should this be part of the HTD? - metaTablePriority = conf.getInt("hbase.master.procedure.queue.meta.table.priority", 3); - sysTablePriority = conf.getInt("hbase.master.procedure.queue.system.table.priority", 2); - userTablePriority = conf.getInt("hbase.master.procedure.queue.user.table.priority", 1); + tablePriorities = new TablePriorities(conf); } @Override @@ -113,11 +167,11 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { private > void doAdd(final FairQueue fairq, final Queue queue, final Procedure proc, final boolean addFront) { queue.add(proc, addFront); - if (!queue.hasExclusiveLock() || queue.isLockOwner(proc.getProcId())) { + if (!queue.getLockStatus().hasExclusiveLock() || queue.getLockStatus().isLockOwner(proc.getProcId())) { // if the queue was not remove for an xlock execution // or the proc is the lock owner, put the queue back into execution addToRunQueue(fairq, queue); - } else if (queue.hasParentLock(proc)) { + } else if (queue.getLockStatus().hasParentLock(proc)) { assert addFront : "expected to add a child in the front"; // our (proc) parent has the xlock, // so the queue is not in the fairq (run-queue) @@ -151,7 +205,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { final Procedure pollResult = rq.peek(); final boolean xlockReq = rq.requireExclusiveLock(pollResult); - if (xlockReq && rq.isLocked() && !rq.hasLockAccess(pollResult)) { + if (xlockReq && rq.getLockStatus().isLocked() && !rq.getLockStatus().hasLockAccess(pollResult)) { // someone is already holding the lock (e.g. shared lock). avoid a yield removeFromRunQueue(fairq, rq); return null; @@ -160,7 +214,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { rq.poll(); if (rq.isEmpty() || xlockReq) { removeFromRunQueue(fairq, rq); - } else if (rq.hasParentLock(pollResult)) { + } else if (rq.getLockStatus().hasParentLock(pollResult)) { // if the rq is in the fairq because of runnable child // check if the next procedure is still a child. // if not, remove the rq from the fairq and go back to the xlock state @@ -181,10 +235,6 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { serverBuckets[i] = null; } - // Remove Namespaces - clear(namespaceMap, null, NAMESPACE_QUEUE_KEY_COMPARATOR); - namespaceMap = null; - // Remove Tables clear(tableMap, tableRunQueue, TABLE_QUEUE_KEY_COMPARATOR); tableMap = null; @@ -206,7 +256,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { int count = 0; // Server queues - final AvlTreeIterator serverIter = new AvlTreeIterator(); + final AvlTreeIterator serverIter = new AvlTreeIterator<>(); for (int i = 0; i < serverBuckets.length; ++i) { serverIter.seekFirst(serverBuckets[i]); while (serverIter.hasNext()) { @@ -215,7 +265,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { } // Table queues - final AvlTreeIterator tableIter = new AvlTreeIterator(tableMap); + final AvlTreeIterator tableIter = new AvlTreeIterator<>(tableMap); while (tableIter.hasNext()) { count += tableIter.next().size(); } @@ -250,13 +300,14 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { } } - private > void addToRunQueue(FairQueue fairq, Queue queue) { + private static > void addToRunQueue(FairQueue fairq, Queue queue) { if (!AvlIterableList.isLinked(queue) && !queue.isEmpty()) { fairq.add(queue); } } - private > void removeFromRunQueue(FairQueue fairq, Queue queue) { + private static > void removeFromRunQueue( + FairQueue fairq, Queue queue) { if (AvlIterableList.isLinked(queue)) { fairq.remove(queue); } @@ -265,37 +316,21 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { // ============================================================================ // Table Queue Lookup Helpers // ============================================================================ - private TableQueue getTableQueueWithLock(TableName tableName) { - schedLock(); - try { - return getTableQueue(tableName); - } finally { - schedUnlock(); - } - } - private TableQueue getTableQueue(TableName tableName) { TableQueue node = AvlTree.get(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR); if (node != null) return node; - NamespaceQueue nsQueue = getNamespaceQueue(tableName.getNamespaceAsString()); - node = new TableQueue(tableName, nsQueue, getTablePriority(tableName)); + node = new TableQueue(tableName, tablePriorities.getPriority(tableName), + locking.getTableLock(tableName), locking.getNamespaceLock(tableName.getNamespaceAsString())); tableMap = AvlTree.insert(tableMap, node); return node; } private void removeTableQueue(TableName tableName) { tableMap = AvlTree.remove(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR); + locking.removeTableLock(tableName); } - private int getTablePriority(TableName tableName) { - if (tableName.equals(TableName.META_TABLE_NAME)) { - return metaTablePriority; - } else if (tableName.isSystemTable()) { - return sysTablePriority; - } - return userTablePriority; - } private static boolean isTableProcedure(Procedure proc) { return proc instanceof TableProcedureInterface; @@ -305,44 +340,17 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { return ((TableProcedureInterface)proc).getTableName(); } - // ============================================================================ - // Namespace Queue Lookup Helpers - // ============================================================================ - private NamespaceQueue getNamespaceQueue(String namespace) { - NamespaceQueue node = AvlTree.get(namespaceMap, namespace, NAMESPACE_QUEUE_KEY_COMPARATOR); - if (node != null) return (NamespaceQueue)node; - - node = new NamespaceQueue(namespace); - namespaceMap = AvlTree.insert(namespaceMap, node); - return node; - } - // ============================================================================ // Server Queue Lookup Helpers // ============================================================================ - private ServerQueue getServerQueueWithLock(ServerName serverName) { - schedLock(); - try { - return getServerQueue(serverName); - } finally { - schedUnlock(); - } - } - private ServerQueue getServerQueue(ServerName serverName) { final int index = getBucketIndex(serverBuckets, serverName.hashCode()); ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR); if (node != null) return node; - node = new ServerQueue(serverName); + node = new ServerQueue(serverName, locking.getServerLock(serverName)); serverBuckets[index] = AvlTree.insert(serverBuckets[index], node); - return (ServerQueue)node; - } - - private void removeServerQueue(ServerName serverName) { - final int index = getBucketIndex(serverBuckets, serverName.hashCode()); - final ServerQueue root = serverBuckets[index]; - serverBuckets[index] = AvlTree.remove(root, serverName, SERVER_QUEUE_KEY_COMPARATOR); + return node; } private static int getBucketIndex(Object[] buckets, int hashCode) { @@ -367,9 +375,9 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { } } - public static class ServerQueue extends QueueImpl { - public ServerQueue(ServerName serverName) { - super(serverName); + public static class ServerQueue extends Queue { + public ServerQueue(ServerName serverName, LockStatus serverLock) { + super(serverName, serverLock); } public boolean requireExclusiveLock(Procedure proc) { @@ -384,55 +392,6 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { } } - private static class RegionEvent extends ProcedureEventQueue { - private final HRegionInfo regionInfo; - private long exclusiveLockProcIdOwner = Long.MIN_VALUE; - - public RegionEvent(HRegionInfo regionInfo) { - this.regionInfo = regionInfo; - } - - public boolean hasExclusiveLock() { - return exclusiveLockProcIdOwner != Long.MIN_VALUE; - } - - public boolean isLockOwner(long procId) { - return exclusiveLockProcIdOwner == procId; - } - - public boolean hasParentLock(final Procedure proc) { - return proc.hasParent() && - (isLockOwner(proc.getParentProcId()) || isLockOwner(proc.getRootProcId())); - } - - public boolean hasLockAccess(final Procedure proc) { - return isLockOwner(proc.getProcId()) || hasParentLock(proc); - } - - public boolean tryExclusiveLock(final Procedure proc) { - if (hasExclusiveLock()) return hasLockAccess(proc); - exclusiveLockProcIdOwner = proc.getProcId(); - return true; - } - - public boolean releaseExclusiveLock(final Procedure proc) { - if (isLockOwner(proc.getProcId())) { - exclusiveLockProcIdOwner = Long.MIN_VALUE; - return true; - } - return false; - } - - public HRegionInfo getRegionInfo() { - return regionInfo; - } - - @Override - public String toString() { - return "RegionEvent(" + regionInfo.getRegionNameAsString() + ")"; - } - } - private static class TableQueueKeyComparator implements AvlKeyComparator { @Override public int compareKey(TableQueue node, Object key) { @@ -440,124 +399,39 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { } } - public static class TableQueue extends QueueImpl { - private final NamespaceQueue namespaceQueue; + public static class TableQueue extends Queue { + private final LockStatus namespaceLockStatus; - private HashMap regionEventMap; - - public TableQueue(TableName tableName, NamespaceQueue namespaceQueue, int priority) { - super(tableName, priority); - this.namespaceQueue = namespaceQueue; - } - - public NamespaceQueue getNamespaceQueue() { - return namespaceQueue; + public TableQueue(TableName tableName, int priority, LockStatus tableLock, + LockStatus namespaceLockStatus) { + super(tableName, priority, tableLock); + this.namespaceLockStatus = namespaceLockStatus; } @Override public boolean isAvailable() { // if there are no items in the queue, or the namespace is locked. // we can't execute operation on this table - if (isEmpty() || namespaceQueue.hasExclusiveLock()) { + if (isEmpty() || namespaceLockStatus.hasExclusiveLock()) { return false; } - if (hasExclusiveLock()) { + if (getLockStatus().hasExclusiveLock()) { // if we have an exclusive lock already taken // only child of the lock owner can be executed final Procedure nextProc = peek(); - return nextProc != null && hasLockAccess(nextProc); + return nextProc != null && getLockStatus().hasLockAccess(nextProc); } // no xlock return true; } - public RegionEvent getRegionEvent(final HRegionInfo regionInfo) { - if (regionEventMap == null) { - regionEventMap = new HashMap(); - } - RegionEvent event = regionEventMap.get(regionInfo.getEncodedName()); - if (event == null) { - event = new RegionEvent(regionInfo); - regionEventMap.put(regionInfo.getEncodedName(), event); - } - return event; - } - - public void removeRegionEvent(final RegionEvent event) { - regionEventMap.remove(event.getRegionInfo().getEncodedName()); - if (regionEventMap.isEmpty()) { - regionEventMap = null; - } - } - - // TODO: We can abort pending/in-progress operation if the new call is - // something like drop table. We can Override addBack(), - // check the type and abort all the in-flight procedurs. - private boolean canAbortPendingOperations(Procedure proc) { - TableProcedureInterface tpi = (TableProcedureInterface)proc; - switch (tpi.getTableOperationType()) { - case DELETE: - return true; - default: - return false; - } - } - public boolean requireExclusiveLock(Procedure proc) { return requireTableExclusiveLock((TableProcedureInterface)proc); } } - private static class NamespaceQueueKeyComparator implements AvlKeyComparator { - @Override - public int compareKey(NamespaceQueue node, Object key) { - return node.compareKey((String)key); - } - } - - /** - * the namespace is currently used just as a rwlock, not as a queue. - * because ns operation are not frequent enough. so we want to avoid - * having to move table queues around for suspend/resume. - */ - private static class NamespaceQueue extends Queue { - public NamespaceQueue(String namespace) { - super(namespace); - } - - @Override - public boolean requireExclusiveLock(Procedure proc) { - throw new UnsupportedOperationException(); - } - - @Override - public void add(final Procedure proc, final boolean addToFront) { - throw new UnsupportedOperationException(); - } - - @Override - public Procedure peek() { - throw new UnsupportedOperationException(); - } - - @Override - public Procedure poll() { - throw new UnsupportedOperationException(); - } - - @Override - public boolean isEmpty() { - throw new UnsupportedOperationException(); - } - - @Override - public int size() { - throw new UnsupportedOperationException(); - } - } - // ============================================================================ // Table Locking Helpers // ============================================================================ @@ -598,18 +472,18 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { public boolean waitTableExclusiveLock(final Procedure procedure, final TableName table) { schedLock(); try { - final TableQueue tableQueue = getTableQueue(table); - final NamespaceQueue nsQueue = tableQueue.getNamespaceQueue(); - if (!nsQueue.trySharedLock()) { - suspendProcedure(nsQueue.getEvent(), procedure); + final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString()); + final LockAndQueue tableLock = locking.getTableLock(table); + if (!namespaceLock.trySharedLock()) { + waitProcedure(namespaceLock, procedure); return true; } - if (!tableQueue.tryExclusiveLock(procedure)) { - nsQueue.releaseSharedLock(); - suspendProcedure(tableQueue.getEvent(), procedure); + if (!tableLock.tryExclusiveLock(procedure)) { + namespaceLock.releaseSharedLock(); + waitProcedure(tableLock, procedure); return true; } - removeFromRunQueue(tableRunQueue, tableQueue); + removeFromRunQueue(tableRunQueue, getTableQueue(table)); return false; } finally { schedUnlock(); @@ -624,18 +498,18 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { public void wakeTableExclusiveLock(final Procedure procedure, final TableName table) { schedLock(); try { - final TableQueue tableQueue = getTableQueue(table); + final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString()); + final LockAndQueue tableLock = locking.getTableLock(table); int waitingCount = 0; - if (!tableQueue.hasParentLock(procedure)) { - tableQueue.releaseExclusiveLock(procedure); - waitingCount += popEventWaitingProcedures(tableQueue.getEvent()); + if (!tableLock.hasParentLock(procedure)) { + tableLock.releaseExclusiveLock(procedure); + waitingCount += wakeWaitingProcedures(tableLock); } - final NamespaceQueue nsQueue = tableQueue.getNamespaceQueue(); - if (nsQueue.releaseSharedLock()) { - waitingCount += popEventWaitingProcedures(nsQueue.getEvent()); + if (namespaceLock.releaseSharedLock()) { + waitingCount += wakeWaitingProcedures(namespaceLock); } - addToRunQueue(tableRunQueue, tableQueue); + addToRunQueue(tableRunQueue, getTableQueue(table)); wakePollIfNeeded(waitingCount); } finally { schedUnlock(); @@ -656,20 +530,20 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { private TableQueue waitTableQueueSharedLock(final Procedure procedure, final TableName table) { schedLock(); try { - final TableQueue tableQueue = getTableQueue(table); - final NamespaceQueue nsQueue = tableQueue.getNamespaceQueue(); - if (!nsQueue.trySharedLock()) { - suspendProcedure(nsQueue.getEvent(), procedure); + final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString()); + final LockAndQueue tableLock = locking.getTableLock(table); + if (!namespaceLock.trySharedLock()) { + waitProcedure(namespaceLock, procedure); return null; } - if (!tableQueue.trySharedLock()) { - tableQueue.getNamespaceQueue().releaseSharedLock(); - suspendProcedure(tableQueue.getEvent(), procedure); + if (!tableLock.trySharedLock()) { + namespaceLock.releaseSharedLock(); + waitProcedure(tableLock, procedure); return null; } - return tableQueue; + return getTableQueue(table); } finally { schedUnlock(); } @@ -683,15 +557,15 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { public void wakeTableSharedLock(final Procedure procedure, final TableName table) { schedLock(); try { - final TableQueue tableQueue = getTableQueue(table); - final NamespaceQueue nsQueue = tableQueue.getNamespaceQueue(); + final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString()); + final LockAndQueue tableLock = locking.getTableLock(table); int waitingCount = 0; - if (tableQueue.releaseSharedLock()) { - addToRunQueue(tableRunQueue, tableQueue); - waitingCount += popEventWaitingProcedures(tableQueue.getEvent()); + if (tableLock.releaseSharedLock()) { + addToRunQueue(tableRunQueue, getTableQueue(table)); + waitingCount += wakeWaitingProcedures(tableLock); } - if (nsQueue.releaseSharedLock()) { - waitingCount += popEventWaitingProcedures(nsQueue.getEvent()); + if (namespaceLock.releaseSharedLock()) { + waitingCount += wakeWaitingProcedures(namespaceLock); } wakePollIfNeeded(waitingCount); } finally { @@ -712,10 +586,11 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { protected boolean markTableAsDeleted(final TableName table, final Procedure procedure) { schedLock(); try { - TableQueue queue = getTableQueue(table); + final TableQueue queue = getTableQueue(table); + final LockAndQueue tableLock = locking.getTableLock(table); if (queue == null) return true; - if (queue.isEmpty() && queue.tryExclusiveLock(procedure)) { + if (queue.isEmpty() && tableLock.tryExclusiveLock(procedure)) { // remove the table from the run-queue and the map if (AvlIterableList.isLinked(queue)) { tableRunQueue.remove(queue); @@ -754,41 +629,40 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { public boolean waitRegions(final Procedure procedure, final TableName table, final HRegionInfo... regionInfo) { Arrays.sort(regionInfo); + schedLock(); + try { + // If there is parent procedure, it would have already taken xlock, so no need to take + // shared lock here. Otherwise, take shared lock. + if (!procedure.hasParent() + && waitTableQueueSharedLock(procedure, table) == null) { + return true; + } - final TableQueue queue; - if (procedure.hasParent()) { - // the assumption is that the parent procedure have already the table xlock - queue = getTableQueueWithLock(table); - } else { - // acquire the table shared-lock - queue = waitTableQueueSharedLock(procedure, table); - if (queue == null) return true; - } - - // acquire region xlocks or wait - boolean hasLock = true; - final RegionEvent[] event = new RegionEvent[regionInfo.length]; - synchronized (queue) { + // acquire region xlocks or wait + boolean hasLock = true; + final LockAndQueue[] regionLocks = new LockAndQueue[regionInfo.length]; for (int i = 0; i < regionInfo.length; ++i) { assert regionInfo[i].getTable().equals(table); - assert i == 0 || regionInfo[i] != regionInfo[i-1] : "duplicate region: " + regionInfo[i]; + assert i == 0 || regionInfo[i] != regionInfo[i - 1] : "duplicate region: " + regionInfo[i]; - event[i] = queue.getRegionEvent(regionInfo[i]); - if (!event[i].tryExclusiveLock(procedure)) { - suspendProcedure(event[i], procedure); + regionLocks[i] = locking.getRegionLock(regionInfo[i].getEncodedName()); + if (!regionLocks[i].tryExclusiveLock(procedure)) { + waitProcedure(regionLocks[i], procedure); hasLock = false; while (i-- > 0) { - event[i].releaseExclusiveLock(procedure); + regionLocks[i].releaseExclusiveLock(procedure); } break; } } - } - if (!hasLock && !procedure.hasParent()) { - wakeTableSharedLock(procedure, table); + if (!hasLock && !procedure.hasParent()) { + wakeTableSharedLock(procedure, table); + } + return !hasLock; + } finally { + schedUnlock(); } - return !hasLock; } /** @@ -808,32 +682,26 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { public void wakeRegions(final Procedure procedure,final TableName table, final HRegionInfo... regionInfo) { Arrays.sort(regionInfo); - - final TableQueue queue = getTableQueueWithLock(table); - - int numProcs = 0; - final Procedure[] nextProcs = new Procedure[regionInfo.length]; - synchronized (queue) { - HRegionInfo prevRegion = null; + schedLock(); + try { + int numProcs = 0; + final Procedure[] nextProcs = new Procedure[regionInfo.length]; for (int i = 0; i < regionInfo.length; ++i) { assert regionInfo[i].getTable().equals(table); - assert i == 0 || regionInfo[i] != regionInfo[i-1] : "duplicate region: " + regionInfo[i]; + assert i == 0 || regionInfo[i] != regionInfo[i - 1] : "duplicate region: " + regionInfo[i]; - RegionEvent event = queue.getRegionEvent(regionInfo[i]); - if (event.releaseExclusiveLock(procedure)) { - if (event.hasWaitingProcedures()) { + LockAndQueue regionLock = locking.getRegionLock(regionInfo[i].getEncodedName()); + if (regionLock.releaseExclusiveLock(procedure)) { + if (!regionLock.isEmpty()) { // release one procedure at the time since regions has an xlock - nextProcs[numProcs++] = event.popWaitingProcedure(true); + nextProcs[numProcs++] = regionLock.removeFirst(); } else { - queue.removeRegionEvent(event); + locking.removeRegionLock(regionInfo[i].getEncodedName()); } } } - } - // awake procedures if any - schedLock(); - try { + // awake procedures if any for (int i = numProcs - 1; i >= 0; --i) { wakeProcedure(nextProcs[i]); } @@ -855,22 +723,23 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { * Suspend the procedure if the specified namespace is already locked. * @see #wakeNamespaceExclusiveLock(Procedure,String) * @param procedure the procedure trying to acquire the lock - * @param nsName Namespace to lock + * @param namespace Namespace to lock * @return true if the procedure has to wait for the namespace to be available */ - public boolean waitNamespaceExclusiveLock(final Procedure procedure, final String nsName) { + public boolean waitNamespaceExclusiveLock(final Procedure procedure, final String namespace) { schedLock(); try { - final TableQueue tableQueue = getTableQueue(TableName.NAMESPACE_TABLE_NAME); - if (!tableQueue.trySharedLock()) { - suspendProcedure(tableQueue.getEvent(), procedure); + final LockAndQueue systemNamespaceTableLock = + locking.getTableLock(TableName.NAMESPACE_TABLE_NAME); + if (!systemNamespaceTableLock.trySharedLock()) { + waitProcedure(systemNamespaceTableLock, procedure); return true; } - final NamespaceQueue nsQueue = getNamespaceQueue(nsName); - if (!nsQueue.tryExclusiveLock(procedure)) { - tableQueue.releaseSharedLock(); - suspendProcedure(nsQueue.getEvent(), procedure); + final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace); + if (!namespaceLock.tryExclusiveLock(procedure)) { + systemNamespaceTableLock.releaseSharedLock(); + waitProcedure(namespaceLock, procedure); return true; } return false; @@ -883,20 +752,21 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { * Wake the procedures waiting for the specified namespace * @see #waitNamespaceExclusiveLock(Procedure,String) * @param procedure the procedure releasing the lock - * @param nsName the namespace that has the exclusive lock + * @param namespace the namespace that has the exclusive lock */ - public void wakeNamespaceExclusiveLock(final Procedure procedure, final String nsName) { + public void wakeNamespaceExclusiveLock(final Procedure procedure, final String namespace) { schedLock(); try { - final TableQueue tableQueue = getTableQueue(TableName.NAMESPACE_TABLE_NAME); - final NamespaceQueue nsQueue = getNamespaceQueue(nsName); + final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace); + final LockAndQueue systemNamespaceTableLock = + locking.getTableLock(TableName.NAMESPACE_TABLE_NAME); + namespaceLock.releaseExclusiveLock(procedure); int waitingCount = 0; - nsQueue.releaseExclusiveLock(procedure); - if (tableQueue.releaseSharedLock()) { - addToRunQueue(tableRunQueue, tableQueue); - waitingCount += popEventWaitingProcedures(tableQueue.getEvent()); + if(systemNamespaceTableLock.releaseSharedLock()) { + addToRunQueue(tableRunQueue, getTableQueue(TableName.NAMESPACE_TABLE_NAME)); + waitingCount += wakeWaitingProcedures(systemNamespaceTableLock); } - waitingCount += popEventWaitingProcedures(nsQueue.getEvent()); + waitingCount += wakeWaitingProcedures(namespaceLock); wakePollIfNeeded(waitingCount); } finally { schedUnlock(); @@ -916,12 +786,12 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { public boolean waitServerExclusiveLock(final Procedure procedure, final ServerName serverName) { schedLock(); try { - ServerQueue queue = getServerQueue(serverName); - if (queue.tryExclusiveLock(procedure)) { - removeFromRunQueue(serverRunQueue, queue); + final LockAndQueue lock = locking.getServerLock(serverName); + if (lock.tryExclusiveLock(procedure)) { + removeFromRunQueue(serverRunQueue, getServerQueue(serverName)); return false; } - suspendProcedure(queue.getEvent(), procedure); + waitProcedure(lock, procedure); return true; } finally { schedUnlock(); @@ -937,10 +807,10 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { public void wakeServerExclusiveLock(final Procedure procedure, final ServerName serverName) { schedLock(); try { - final ServerQueue queue = getServerQueue(serverName); - queue.releaseExclusiveLock(procedure); - addToRunQueue(serverRunQueue, queue); - int waitingCount = popEventWaitingProcedures(queue.getEvent()); + final LockAndQueue lock = locking.getServerLock(serverName); + lock.releaseExclusiveLock(procedure); + addToRunQueue(serverRunQueue, getServerQueue(serverName)); + int waitingCount = wakeWaitingProcedures(lock); wakePollIfNeeded(waitingCount); } finally { schedUnlock(); @@ -950,38 +820,24 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { // ============================================================================ // Generic Helpers // ============================================================================ - private static interface QueueInterface { - boolean isAvailable(); - boolean isEmpty(); - int size(); - - void add(Procedure proc, boolean addFront); - boolean requireExclusiveLock(Procedure proc); - Procedure peek(); - Procedure poll(); - } - - // TODO Why OK not having synchronized access and/or volatiles and - // sharedLock-- and sharedLock++? Is this accessed by one thread only? - // Write up the concurrency expectations. St.Ack 01/19/2017 private static abstract class Queue> - extends AvlLinkedNode> implements QueueInterface { - private final ProcedureEventQueue event; - - private long exclusiveLockProcIdOwner = Long.MIN_VALUE; - private int sharedLock = 0; + extends AvlLinkedNode> { + abstract boolean requireExclusiveLock(Procedure proc); private final TKey key; private final int priority; + private final ProcedureDeque runnables = new ProcedureDeque(); + // Reference to status of lock on entity this queue represents. + private final LockStatus lockStatus; - public Queue(TKey key) { - this(key, 1); + public Queue(TKey key, LockStatus lockStatus) { + this(key, 1, lockStatus); } - public Queue(TKey key, int priority) { + public Queue(TKey key, int priority, LockStatus lockStatus) { this.key = key; this.priority = priority; - this.event = new ProcedureEventQueue(); + this.lockStatus = lockStatus; } protected TKey getKey() { @@ -992,66 +848,41 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { return priority; } - public ProcedureEventQueue getEvent() { - return event; - } - - // ====================================================================== - // Read/Write Locking helpers - // ====================================================================== - public boolean isLocked() { - return hasExclusiveLock() || sharedLock > 0; - } - - public boolean hasExclusiveLock() { - return this.exclusiveLockProcIdOwner != Long.MIN_VALUE; - } - - public boolean trySharedLock() { - if (hasExclusiveLock()) return false; - sharedLock++; - return true; - } - - public boolean releaseSharedLock() { - return --sharedLock == 0; - } - - protected boolean isSingleSharedLock() { - return sharedLock == 1; - } - - public boolean isLockOwner(long procId) { - return exclusiveLockProcIdOwner == procId; - } - - public boolean hasParentLock(final Procedure proc) { - return proc.hasParent() && - (isLockOwner(proc.getParentProcId()) || isLockOwner(proc.getRootProcId())); - } - - public boolean hasLockAccess(final Procedure proc) { - return isLockOwner(proc.getProcId()) || hasParentLock(proc); - } - - public boolean tryExclusiveLock(final Procedure proc) { - if (isLocked()) return hasLockAccess(proc); - exclusiveLockProcIdOwner = proc.getProcId(); - return true; - } - - public boolean releaseExclusiveLock(final Procedure proc) { - if (isLockOwner(proc.getProcId())) { - exclusiveLockProcIdOwner = Long.MIN_VALUE; - return true; - } - return false; + protected LockStatus getLockStatus() { + return lockStatus; } // This should go away when we have the new AM and its events // and we move xlock to the lock-event-queue. public boolean isAvailable() { - return !hasExclusiveLock() && !isEmpty(); + return !lockStatus.hasExclusiveLock() && !isEmpty(); + } + + // ====================================================================== + // Functions to handle procedure queue + // ====================================================================== + public void add(final Procedure proc, final boolean addToFront) { + if (addToFront) { + runnables.addFirst(proc); + } else { + runnables.addLast(proc); + } + } + + public Procedure peek() { + return runnables.peek(); + } + + public Procedure poll() { + return runnables.poll(); + } + + public boolean isEmpty() { + return runnables.isEmpty(); + } + + public int size() { + return runnables.size(); } // ====================================================================== @@ -1070,59 +901,61 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { public String toString() { return String.format("%s(%s, xlock=%s sharedLock=%s size=%s)", getClass().getSimpleName(), key, - hasExclusiveLock() ? "true (" + exclusiveLockProcIdOwner + ")" : "false", - sharedLock, size()); + lockStatus.hasExclusiveLock() ? + "true (" + lockStatus.getExclusiveLockProcIdOwner() + ")" : "false", + lockStatus.getSharedLockCount(), size()); } } + /** + * Locks on namespaces, tables, and regions. + * Since LockAndQueue implementation is NOT thread-safe, schedLock() guards all calls to these + * locks. + */ + private static class SchemaLocking { + private LockAndQueue getLock(Map map, T key) { + LockAndQueue lock = map.get(key); + if (lock == null) { + lock = new LockAndQueue(); + map.put(key, lock); + } + return lock; + } + + public LockAndQueue getTableLock(TableName tableName) { + return getLock(tableLocks, tableName); + } + + public LockAndQueue removeTableLock(TableName tableName) { + return tableLocks.remove(tableName); + } + + public LockAndQueue getNamespaceLock(String namespace) { + return getLock(namespaceLocks, namespace); + } + + public LockAndQueue getRegionLock(String encodedRegionName) { + return getLock(regionLocks, encodedRegionName); + } + + public LockAndQueue removeRegionLock(String encodedRegionName) { + return regionLocks.remove(encodedRegionName); + } + + public LockAndQueue getServerLock(ServerName serverName) { + return getLock(serverLocks, serverName); + } + + final Map serverLocks = new HashMap<>(); + final Map namespaceLocks = new HashMap<>(); + final Map tableLocks = new HashMap<>(); + // Single map for all regions irrespective of tables. Key is encoded region name. + final Map regionLocks = new HashMap<>(); + } + // ====================================================================== // Helper Data Structures // ====================================================================== - private static abstract class QueueImpl> extends Queue { - private final ArrayDeque runnables = new ArrayDeque(); - - public QueueImpl(TKey key) { - super(key); - } - - public QueueImpl(TKey key, int priority) { - super(key, priority); - } - - public void add(final Procedure proc, final boolean addToFront) { - if (addToFront) { - addFront(proc); - } else { - addBack(proc); - } - } - - protected void addFront(final Procedure proc) { - runnables.addFirst(proc); - } - - protected void addBack(final Procedure proc) { - runnables.addLast(proc); - } - - public Procedure peek() { - return runnables.peek(); - } - - @Override - public Procedure poll() { - return runnables.poll(); - } - - @Override - public boolean isEmpty() { - return runnables.isEmpty(); - } - - public int size() { - return runnables.size(); - } - } private static class FairQueue> { private final int quantum; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java index 4eedc9a00c2..a541b439709 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java @@ -146,7 +146,7 @@ public class TestMasterProcedureEvents { // check that nothing is in the event queue LOG.debug("checking " + event); assertEquals(false, event.isReady()); - assertEquals(0, event.size()); + assertEquals(0, event.getSuspendedProcedures().size()); // submit the procedure LOG.debug("submit " + proc); @@ -154,12 +154,12 @@ public class TestMasterProcedureEvents { // wait until the event is in the queue (proc executed and got into suspended state) LOG.debug("wait procedure suspended on " + event); - while (event.size() < 1) Thread.sleep(25); + while (event.getSuspendedProcedures().size() < 1) Thread.sleep(25); // check that the proc is in the event queue - LOG.debug("checking " + event + " size=" + event.size()); + LOG.debug("checking " + event + " size=" + event.getSuspendedProcedures().size()); assertEquals(false, event.isReady()); - assertEquals(1, event.size()); + assertEquals(1, event.getSuspendedProcedures().size()); // wake the event LOG.debug("wake " + event); @@ -172,7 +172,7 @@ public class TestMasterProcedureEvents { // check that nothing is in the event queue and the event is not suspended assertEquals(true, event.isReady()); - assertEquals(0, event.size()); + assertEquals(0, event.getSuspendedProcedures().size()); LOG.debug("completed execution of " + proc + " pollCalls=" + (procSched.getPollCalls() - startPollCalls) + " nullPollCalls=" + (procSched.getNullPollCalls() - startNullPollCalls));