diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java index ff8d97852a6..2a678c04d2b 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java @@ -232,7 +232,7 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler { if (event.isReady()) { return false; } - suspendProcedure(event, procedure); + waitProcedure(event.getSuspendedProcedures(), procedure); return true; } } @@ -266,7 +266,7 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler { if (isTraceEnabled) { LOG.trace("Wake event " + event); } - waitingCount += popEventWaitingObjects(event); + waitingCount += wakeWaitingProcedures(event.getSuspendedProcedures()); } } wakePollIfNeeded(waitingCount); @@ -275,21 +275,23 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler { } } - protected int popEventWaitingObjects(final ProcedureEvent event) { - return popEventWaitingProcedures(event); - } - - protected int popEventWaitingProcedures(final ProcedureEventQueue event) { - int count = 0; - while (event.hasWaitingProcedures()) { - wakeProcedure(event.popWaitingProcedure(false)); - count++; + /** + * Wakes up given waiting procedures by pushing them back into scheduler queues. + * @return size of given {@code waitQueue}. + */ + protected int wakeWaitingProcedures(final ProcedureDeque waitQueue) { + int count = waitQueue.size(); + // wakeProcedure adds to the front of queue, so we start from last in the + // waitQueue' queue, so that the procedure which was added first goes in the front for + // the scheduler queue. 
+ while (!waitQueue.isEmpty()) { + wakeProcedure(waitQueue.removeLast()); } return count; } - protected void suspendProcedure(final ProcedureEventQueue event, final Procedure procedure) { - event.suspendProcedure(procedure); + protected void waitProcedure(final ProcedureDeque waitQueue, final Procedure proc) { + waitQueue.addLast(proc); } protected void wakeProcedure(final Procedure procedure) { @@ -308,10 +310,11 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler { } protected void wakePollIfNeeded(final int waitingCount) { - if (waitingCount > 1) { - schedWaitCond.signalAll(); - } else if (waitingCount > 0) { + if (waitingCount <= 0) return; + if (waitingCount == 1) { schedWaitCond.signal(); + } else { + schedWaitCond.signalAll(); } } } diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockAndQueue.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockAndQueue.java new file mode 100644 index 00000000000..19ba28cefa5 --- /dev/null +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockAndQueue.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.procedure2; + +/** + * Locking for mutual exclusion between procedures. Used only by the procedure framework internally. + * {@link LockAndQueue} has two purposes: + *
    + *
  1. Acquire/release exclusive/shared locks
  2. + *
  3. Maintain a list of procedures waiting for this lock
    + * To do so, {@link LockAndQueue} extends {@link ProcedureDeque} class. Using inheritance over + * composition for this need is unusual, but the choice is motivated by million regions + * assignment case as it will reduce memory footprint and number of objects to be GCed. + *
+ * + * NOT thread-safe. Needs external concurrency control. For example, uses in MasterProcedureScheduler are + * guarded by schedLock(). + *
+ * There is no need for the 'volatile' keyword on member variables because of the memory synchronization + * guarantees of locks (see 'Memory Synchronization', + * http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/Lock.html) + *
+ * We do not implement Lock interface because we need exclusive + shared locking, and also + * because try-lock functions require procedure id. + *
+ * We do not use ReentrantReadWriteLock directly because of its high memory overhead. + */ +public class LockAndQueue extends ProcedureDeque implements LockStatus { + private long exclusiveLockProcIdOwner = Long.MIN_VALUE; + private int sharedLock = 0; + + // ====================================================================== + // Lock Status + // ====================================================================== + + @Override + public boolean isLocked() { + return hasExclusiveLock() || sharedLock > 0; + } + + @Override + public boolean hasExclusiveLock() { + return this.exclusiveLockProcIdOwner != Long.MIN_VALUE; + } + + @Override + public boolean isLockOwner(long procId) { + return exclusiveLockProcIdOwner == procId; + } + + @Override + public boolean hasParentLock(final Procedure proc) { + return proc.hasParent() && (isLockOwner(proc.getParentProcId()) || isLockOwner(proc.getRootProcId())); + } + + @Override + public boolean hasLockAccess(final Procedure proc) { + return isLockOwner(proc.getProcId()) || hasParentLock(proc); + } + + @Override + public long getExclusiveLockProcIdOwner() { + return exclusiveLockProcIdOwner; + } + + @Override + public int getSharedLockCount() { + return sharedLock; + } + + // ====================================================================== + // try/release Shared/Exclusive lock + // ====================================================================== + + public boolean trySharedLock() { + if (hasExclusiveLock()) return false; + sharedLock++; + return true; + } + + public boolean releaseSharedLock() { + return --sharedLock == 0; + } + + public boolean tryExclusiveLock(final Procedure proc) { + if (isLocked()) return hasLockAccess(proc); + exclusiveLockProcIdOwner = proc.getProcId(); + return true; + } + + public boolean releaseExclusiveLock(final Procedure proc) { + if (isLockOwner(proc.getProcId())) { + exclusiveLockProcIdOwner = Long.MIN_VALUE; + return true; + } + return false; + } +} diff --git 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockStatus.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockStatus.java new file mode 100644 index 00000000000..9f2aae7a1e1 --- /dev/null +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockStatus.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.procedure2; + +/** + * Interface to get status of a Lock without getting access to acquire/release lock. + * Currently used in MasterProcedureScheduler where we want to give Queues access to lock's + * status for scheduling purposes, but not the ability to acquire/release it. 
+ */ +public interface LockStatus { + boolean isLocked(); + boolean hasExclusiveLock(); + boolean isLockOwner(long procId); + boolean hasParentLock(final Procedure proc); + boolean hasLockAccess(final Procedure proc); + long getExclusiveLockProcIdOwner(); + int getSharedLockCount(); +} diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureDeque.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureDeque.java new file mode 100644 index 00000000000..975cbdbab85 --- /dev/null +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureDeque.java @@ -0,0 +1,34 @@ +/** + + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.procedure2; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.procedure2.Procedure; + +import java.util.ArrayDeque; + +/** + * Type class. + * For conceptual purpose only. Seeing ProcedureDeque as type instead of just ArrayDeque gives + * more understanding that it's a queue of waiting procedures. 
+ */ +@InterfaceAudience.Private +public class ProcedureDeque extends ArrayDeque { +} diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java index 6335832508f..cb90ac0a8b2 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java @@ -22,34 +22,32 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; /** - * Basic ProcedureEvent that contains an "object", which can be a - * description or a reference to the resource to wait on, and a - * queue for suspended procedures. + * Basic ProcedureEvent that contains an "object", which can be a description or a reference to the + * resource to wait on, and a queue for suspended procedures. + * Access to suspended procedures queue is 'synchronized' on the event itself. 
*/ @InterfaceAudience.Private -@InterfaceStability.Evolving -public class ProcedureEvent extends ProcedureEventQueue { +public class ProcedureEvent { private final T object; - private boolean ready = false; + private ProcedureDeque suspendedProcedures = new ProcedureDeque(); public ProcedureEvent(final T object) { this.object = object; } - public T getObject() { - return object; - } - public synchronized boolean isReady() { return ready; } - @InterfaceAudience.Private - protected synchronized void setReady(final boolean isReady) { + synchronized void setReady(final boolean isReady) { this.ready = isReady; } + public ProcedureDeque getSuspendedProcedures() { + return suspendedProcedures; + } + @Override public String toString() { return getClass().getSimpleName() + "(" + object + ")"; diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEventQueue.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEventQueue.java deleted file mode 100644 index a109e9e639d..00000000000 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEventQueue.java +++ /dev/null @@ -1,85 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hbase.procedure2; - -import com.google.common.annotations.VisibleForTesting; - -import java.util.ArrayDeque; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; - -/** - * Basic queue to store suspended procedures. - */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public class ProcedureEventQueue { - private static final Log LOG = LogFactory.getLog(ProcedureEventQueue.class); - - private ArrayDeque waitingProcedures = null; - - public ProcedureEventQueue() { - } - - @InterfaceAudience.Private - public synchronized void suspendProcedure(final Procedure proc) { - if (waitingProcedures == null) { - waitingProcedures = new ArrayDeque(); - } - waitingProcedures.addLast(proc); - } - - @InterfaceAudience.Private - public synchronized void removeProcedure(final Procedure proc) { - if (waitingProcedures != null) { - waitingProcedures.remove(proc); - } - } - - @InterfaceAudience.Private - public synchronized boolean hasWaitingProcedures() { - return waitingProcedures != null; - } - - @InterfaceAudience.Private - public synchronized Procedure popWaitingProcedure(final boolean popFront) { - // it will be nice to use IterableList on a procedure and avoid allocations... - Procedure proc = popFront ? 
waitingProcedures.removeFirst() : waitingProcedures.removeLast(); - if (waitingProcedures.isEmpty()) { - waitingProcedures = null; - } - return proc; - } - - @VisibleForTesting - public synchronized void clear() { - waitingProcedures = null; - } - - @VisibleForTesting - public synchronized int size() { - if (waitingProcedures != null) { - return waitingProcedures.size(); - } - return 0; - } -} diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java index ffc8273a99b..1a6775ae15d 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java @@ -18,8 +18,6 @@ package org.apache.hadoop.hbase.procedure2; -import java.util.ArrayDeque; - import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; @@ -29,7 +27,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; @InterfaceAudience.Private @InterfaceStability.Evolving public class SimpleProcedureScheduler extends AbstractProcedureScheduler { - private final ArrayDeque runnables = new ArrayDeque(); + private final ProcedureDeque runnables = new ProcedureDeque(); @Override protected void enqueue(final Procedure procedure, final boolean addFront) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java index 20fc4922842..c4b49f0bd5a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java @@ -42,6 +42,13 @@ import java.util.concurrent.CountDownLatch; import 
java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +/** + * Procedure to allow clients and external admin tools to take locks on table/namespace/regions. + * This procedure when scheduled, acquires specified locks, suspends itself and waits for : + * - call to unlock: if lock request came from the process itself, say master chore. + * - Timeout : if lock request came from RPC. On timeout, evaluates if it should continue holding + * the lock or not based on last heartbeat timestamp. + */ @InterfaceAudience.Private public final class LockProcedure extends Procedure implements TableProcedureInterface { @@ -71,7 +78,7 @@ public final class LockProcedure extends Procedure // this is for internal working private boolean hasLock; - private final ProcedureEvent event = new ProcedureEvent(this); + private final ProcedureEvent event = new ProcedureEvent<>(this); // True if this proc acquired relevant locks. This value is for client checks. private final AtomicBoolean locked = new AtomicBoolean(false); // Last system time (in ms) when client sent the heartbeat. 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java index 87c79b69abe..2cd5b0839c7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java @@ -141,14 +141,6 @@ public class MasterProcedureEnv implements ConfigurationObserver { return procSched.waitEvent(((HMaster)master).getServerCrashProcessingEnabledEvent(), proc); } - public void wake(ProcedureEvent event) { - procSched.wakeEvent(event); - } - - public void suspend(ProcedureEvent event) { - procSched.suspendEvent(event); - } - public void setEventReady(ProcedureEvent event, boolean isReady) { if (isReady) { procSched.wakeEvent(event); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java index 8ec53a8504a..a3adf025414 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java @@ -20,9 +20,9 @@ package org.apache.hadoop.hbase.master.procedure; import com.google.common.annotations.VisibleForTesting; -import java.util.ArrayDeque; import java.util.Arrays; import java.util.HashMap; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -36,8 +36,10 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType; import org.apache.hadoop.hbase.procedure2.AbstractProcedureScheduler; +import 
org.apache.hadoop.hbase.procedure2.LockStatus; import org.apache.hadoop.hbase.procedure2.Procedure; -import org.apache.hadoop.hbase.procedure2.ProcedureEventQueue; +import org.apache.hadoop.hbase.procedure2.LockAndQueue; +import org.apache.hadoop.hbase.procedure2.ProcedureDeque; import org.apache.hadoop.hbase.util.AvlUtil.AvlKeyComparator; import org.apache.hadoop.hbase.util.AvlUtil.AvlIterableList; import org.apache.hadoop.hbase.util.AvlUtil.AvlLinkedNode; @@ -58,35 +60,87 @@ import org.apache.hadoop.hbase.util.AvlUtil.AvlTreeIterator; * each procedure will be pushed in its own queue, and based on the operation type * we may take smarter decision. e.g. we can abort all the operations preceding * a delete table, or similar. + * + *

Concurrency control

+ * Concurrent access to member variables (tableRunQueue, serverRunQueue, locking, tableMap, + * serverBuckets) is controlled by schedLock(). That mainly includes:
+ *
    + *
  • + * {@link #push(Procedure, boolean, boolean)} : A push will add a Queue back to run-queue + * when: + *
      + *
    1. queue was empty before push (so must have been out of run-queue)
    2. + *
    3. child procedure is added (which means parent procedure holds exclusive lock, and it + * must have moved Queue out of run-queue)
    4. + *
    + *
  • + *
  • + * {@link #poll(long)} : A poll will remove a Queue from run-queue when: + *
      + *
    1. queue becomes empty after poll
    2. + *
    3. exclusive lock is requested by polled procedure and lock is available (returns the + * procedure)
    4. + *
    5. exclusive lock is requested but lock is not available (returns null)
    6. + *
    7. Polled procedure is child of parent holding exclusive lock, and the next procedure is + * not a child
    8. + *
    + *
  • + *
  • + * namespace/table/region locks: Queue is added back to run-queue when lock being released is: + *
      + *
    1. exclusive lock
    2. + *
    3. last shared lock (in case queue was removed because next procedure in queue required + * exclusive lock)
    4. + *
    + *
  • + *
*/ @InterfaceAudience.Private @InterfaceStability.Evolving public class MasterProcedureScheduler extends AbstractProcedureScheduler { private static final Log LOG = LogFactory.getLog(MasterProcedureScheduler.class); - private final static NamespaceQueueKeyComparator NAMESPACE_QUEUE_KEY_COMPARATOR = - new NamespaceQueueKeyComparator(); private final static ServerQueueKeyComparator SERVER_QUEUE_KEY_COMPARATOR = new ServerQueueKeyComparator(); private final static TableQueueKeyComparator TABLE_QUEUE_KEY_COMPARATOR = new TableQueueKeyComparator(); - private final FairQueue serverRunQueue = new FairQueue(); - private final FairQueue tableRunQueue = new FairQueue(); + private final FairQueue serverRunQueue = new FairQueue<>(); + private final FairQueue tableRunQueue = new FairQueue<>(); private final ServerQueue[] serverBuckets = new ServerQueue[128]; - private NamespaceQueue namespaceMap = null; private TableQueue tableMap = null; + private final SchemaLocking locking = new SchemaLocking(); - private final int metaTablePriority; - private final int userTablePriority; - private final int sysTablePriority; + /** + * Table priority is used when scheduling procedures from {@link #tableRunQueue}. A TableQueue + * with priority 2 will get its procedures scheduled at twice the rate as compared to + * TableQueue with priority 1. 
+ */ + private static class TablePriorities { + TablePriorities(Configuration conf) { + metaTablePriority = conf.getInt("hbase.master.procedure.queue.meta.table.priority", 3); + sysTablePriority = conf.getInt("hbase.master.procedure.queue.system.table.priority", 2); + userTablePriority = conf.getInt("hbase.master.procedure.queue.user.table.priority", 1); + } + + final int metaTablePriority; + final int userTablePriority; + final int sysTablePriority; + + int getPriority(TableName tableName) { + if (tableName.equals(TableName.META_TABLE_NAME)) { + return metaTablePriority; + } else if (tableName.isSystemTable()) { + return sysTablePriority; + } + return userTablePriority; + } + } + private final TablePriorities tablePriorities; public MasterProcedureScheduler(final Configuration conf) { - // TODO: should this be part of the HTD? - metaTablePriority = conf.getInt("hbase.master.procedure.queue.meta.table.priority", 3); - sysTablePriority = conf.getInt("hbase.master.procedure.queue.system.table.priority", 2); - userTablePriority = conf.getInt("hbase.master.procedure.queue.user.table.priority", 1); + tablePriorities = new TablePriorities(conf); } @Override @@ -113,11 +167,11 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { private > void doAdd(final FairQueue fairq, final Queue queue, final Procedure proc, final boolean addFront) { queue.add(proc, addFront); - if (!queue.hasExclusiveLock() || queue.isLockOwner(proc.getProcId())) { + if (!queue.getLockStatus().hasExclusiveLock() || queue.getLockStatus().isLockOwner(proc.getProcId())) { // if the queue was not remove for an xlock execution // or the proc is the lock owner, put the queue back into execution addToRunQueue(fairq, queue); - } else if (queue.hasParentLock(proc)) { + } else if (queue.getLockStatus().hasParentLock(proc)) { assert addFront : "expected to add a child in the front"; // our (proc) parent has the xlock, // so the queue is not in the fairq (run-queue) @@ -151,7 +205,7 @@ 
public class MasterProcedureScheduler extends AbstractProcedureScheduler { final Procedure pollResult = rq.peek(); final boolean xlockReq = rq.requireExclusiveLock(pollResult); - if (xlockReq && rq.isLocked() && !rq.hasLockAccess(pollResult)) { + if (xlockReq && rq.getLockStatus().isLocked() && !rq.getLockStatus().hasLockAccess(pollResult)) { // someone is already holding the lock (e.g. shared lock). avoid a yield removeFromRunQueue(fairq, rq); return null; @@ -160,7 +214,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { rq.poll(); if (rq.isEmpty() || xlockReq) { removeFromRunQueue(fairq, rq); - } else if (rq.hasParentLock(pollResult)) { + } else if (rq.getLockStatus().hasParentLock(pollResult)) { // if the rq is in the fairq because of runnable child // check if the next procedure is still a child. // if not, remove the rq from the fairq and go back to the xlock state @@ -181,10 +235,6 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { serverBuckets[i] = null; } - // Remove Namespaces - clear(namespaceMap, null, NAMESPACE_QUEUE_KEY_COMPARATOR); - namespaceMap = null; - // Remove Tables clear(tableMap, tableRunQueue, TABLE_QUEUE_KEY_COMPARATOR); tableMap = null; @@ -206,7 +256,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { int count = 0; // Server queues - final AvlTreeIterator serverIter = new AvlTreeIterator(); + final AvlTreeIterator serverIter = new AvlTreeIterator<>(); for (int i = 0; i < serverBuckets.length; ++i) { serverIter.seekFirst(serverBuckets[i]); while (serverIter.hasNext()) { @@ -215,7 +265,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { } // Table queues - final AvlTreeIterator tableIter = new AvlTreeIterator(tableMap); + final AvlTreeIterator tableIter = new AvlTreeIterator<>(tableMap); while (tableIter.hasNext()) { count += tableIter.next().size(); } @@ -250,13 +300,14 @@ public class MasterProcedureScheduler extends 
AbstractProcedureScheduler { } } - private > void addToRunQueue(FairQueue fairq, Queue queue) { + private static > void addToRunQueue(FairQueue fairq, Queue queue) { if (!AvlIterableList.isLinked(queue) && !queue.isEmpty()) { fairq.add(queue); } } - private > void removeFromRunQueue(FairQueue fairq, Queue queue) { + private static > void removeFromRunQueue( + FairQueue fairq, Queue queue) { if (AvlIterableList.isLinked(queue)) { fairq.remove(queue); } @@ -265,37 +316,21 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { // ============================================================================ // Table Queue Lookup Helpers // ============================================================================ - private TableQueue getTableQueueWithLock(TableName tableName) { - schedLock(); - try { - return getTableQueue(tableName); - } finally { - schedUnlock(); - } - } - private TableQueue getTableQueue(TableName tableName) { TableQueue node = AvlTree.get(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR); if (node != null) return node; - NamespaceQueue nsQueue = getNamespaceQueue(tableName.getNamespaceAsString()); - node = new TableQueue(tableName, nsQueue, getTablePriority(tableName)); + node = new TableQueue(tableName, tablePriorities.getPriority(tableName), + locking.getTableLock(tableName), locking.getNamespaceLock(tableName.getNamespaceAsString())); tableMap = AvlTree.insert(tableMap, node); return node; } private void removeTableQueue(TableName tableName) { tableMap = AvlTree.remove(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR); + locking.removeTableLock(tableName); } - private int getTablePriority(TableName tableName) { - if (tableName.equals(TableName.META_TABLE_NAME)) { - return metaTablePriority; - } else if (tableName.isSystemTable()) { - return sysTablePriority; - } - return userTablePriority; - } private static boolean isTableProcedure(Procedure proc) { return proc instanceof TableProcedureInterface; @@ -305,44 +340,17 @@ 
public class MasterProcedureScheduler extends AbstractProcedureScheduler { return ((TableProcedureInterface)proc).getTableName(); } - // ============================================================================ - // Namespace Queue Lookup Helpers - // ============================================================================ - private NamespaceQueue getNamespaceQueue(String namespace) { - NamespaceQueue node = AvlTree.get(namespaceMap, namespace, NAMESPACE_QUEUE_KEY_COMPARATOR); - if (node != null) return (NamespaceQueue)node; - - node = new NamespaceQueue(namespace); - namespaceMap = AvlTree.insert(namespaceMap, node); - return node; - } - // ============================================================================ // Server Queue Lookup Helpers // ============================================================================ - private ServerQueue getServerQueueWithLock(ServerName serverName) { - schedLock(); - try { - return getServerQueue(serverName); - } finally { - schedUnlock(); - } - } - private ServerQueue getServerQueue(ServerName serverName) { final int index = getBucketIndex(serverBuckets, serverName.hashCode()); ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR); if (node != null) return node; - node = new ServerQueue(serverName); + node = new ServerQueue(serverName, locking.getServerLock(serverName)); serverBuckets[index] = AvlTree.insert(serverBuckets[index], node); - return (ServerQueue)node; - } - - private void removeServerQueue(ServerName serverName) { - final int index = getBucketIndex(serverBuckets, serverName.hashCode()); - final ServerQueue root = serverBuckets[index]; - serverBuckets[index] = AvlTree.remove(root, serverName, SERVER_QUEUE_KEY_COMPARATOR); + return node; } private static int getBucketIndex(Object[] buckets, int hashCode) { @@ -367,9 +375,9 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { } } - public static class ServerQueue extends QueueImpl { - 
public ServerQueue(ServerName serverName) { - super(serverName); + public static class ServerQueue extends Queue { + public ServerQueue(ServerName serverName, LockStatus serverLock) { + super(serverName, serverLock); } public boolean requireExclusiveLock(Procedure proc) { @@ -384,55 +392,6 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { } } - private static class RegionEvent extends ProcedureEventQueue { - private final HRegionInfo regionInfo; - private long exclusiveLockProcIdOwner = Long.MIN_VALUE; - - public RegionEvent(HRegionInfo regionInfo) { - this.regionInfo = regionInfo; - } - - public boolean hasExclusiveLock() { - return exclusiveLockProcIdOwner != Long.MIN_VALUE; - } - - public boolean isLockOwner(long procId) { - return exclusiveLockProcIdOwner == procId; - } - - public boolean hasParentLock(final Procedure proc) { - return proc.hasParent() && - (isLockOwner(proc.getParentProcId()) || isLockOwner(proc.getRootProcId())); - } - - public boolean hasLockAccess(final Procedure proc) { - return isLockOwner(proc.getProcId()) || hasParentLock(proc); - } - - public boolean tryExclusiveLock(final Procedure proc) { - if (hasExclusiveLock()) return hasLockAccess(proc); - exclusiveLockProcIdOwner = proc.getProcId(); - return true; - } - - public boolean releaseExclusiveLock(final Procedure proc) { - if (isLockOwner(proc.getProcId())) { - exclusiveLockProcIdOwner = Long.MIN_VALUE; - return true; - } - return false; - } - - public HRegionInfo getRegionInfo() { - return regionInfo; - } - - @Override - public String toString() { - return "RegionEvent(" + regionInfo.getRegionNameAsString() + ")"; - } - } - private static class TableQueueKeyComparator implements AvlKeyComparator { @Override public int compareKey(TableQueue node, Object key) { @@ -440,124 +399,39 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { } } - public static class TableQueue extends QueueImpl { - private final NamespaceQueue namespaceQueue; 
+ public static class TableQueue extends Queue { + private final LockStatus namespaceLockStatus; - private HashMap regionEventMap; - - public TableQueue(TableName tableName, NamespaceQueue namespaceQueue, int priority) { - super(tableName, priority); - this.namespaceQueue = namespaceQueue; - } - - public NamespaceQueue getNamespaceQueue() { - return namespaceQueue; + public TableQueue(TableName tableName, int priority, LockStatus tableLock, + LockStatus namespaceLockStatus) { + super(tableName, priority, tableLock); + this.namespaceLockStatus = namespaceLockStatus; } @Override public boolean isAvailable() { // if there are no items in the queue, or the namespace is locked. // we can't execute operation on this table - if (isEmpty() || namespaceQueue.hasExclusiveLock()) { + if (isEmpty() || namespaceLockStatus.hasExclusiveLock()) { return false; } - if (hasExclusiveLock()) { + if (getLockStatus().hasExclusiveLock()) { // if we have an exclusive lock already taken // only child of the lock owner can be executed final Procedure nextProc = peek(); - return nextProc != null && hasLockAccess(nextProc); + return nextProc != null && getLockStatus().hasLockAccess(nextProc); } // no xlock return true; } - public RegionEvent getRegionEvent(final HRegionInfo regionInfo) { - if (regionEventMap == null) { - regionEventMap = new HashMap(); - } - RegionEvent event = regionEventMap.get(regionInfo.getEncodedName()); - if (event == null) { - event = new RegionEvent(regionInfo); - regionEventMap.put(regionInfo.getEncodedName(), event); - } - return event; - } - - public void removeRegionEvent(final RegionEvent event) { - regionEventMap.remove(event.getRegionInfo().getEncodedName()); - if (regionEventMap.isEmpty()) { - regionEventMap = null; - } - } - - // TODO: We can abort pending/in-progress operation if the new call is - // something like drop table. We can Override addBack(), - // check the type and abort all the in-flight procedurs. 
- private boolean canAbortPendingOperations(Procedure proc) { - TableProcedureInterface tpi = (TableProcedureInterface)proc; - switch (tpi.getTableOperationType()) { - case DELETE: - return true; - default: - return false; - } - } - public boolean requireExclusiveLock(Procedure proc) { return requireTableExclusiveLock((TableProcedureInterface)proc); } } - private static class NamespaceQueueKeyComparator implements AvlKeyComparator { - @Override - public int compareKey(NamespaceQueue node, Object key) { - return node.compareKey((String)key); - } - } - - /** - * the namespace is currently used just as a rwlock, not as a queue. - * because ns operation are not frequent enough. so we want to avoid - * having to move table queues around for suspend/resume. - */ - private static class NamespaceQueue extends Queue { - public NamespaceQueue(String namespace) { - super(namespace); - } - - @Override - public boolean requireExclusiveLock(Procedure proc) { - throw new UnsupportedOperationException(); - } - - @Override - public void add(final Procedure proc, final boolean addToFront) { - throw new UnsupportedOperationException(); - } - - @Override - public Procedure peek() { - throw new UnsupportedOperationException(); - } - - @Override - public Procedure poll() { - throw new UnsupportedOperationException(); - } - - @Override - public boolean isEmpty() { - throw new UnsupportedOperationException(); - } - - @Override - public int size() { - throw new UnsupportedOperationException(); - } - } - // ============================================================================ // Table Locking Helpers // ============================================================================ @@ -598,18 +472,18 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { public boolean waitTableExclusiveLock(final Procedure procedure, final TableName table) { schedLock(); try { - final TableQueue tableQueue = getTableQueue(table); - final NamespaceQueue nsQueue = 
tableQueue.getNamespaceQueue(); - if (!nsQueue.trySharedLock()) { - suspendProcedure(nsQueue.getEvent(), procedure); + final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString()); + final LockAndQueue tableLock = locking.getTableLock(table); + if (!namespaceLock.trySharedLock()) { + waitProcedure(namespaceLock, procedure); return true; } - if (!tableQueue.tryExclusiveLock(procedure)) { - nsQueue.releaseSharedLock(); - suspendProcedure(tableQueue.getEvent(), procedure); + if (!tableLock.tryExclusiveLock(procedure)) { + namespaceLock.releaseSharedLock(); + waitProcedure(tableLock, procedure); return true; } - removeFromRunQueue(tableRunQueue, tableQueue); + removeFromRunQueue(tableRunQueue, getTableQueue(table)); return false; } finally { schedUnlock(); @@ -624,18 +498,18 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { public void wakeTableExclusiveLock(final Procedure procedure, final TableName table) { schedLock(); try { - final TableQueue tableQueue = getTableQueue(table); + final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString()); + final LockAndQueue tableLock = locking.getTableLock(table); int waitingCount = 0; - if (!tableQueue.hasParentLock(procedure)) { - tableQueue.releaseExclusiveLock(procedure); - waitingCount += popEventWaitingProcedures(tableQueue.getEvent()); + if (!tableLock.hasParentLock(procedure)) { + tableLock.releaseExclusiveLock(procedure); + waitingCount += wakeWaitingProcedures(tableLock); } - final NamespaceQueue nsQueue = tableQueue.getNamespaceQueue(); - if (nsQueue.releaseSharedLock()) { - waitingCount += popEventWaitingProcedures(nsQueue.getEvent()); + if (namespaceLock.releaseSharedLock()) { + waitingCount += wakeWaitingProcedures(namespaceLock); } - addToRunQueue(tableRunQueue, tableQueue); + addToRunQueue(tableRunQueue, getTableQueue(table)); wakePollIfNeeded(waitingCount); } finally { schedUnlock(); @@ -656,20 +530,20 @@ public class 
MasterProcedureScheduler extends AbstractProcedureScheduler { private TableQueue waitTableQueueSharedLock(final Procedure procedure, final TableName table) { schedLock(); try { - final TableQueue tableQueue = getTableQueue(table); - final NamespaceQueue nsQueue = tableQueue.getNamespaceQueue(); - if (!nsQueue.trySharedLock()) { - suspendProcedure(nsQueue.getEvent(), procedure); + final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString()); + final LockAndQueue tableLock = locking.getTableLock(table); + if (!namespaceLock.trySharedLock()) { + waitProcedure(namespaceLock, procedure); return null; } - if (!tableQueue.trySharedLock()) { - tableQueue.getNamespaceQueue().releaseSharedLock(); - suspendProcedure(tableQueue.getEvent(), procedure); + if (!tableLock.trySharedLock()) { + namespaceLock.releaseSharedLock(); + waitProcedure(tableLock, procedure); return null; } - return tableQueue; + return getTableQueue(table); } finally { schedUnlock(); } @@ -683,15 +557,15 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { public void wakeTableSharedLock(final Procedure procedure, final TableName table) { schedLock(); try { - final TableQueue tableQueue = getTableQueue(table); - final NamespaceQueue nsQueue = tableQueue.getNamespaceQueue(); + final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString()); + final LockAndQueue tableLock = locking.getTableLock(table); int waitingCount = 0; - if (tableQueue.releaseSharedLock()) { - addToRunQueue(tableRunQueue, tableQueue); - waitingCount += popEventWaitingProcedures(tableQueue.getEvent()); + if (tableLock.releaseSharedLock()) { + addToRunQueue(tableRunQueue, getTableQueue(table)); + waitingCount += wakeWaitingProcedures(tableLock); } - if (nsQueue.releaseSharedLock()) { - waitingCount += popEventWaitingProcedures(nsQueue.getEvent()); + if (namespaceLock.releaseSharedLock()) { + waitingCount += wakeWaitingProcedures(namespaceLock); } 
wakePollIfNeeded(waitingCount); } finally { @@ -712,10 +586,11 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { protected boolean markTableAsDeleted(final TableName table, final Procedure procedure) { schedLock(); try { - TableQueue queue = getTableQueue(table); + final TableQueue queue = getTableQueue(table); + final LockAndQueue tableLock = locking.getTableLock(table); if (queue == null) return true; - if (queue.isEmpty() && queue.tryExclusiveLock(procedure)) { + if (queue.isEmpty() && tableLock.tryExclusiveLock(procedure)) { // remove the table from the run-queue and the map if (AvlIterableList.isLinked(queue)) { tableRunQueue.remove(queue); @@ -754,41 +629,40 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { public boolean waitRegions(final Procedure procedure, final TableName table, final HRegionInfo... regionInfo) { Arrays.sort(regionInfo); + schedLock(); + try { + // If there is parent procedure, it would have already taken xlock, so no need to take + // shared lock here. Otherwise, take shared lock. 
+ if (!procedure.hasParent() + && waitTableQueueSharedLock(procedure, table) == null) { + return true; + } - final TableQueue queue; - if (procedure.hasParent()) { - // the assumption is that the parent procedure have already the table xlock - queue = getTableQueueWithLock(table); - } else { - // acquire the table shared-lock - queue = waitTableQueueSharedLock(procedure, table); - if (queue == null) return true; - } - - // acquire region xlocks or wait - boolean hasLock = true; - final RegionEvent[] event = new RegionEvent[regionInfo.length]; - synchronized (queue) { + // acquire region xlocks or wait + boolean hasLock = true; + final LockAndQueue[] regionLocks = new LockAndQueue[regionInfo.length]; for (int i = 0; i < regionInfo.length; ++i) { assert regionInfo[i].getTable().equals(table); - assert i == 0 || regionInfo[i] != regionInfo[i-1] : "duplicate region: " + regionInfo[i]; + assert i == 0 || regionInfo[i] != regionInfo[i - 1] : "duplicate region: " + regionInfo[i]; - event[i] = queue.getRegionEvent(regionInfo[i]); - if (!event[i].tryExclusiveLock(procedure)) { - suspendProcedure(event[i], procedure); + regionLocks[i] = locking.getRegionLock(regionInfo[i].getEncodedName()); + if (!regionLocks[i].tryExclusiveLock(procedure)) { + waitProcedure(regionLocks[i], procedure); hasLock = false; while (i-- > 0) { - event[i].releaseExclusiveLock(procedure); + regionLocks[i].releaseExclusiveLock(procedure); } break; } } - } - if (!hasLock && !procedure.hasParent()) { - wakeTableSharedLock(procedure, table); + if (!hasLock && !procedure.hasParent()) { + wakeTableSharedLock(procedure, table); + } + return !hasLock; + } finally { + schedUnlock(); } - return !hasLock; } /** @@ -808,32 +682,26 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { public void wakeRegions(final Procedure procedure,final TableName table, final HRegionInfo... 
regionInfo) { Arrays.sort(regionInfo); - - final TableQueue queue = getTableQueueWithLock(table); - - int numProcs = 0; - final Procedure[] nextProcs = new Procedure[regionInfo.length]; - synchronized (queue) { - HRegionInfo prevRegion = null; + schedLock(); + try { + int numProcs = 0; + final Procedure[] nextProcs = new Procedure[regionInfo.length]; for (int i = 0; i < regionInfo.length; ++i) { assert regionInfo[i].getTable().equals(table); - assert i == 0 || regionInfo[i] != regionInfo[i-1] : "duplicate region: " + regionInfo[i]; + assert i == 0 || regionInfo[i] != regionInfo[i - 1] : "duplicate region: " + regionInfo[i]; - RegionEvent event = queue.getRegionEvent(regionInfo[i]); - if (event.releaseExclusiveLock(procedure)) { - if (event.hasWaitingProcedures()) { + LockAndQueue regionLock = locking.getRegionLock(regionInfo[i].getEncodedName()); + if (regionLock.releaseExclusiveLock(procedure)) { + if (!regionLock.isEmpty()) { // release one procedure at the time since regions has an xlock - nextProcs[numProcs++] = event.popWaitingProcedure(true); + nextProcs[numProcs++] = regionLock.removeFirst(); } else { - queue.removeRegionEvent(event); + locking.removeRegionLock(regionInfo[i].getEncodedName()); } } } - } - // awake procedures if any - schedLock(); - try { + // awake procedures if any for (int i = numProcs - 1; i >= 0; --i) { wakeProcedure(nextProcs[i]); } @@ -855,22 +723,23 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { * Suspend the procedure if the specified namespace is already locked. 
* @see #wakeNamespaceExclusiveLock(Procedure,String) * @param procedure the procedure trying to acquire the lock - * @param nsName Namespace to lock + * @param namespace Namespace to lock * @return true if the procedure has to wait for the namespace to be available */ - public boolean waitNamespaceExclusiveLock(final Procedure procedure, final String nsName) { + public boolean waitNamespaceExclusiveLock(final Procedure procedure, final String namespace) { schedLock(); try { - final TableQueue tableQueue = getTableQueue(TableName.NAMESPACE_TABLE_NAME); - if (!tableQueue.trySharedLock()) { - suspendProcedure(tableQueue.getEvent(), procedure); + final LockAndQueue systemNamespaceTableLock = + locking.getTableLock(TableName.NAMESPACE_TABLE_NAME); + if (!systemNamespaceTableLock.trySharedLock()) { + waitProcedure(systemNamespaceTableLock, procedure); return true; } - final NamespaceQueue nsQueue = getNamespaceQueue(nsName); - if (!nsQueue.tryExclusiveLock(procedure)) { - tableQueue.releaseSharedLock(); - suspendProcedure(nsQueue.getEvent(), procedure); + final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace); + if (!namespaceLock.tryExclusiveLock(procedure)) { + systemNamespaceTableLock.releaseSharedLock(); + waitProcedure(namespaceLock, procedure); return true; } return false; @@ -883,20 +752,21 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { * Wake the procedures waiting for the specified namespace * @see #waitNamespaceExclusiveLock(Procedure,String) * @param procedure the procedure releasing the lock - * @param nsName the namespace that has the exclusive lock + * @param namespace the namespace that has the exclusive lock */ - public void wakeNamespaceExclusiveLock(final Procedure procedure, final String nsName) { + public void wakeNamespaceExclusiveLock(final Procedure procedure, final String namespace) { schedLock(); try { - final TableQueue tableQueue = getTableQueue(TableName.NAMESPACE_TABLE_NAME); - final 
NamespaceQueue nsQueue = getNamespaceQueue(nsName); + final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace); + final LockAndQueue systemNamespaceTableLock = + locking.getTableLock(TableName.NAMESPACE_TABLE_NAME); + namespaceLock.releaseExclusiveLock(procedure); int waitingCount = 0; - nsQueue.releaseExclusiveLock(procedure); - if (tableQueue.releaseSharedLock()) { - addToRunQueue(tableRunQueue, tableQueue); - waitingCount += popEventWaitingProcedures(tableQueue.getEvent()); + if(systemNamespaceTableLock.releaseSharedLock()) { + addToRunQueue(tableRunQueue, getTableQueue(TableName.NAMESPACE_TABLE_NAME)); + waitingCount += wakeWaitingProcedures(systemNamespaceTableLock); } - waitingCount += popEventWaitingProcedures(nsQueue.getEvent()); + waitingCount += wakeWaitingProcedures(namespaceLock); wakePollIfNeeded(waitingCount); } finally { schedUnlock(); @@ -916,12 +786,12 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { public boolean waitServerExclusiveLock(final Procedure procedure, final ServerName serverName) { schedLock(); try { - ServerQueue queue = getServerQueue(serverName); - if (queue.tryExclusiveLock(procedure)) { - removeFromRunQueue(serverRunQueue, queue); + final LockAndQueue lock = locking.getServerLock(serverName); + if (lock.tryExclusiveLock(procedure)) { + removeFromRunQueue(serverRunQueue, getServerQueue(serverName)); return false; } - suspendProcedure(queue.getEvent(), procedure); + waitProcedure(lock, procedure); return true; } finally { schedUnlock(); @@ -937,10 +807,10 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { public void wakeServerExclusiveLock(final Procedure procedure, final ServerName serverName) { schedLock(); try { - final ServerQueue queue = getServerQueue(serverName); - queue.releaseExclusiveLock(procedure); - addToRunQueue(serverRunQueue, queue); - int waitingCount = popEventWaitingProcedures(queue.getEvent()); + final LockAndQueue lock = 
locking.getServerLock(serverName); + lock.releaseExclusiveLock(procedure); + addToRunQueue(serverRunQueue, getServerQueue(serverName)); + int waitingCount = wakeWaitingProcedures(lock); wakePollIfNeeded(waitingCount); } finally { schedUnlock(); @@ -950,38 +820,24 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { // ============================================================================ // Generic Helpers // ============================================================================ - private static interface QueueInterface { - boolean isAvailable(); - boolean isEmpty(); - int size(); - - void add(Procedure proc, boolean addFront); - boolean requireExclusiveLock(Procedure proc); - Procedure peek(); - Procedure poll(); - } - - // TODO Why OK not having synchronized access and/or volatiles and - // sharedLock-- and sharedLock++? Is this accessed by one thread only? - // Write up the concurrency expectations. St.Ack 01/19/2017 private static abstract class Queue> - extends AvlLinkedNode> implements QueueInterface { - private final ProcedureEventQueue event; - - private long exclusiveLockProcIdOwner = Long.MIN_VALUE; - private int sharedLock = 0; + extends AvlLinkedNode> { + abstract boolean requireExclusiveLock(Procedure proc); private final TKey key; private final int priority; + private final ProcedureDeque runnables = new ProcedureDeque(); + // Reference to status of lock on entity this queue represents. 
+ private final LockStatus lockStatus; - public Queue(TKey key) { - this(key, 1); + public Queue(TKey key, LockStatus lockStatus) { + this(key, 1, lockStatus); } - public Queue(TKey key, int priority) { + public Queue(TKey key, int priority, LockStatus lockStatus) { this.key = key; this.priority = priority; - this.event = new ProcedureEventQueue(); + this.lockStatus = lockStatus; } protected TKey getKey() { @@ -992,66 +848,41 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { return priority; } - public ProcedureEventQueue getEvent() { - return event; - } - - // ====================================================================== - // Read/Write Locking helpers - // ====================================================================== - public boolean isLocked() { - return hasExclusiveLock() || sharedLock > 0; - } - - public boolean hasExclusiveLock() { - return this.exclusiveLockProcIdOwner != Long.MIN_VALUE; - } - - public boolean trySharedLock() { - if (hasExclusiveLock()) return false; - sharedLock++; - return true; - } - - public boolean releaseSharedLock() { - return --sharedLock == 0; - } - - protected boolean isSingleSharedLock() { - return sharedLock == 1; - } - - public boolean isLockOwner(long procId) { - return exclusiveLockProcIdOwner == procId; - } - - public boolean hasParentLock(final Procedure proc) { - return proc.hasParent() && - (isLockOwner(proc.getParentProcId()) || isLockOwner(proc.getRootProcId())); - } - - public boolean hasLockAccess(final Procedure proc) { - return isLockOwner(proc.getProcId()) || hasParentLock(proc); - } - - public boolean tryExclusiveLock(final Procedure proc) { - if (isLocked()) return hasLockAccess(proc); - exclusiveLockProcIdOwner = proc.getProcId(); - return true; - } - - public boolean releaseExclusiveLock(final Procedure proc) { - if (isLockOwner(proc.getProcId())) { - exclusiveLockProcIdOwner = Long.MIN_VALUE; - return true; - } - return false; + protected LockStatus getLockStatus() 
{ + return lockStatus; } // This should go away when we have the new AM and its events // and we move xlock to the lock-event-queue. public boolean isAvailable() { - return !hasExclusiveLock() && !isEmpty(); + return !lockStatus.hasExclusiveLock() && !isEmpty(); + } + + // ====================================================================== + // Functions to handle procedure queue + // ====================================================================== + public void add(final Procedure proc, final boolean addToFront) { + if (addToFront) { + runnables.addFirst(proc); + } else { + runnables.addLast(proc); + } + } + + public Procedure peek() { + return runnables.peek(); + } + + public Procedure poll() { + return runnables.poll(); + } + + public boolean isEmpty() { + return runnables.isEmpty(); + } + + public int size() { + return runnables.size(); } // ====================================================================== @@ -1070,59 +901,61 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { public String toString() { return String.format("%s(%s, xlock=%s sharedLock=%s size=%s)", getClass().getSimpleName(), key, - hasExclusiveLock() ? "true (" + exclusiveLockProcIdOwner + ")" : "false", - sharedLock, size()); + lockStatus.hasExclusiveLock() ? + "true (" + lockStatus.getExclusiveLockProcIdOwner() + ")" : "false", + lockStatus.getSharedLockCount(), size()); } } + /** + * Locks on namespaces, tables, and regions. + * Since LockAndQueue implementation is NOT thread-safe, schedLock() guards all calls to these + * locks. 
+ */ + private static class SchemaLocking { + private LockAndQueue getLock(Map map, T key) { + LockAndQueue lock = map.get(key); + if (lock == null) { + lock = new LockAndQueue(); + map.put(key, lock); + } + return lock; + } + + public LockAndQueue getTableLock(TableName tableName) { + return getLock(tableLocks, tableName); + } + + public LockAndQueue removeTableLock(TableName tableName) { + return tableLocks.remove(tableName); + } + + public LockAndQueue getNamespaceLock(String namespace) { + return getLock(namespaceLocks, namespace); + } + + public LockAndQueue getRegionLock(String encodedRegionName) { + return getLock(regionLocks, encodedRegionName); + } + + public LockAndQueue removeRegionLock(String encodedRegionName) { + return regionLocks.remove(encodedRegionName); + } + + public LockAndQueue getServerLock(ServerName serverName) { + return getLock(serverLocks, serverName); + } + + final Map serverLocks = new HashMap<>(); + final Map namespaceLocks = new HashMap<>(); + final Map tableLocks = new HashMap<>(); + // Single map for all regions irrespective of tables. Key is encoded region name. 
+ final Map regionLocks = new HashMap<>(); + } + // ====================================================================== // Helper Data Structures // ====================================================================== - private static abstract class QueueImpl> extends Queue { - private final ArrayDeque runnables = new ArrayDeque(); - - public QueueImpl(TKey key) { - super(key); - } - - public QueueImpl(TKey key, int priority) { - super(key, priority); - } - - public void add(final Procedure proc, final boolean addToFront) { - if (addToFront) { - addFront(proc); - } else { - addBack(proc); - } - } - - protected void addFront(final Procedure proc) { - runnables.addFirst(proc); - } - - protected void addBack(final Procedure proc) { - runnables.addLast(proc); - } - - public Procedure peek() { - return runnables.peek(); - } - - @Override - public Procedure poll() { - return runnables.poll(); - } - - @Override - public boolean isEmpty() { - return runnables.isEmpty(); - } - - public int size() { - return runnables.size(); - } - } private static class FairQueue> { private final int quantum; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java index 4eedc9a00c2..a541b439709 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java @@ -146,7 +146,7 @@ public class TestMasterProcedureEvents { // check that nothing is in the event queue LOG.debug("checking " + event); assertEquals(false, event.isReady()); - assertEquals(0, event.size()); + assertEquals(0, event.getSuspendedProcedures().size()); // submit the procedure LOG.debug("submit " + proc); @@ -154,12 +154,12 @@ public class TestMasterProcedureEvents { // wait until the event is in the queue (proc 
executed and got into suspended state) LOG.debug("wait procedure suspended on " + event); - while (event.size() < 1) Thread.sleep(25); + while (event.getSuspendedProcedures().size() < 1) Thread.sleep(25); // check that the proc is in the event queue - LOG.debug("checking " + event + " size=" + event.size()); + LOG.debug("checking " + event + " size=" + event.getSuspendedProcedures().size()); assertEquals(false, event.isReady()); - assertEquals(1, event.size()); + assertEquals(1, event.getSuspendedProcedures().size()); // wake the event LOG.debug("wake " + event); @@ -172,7 +172,7 @@ public class TestMasterProcedureEvents { // check that nothing is in the event queue and the event is not suspended assertEquals(true, event.isReady()); - assertEquals(0, event.size()); + assertEquals(0, event.getSuspendedProcedures().size()); LOG.debug("completed execution of " + proc + " pollCalls=" + (procSched.getPollCalls() - startPollCalls) + " nullPollCalls=" + (procSched.getNullPollCalls() - startNullPollCalls));