diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java index 7e584202e6b..ee61841d025 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java @@ -79,6 +79,9 @@ public abstract class Procedure implements Comparable { private int childrenLatch = 0; private long lastUpdate; + // TODO: it would be nice to have pointers here to allow the scheduler to do suspend/resume tricks + private boolean suspended = false; + private RemoteProcedureException exception = null; private byte[] result = null; @@ -94,7 +97,7 @@ public abstract class Procedure implements Comparable { * @throws InterruptedException the procedure will be added back to the queue and retried later */ protected abstract Procedure[] execute(TEnvironment env) - throws ProcedureYieldException, InterruptedException; + throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException; /** * The code to undo what done by the execute() code. @@ -276,6 +279,9 @@ public abstract class Procedure implements Comparable { */ protected void toStringState(StringBuilder builder) { builder.append(getState()); + if (isSuspended()) { + builder.append("|SUSPENDED"); + } } /** @@ -319,7 +325,7 @@ public abstract class Procedure implements Comparable { } public long getParentProcId() { - return parentProcId; + return parentProcId.longValue(); } public NonceKey getNonceKey() { @@ -371,6 +377,23 @@ public abstract class Procedure implements Comparable { return false; } + /** + * @return true if the procedure is in a suspended state, + * waiting for the resources required to execute the procedure to become available. 
+ */ + public synchronized boolean isSuspended() { + return suspended; + } + + public synchronized void suspend() { + suspended = true; + } + + public synchronized void resume() { + assert isSuspended() : this + " expected suspended state, got " + state; + suspended = false; + } + public synchronized RemoteProcedureException getException() { return exception; } @@ -398,7 +421,7 @@ public abstract class Procedure implements Comparable { * @return the timeout in msec */ public int getTimeout() { - return timeout; + return timeout.intValue(); } /** @@ -494,7 +517,7 @@ public abstract class Procedure implements Comparable { */ @InterfaceAudience.Private protected Procedure[] doExecute(final TEnvironment env) - throws ProcedureYieldException, InterruptedException { + throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { try { updateTimestamp(); return execute(env); diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index f43b65f407b..9d71f6561af 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -505,15 +505,25 @@ public class ProcedureExecutor { } }; + long st, et; + // Acquire the store lease. + st = EnvironmentEdgeManager.currentTime(); store.recoverLease(); + et = EnvironmentEdgeManager.currentTime(); + LOG.info(String.format("recover procedure store (%s) lease: %s", + store.getClass().getSimpleName(), StringUtils.humanTimeDiff(et - st))); // TODO: Split in two steps. // TODO: Handle corrupted procedures (currently just a warn) // The first one will make sure that we have the latest id, // so we can start the threads and accept new procedures. // The second step will do the actual load of old procedures. 
+ st = EnvironmentEdgeManager.currentTime(); load(abortOnCorruption); + et = EnvironmentEdgeManager.currentTime(); + LOG.info(String.format("load procedure store (%s): %s", + store.getClass().getSimpleName(), StringUtils.humanTimeDiff(et - st))); // Start the executors. Here we must have the lastProcId set. for (int i = 0; i < threads.length; ++i) { @@ -840,7 +850,7 @@ public class ProcedureExecutor { } // Execute the procedure - assert proc.getState() == ProcedureState.RUNNABLE; + assert proc.getState() == ProcedureState.RUNNABLE : proc; if (proc.acquireLock(getEnvironment())) { execProcedure(procStack, proc); proc.releaseLock(getEnvironment()); @@ -1042,6 +1052,7 @@ public class ProcedureExecutor { Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE); // Execute the procedure + boolean isSuspended = false; boolean reExecute = false; Procedure[] subprocs = null; do { @@ -1051,6 +1062,8 @@ public class ProcedureExecutor { if (subprocs != null && subprocs.length == 0) { subprocs = null; } + } catch (ProcedureSuspendedException e) { + isSuspended = true; } catch (ProcedureYieldException e) { if (LOG.isTraceEnabled()) { LOG.trace("Yield procedure: " + procedure + ": " + e.getMessage()); @@ -1086,7 +1099,7 @@ public class ProcedureExecutor { break; } - assert subproc.getState() == ProcedureState.INITIALIZING; + assert subproc.getState() == ProcedureState.INITIALIZING : subproc; subproc.setParentProcId(procedure.getProcId()); subproc.setProcId(nextProcId()); } @@ -1107,7 +1120,7 @@ public class ProcedureExecutor { } } else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) { waitingTimeout.add(procedure); - } else { + } else if (!isSuspended) { // No subtask, so we are done procedure.setState(ProcedureState.FINISHED); } diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSuspendedException.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSuspendedException.java new file 
mode 100644 index 00000000000..f28d57adc7b --- /dev/null +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSuspendedException.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.procedure2; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +@InterfaceAudience.Private +@InterfaceStability.Stable +public class ProcedureSuspendedException extends ProcedureException { + /** Creates the exception with no detail message (delegates to the no-arg superclass constructor). */ + public ProcedureSuspendedException() { + super(); + } + + /** + * Creates the exception with the given detail message. + * @param s the detail message, passed unchanged to the superclass constructor + */ + public ProcedureSuspendedException(String s) { + super(s); + } +} diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java index 636a0377965..f0bcdea9094 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java @@ -42,7 +42,7 @@ public abstract class SequentialProcedure 
extends Procedure> void doAdd(final FairQueue fairq, final Queue queue, final Procedure proc, final boolean addFront) { + if (proc.isSuspended()) return; + queue.add(proc, addFront); + if (!(queue.isSuspended() || queue.hasExclusiveLock())) { + // the queue is not suspended or removed from the fairq (run-queue) + // because someone has an xlock on it. + // so, if the queue is not-linked we should add it if (queue.size() == 1 && !IterableList.isLinked(queue)) { fairq.add(queue); } queueSize++; + } else if (proc.hasParent() && queue.isLockOwner(proc.getParentProcId())) { + assert addFront : "expected to add a child in the front"; + assert !queue.isSuspended() : "unexpected suspended state for the queue"; + // our (proc) parent has the xlock, + // so the queue is not in the fairq (run-queue) + // add it back to let the child run (inherit the lock) + if (!IterableList.isLinked(queue)) { + fairq.add(queue); + } + queueSize++; } } @@ -140,7 +165,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { } @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP") - Procedure poll(long waitNsec) { + protected Procedure poll(long waitNsec) { Procedure pollResult = null; schedLock.lock(); try { @@ -185,7 +210,16 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { this.queueSize--; if (rq.isEmpty() || rq.requireExclusiveLock(pollResult)) { removeFromRunQueue(fairq, rq); + } else if (pollResult.hasParent() && rq.isLockOwner(pollResult.getParentProcId())) { + // if the rq is in the fairq because of runnable child + // check if the next procedure is still a child. 
+ // if not, remove the rq from the fairq and go back to the xlock state + Procedure nextProc = rq.peek(); + if (nextProc != null && nextProc.getParentProcId() != pollResult.getParentProcId()) { + removeFromRunQueue(fairq, rq); + } } + return pollResult; } @@ -300,18 +334,25 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { } public boolean waitEvent(ProcedureEvent event, Procedure procedure, boolean suspendQueue) { + return waitEvent(event, /* lockEvent= */false, procedure, suspendQueue); + } + + private boolean waitEvent(ProcedureEvent event, boolean lockEvent, + Procedure procedure, boolean suspendQueue) { synchronized (event) { if (event.isReady()) { + if (lockEvent) { + event.setReady(false); + } return false; } - // TODO: Suspend single procedure not implemented yet, fallback to suspending the queue - if (!suspendQueue) suspendQueue = true; - - if (isTableProcedure(procedure)) { - waitTableEvent(event, procedure, suspendQueue); + if (!suspendQueue) { + suspendProcedure(event, procedure); + } else if (isTableProcedure(procedure)) { + waitTableEvent(event, procedure); } else if (isServerProcedure(procedure)) { - waitServerEvent(event, procedure, suspendQueue); + waitServerEvent(event, procedure); } else { // TODO: at the moment we only have Table and Server procedures // if you are implementing a non-table/non-server procedure, you have two options: create @@ -324,17 +365,16 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { return true; } - private void waitTableEvent(ProcedureEvent event, Procedure procedure, boolean suspendQueue) { + private void waitTableEvent(ProcedureEvent event, Procedure procedure) { final TableName tableName = getTableName(procedure); final boolean isDebugEnabled = LOG.isDebugEnabled(); schedLock.lock(); try { TableQueue queue = getTableQueue(tableName); + queue.addFront(procedure); if (queue.isSuspended()) return; - // TODO: if !suspendQueue - if (isDebugEnabled) { LOG.debug("Suspend 
table queue " + tableName); } @@ -346,7 +386,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { } } - private void waitServerEvent(ProcedureEvent event, Procedure procedure, boolean suspendQueue) { + private void waitServerEvent(ProcedureEvent event, Procedure procedure) { final ServerName serverName = getServerName(procedure); final boolean isDebugEnabled = LOG.isDebugEnabled(); @@ -354,10 +394,9 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { try { // TODO: This will change once we have the new AM ServerQueue queue = getServerQueue(serverName); + queue.addFront(procedure); if (queue.isSuspended()) return; - // TODO: if !suspendQueue - if (isDebugEnabled) { LOG.debug("Suspend server queue " + serverName); } @@ -399,6 +438,10 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { addToRunQueue(serverRunQueue, queue); } + while (event.hasWaitingProcedures()) { + wakeProcedure(event.popWaitingProcedure(false)); + } + if (queueSize > 1) { schedWaitCond.signalAll(); } else if (queueSize > 0) { @@ -410,7 +453,41 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { } } - public static class ProcedureEvent { + private void suspendProcedure(BaseProcedureEvent event, Procedure procedure) { + procedure.suspend(); + event.suspendProcedure(procedure); + } + + private void wakeProcedure(Procedure procedure) { + procedure.resume(); + doAdd(procedure, /* addFront= */ true, /* notify= */false); + } + + private static abstract class BaseProcedureEvent { + private ArrayDeque waitingProcedures = null; + + protected void suspendProcedure(Procedure proc) { + if (waitingProcedures == null) { + waitingProcedures = new ArrayDeque(); + } + waitingProcedures.addLast(proc); + } + + protected boolean hasWaitingProcedures() { + return waitingProcedures != null; + } + + protected Procedure popWaitingProcedure(boolean popFront) { + // it will be nice to use IterableList on a procedure and avoid 
allocations... + Procedure proc = popFront ? waitingProcedures.removeFirst() : waitingProcedures.removeLast(); + if (waitingProcedures.isEmpty()) { + waitingProcedures = null; + } + return proc; + } + } + + public static class ProcedureEvent extends BaseProcedureEvent { private final String description; private Queue waitingServers = null; @@ -585,9 +662,47 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { } } + private static class RegionEvent extends BaseProcedureEvent { + private final HRegionInfo regionInfo; + private long exclusiveLockProcIdOwner = Long.MIN_VALUE; + + public RegionEvent(HRegionInfo regionInfo) { + this.regionInfo = regionInfo; + } + + public boolean hasExclusiveLock() { + return exclusiveLockProcIdOwner != Long.MIN_VALUE; + } + + public boolean isLockOwner(long procId) { + return exclusiveLockProcIdOwner == procId; + } + + public boolean tryExclusiveLock(long procIdOwner) { + assert procIdOwner != Long.MIN_VALUE; + if (hasExclusiveLock()) return false; + exclusiveLockProcIdOwner = procIdOwner; + return true; + } + + private void releaseExclusiveLock() { + exclusiveLockProcIdOwner = Long.MIN_VALUE; + } + + public HRegionInfo getRegionInfo() { + return regionInfo; + } + + @Override + public String toString() { + return String.format("region %s event", regionInfo.getRegionNameAsString()); + } + } + public static class TableQueue extends QueueImpl { private final NamespaceQueue namespaceQueue; + private HashMap regionEventMap; private TableLock tableLock = null; public TableQueue(TableName tableName, NamespaceQueue namespaceQueue, int priority) { @@ -601,7 +716,41 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { @Override public synchronized boolean isAvailable() { - return super.isAvailable() && !namespaceQueue.hasExclusiveLock(); + // if there are no items in the queue, or the namespace is locked. 
+ // we can't execute operation on this table + if (isEmpty() || namespaceQueue.hasExclusiveLock()) { + return false; + } + + if (hasExclusiveLock()) { + // if we have an exclusive lock already taken + // only child of the lock owner can be executed + Procedure availProc = peek(); + return availProc != null && availProc.hasParent() && + isLockOwner(availProc.getParentProcId()); + } + + // no xlock + return true; + } + + public synchronized RegionEvent getRegionEvent(final HRegionInfo regionInfo) { + if (regionEventMap == null) { + regionEventMap = new HashMap(); + } + RegionEvent event = regionEventMap.get(regionInfo); + if (event == null) { + event = new RegionEvent(regionInfo); + regionEventMap.put(regionInfo, event); + } + return event; + } + + public synchronized void removeRegionEvent(final RegionEvent event) { + regionEventMap.remove(event.getRegionInfo()); + if (regionEventMap.isEmpty()) { + regionEventMap = null; + } } // TODO: We can abort pending/in-progress operation if the new call is @@ -630,6 +779,13 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { return !tpi.getTableName().equals(TableName.NAMESPACE_TABLE_NAME); case READ: return false; + // region operations are using the shared-lock on the table + // and then they will grab an xlock on the region. + case SPLIT: + case MERGE: + case ASSIGN: + case UNASSIGN: + return false; default: break; } @@ -882,6 +1038,100 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { return true; } + // ============================================================================ + // Region Locking Helpers + // ============================================================================ + public boolean waitRegion(final Procedure procedure, final HRegionInfo regionInfo) { + return waitRegions(procedure, regionInfo.getTable(), regionInfo); + } + + public boolean waitRegions(final Procedure procedure, final TableName table, + final HRegionInfo... 
regionInfo) { + Arrays.sort(regionInfo); + + final TableQueue queue; + if (procedure.hasParent()) { + // the assumption is that the parent procedure already holds the table xlock + queue = getTableQueueWithLock(table); + } else { + // acquire the table shared-lock + queue = tryAcquireTableQueueSharedLock(procedure, table); + if (queue == null) return false; + } + + // acquire the region xlocks in order; on the first failure suspend the procedure on that region event and release the xlocks taken so far + boolean hasLock = true; + final RegionEvent[] event = new RegionEvent[regionInfo.length]; + synchronized (queue) { + for (int i = 0; i < regionInfo.length; ++i) { + assert regionInfo[i].getTable().equals(table); + event[i] = queue.getRegionEvent(regionInfo[i]); + if (!event[i].tryExclusiveLock(procedure.getProcId())) { + suspendProcedure(event[i], procedure); + hasLock = false; + while (i-- > 0) { + event[i].releaseExclusiveLock(); + } + break; + } + } + } + + if (!hasLock && !procedure.hasParent()) { + releaseTableSharedLock(procedure, table); + } + return hasLock; + } + + public void wakeRegion(final Procedure procedure, final HRegionInfo regionInfo) { + wakeRegions(procedure, regionInfo.getTable(), regionInfo); + } + + public void wakeRegions(final Procedure procedure,final TableName table, + final HRegionInfo... 
regionInfo) { + Arrays.sort(regionInfo); + + final TableQueue queue = getTableQueueWithLock(table); + + int numProcs = 0; + final Procedure[] nextProcs = new Procedure[regionInfo.length]; + synchronized (queue) { + for (int i = 0; i < regionInfo.length; ++i) { + assert regionInfo[i].getTable().equals(table); + RegionEvent event = queue.getRegionEvent(regionInfo[i]); + event.releaseExclusiveLock(); + if (event.hasWaitingProcedures()) { + // release one procedure at a time, since a region has an xlock + nextProcs[numProcs++] = event.popWaitingProcedure(true); + } else { + queue.removeRegionEvent(event); + } + } + } + + // wake the popped procedures, if any + schedLock.lock(); + try { + for (int i = numProcs - 1; i >= 0; --i) { + wakeProcedure(nextProcs[i]); + } + + if (numProcs > 1) { + schedWaitCond.signalAll(); + } else if (numProcs > 0) { + schedWaitCond.signal(); + } + + if (!procedure.hasParent()) { + // release the table shared-lock. + // (if we have a parent, it is holding an xlock so we didn't take the shared-lock) + releaseTableSharedLock(procedure, table); + } + } finally { + schedLock.unlock(); + } + } + // ============================================================================ // Namespace Locking Helpers // ============================================================================ @@ -1080,6 +1330,10 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { return sharedLock == 1; } + public synchronized boolean isLockOwner(long procId) { + return exclusiveLockProcIdOwner == procId; + } + public synchronized boolean tryExclusiveLock(long procIdOwner) { assert procIdOwner != Long.MIN_VALUE; if (isLocked()) return false; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java index cc088f35f0b..deaf406aee3 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; public interface TableProcedureInterface { public enum TableOperationType { CREATE, DELETE, DISABLE, EDIT, ENABLE, READ, + SPLIT, MERGE, ASSIGN, UNASSIGN, /* region operations */ }; /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java index 12042d8cda8..9c3740425d7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.master.procedure; - import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; @@ -29,12 +28,15 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.master.TableLockManager; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.util.Bytes; import org.junit.After; import org.junit.Before; @@ -60,7 +62,8 @@ public class TestMasterProcedureScheduler { @After public void tearDown() throws IOException { - 
assertEquals(0, queue.size()); + assertEquals("proc-queue expected to be empty", 0, queue.size()); + queue.clear(); } @Test @@ -346,6 +349,201 @@ public class TestMasterProcedureScheduler { assertEquals(4, procId); } + @Test + public void testVerifyRegionLocks() throws Exception { + final TableName tableName = TableName.valueOf("testtb"); + final HRegionInfo regionA = new HRegionInfo(tableName, Bytes.toBytes("a"), Bytes.toBytes("b")); + final HRegionInfo regionB = new HRegionInfo(tableName, Bytes.toBytes("b"), Bytes.toBytes("c")); + final HRegionInfo regionC = new HRegionInfo(tableName, Bytes.toBytes("c"), Bytes.toBytes("d")); + + queue.addBack(new TestTableProcedure(1, tableName, + TableProcedureInterface.TableOperationType.EDIT)); + queue.addBack(new TestRegionProcedure(2, tableName, + TableProcedureInterface.TableOperationType.MERGE, regionA, regionB)); + queue.addBack(new TestRegionProcedure(3, tableName, + TableProcedureInterface.TableOperationType.SPLIT, regionA)); + queue.addBack(new TestRegionProcedure(4, tableName, + TableProcedureInterface.TableOperationType.SPLIT, regionB)); + queue.addBack(new TestRegionProcedure(5, tableName, + TableProcedureInterface.TableOperationType.UNASSIGN, regionC)); + + // Fetch the 1st item and take the write lock + Procedure proc = queue.poll(); + assertEquals(1, proc.getProcId()); + assertEquals(true, queue.tryAcquireTableExclusiveLock(proc, tableName)); + + // everything is locked by the table operation + assertEquals(null, queue.poll(0)); + + // release the table lock + queue.releaseTableExclusiveLock(proc, tableName); + + // Fetch the 2nd item and take the lock on regionA and regionB + Procedure mergeProc = queue.poll(); + assertEquals(2, mergeProc.getProcId()); + assertEquals(true, queue.waitRegions(mergeProc, tableName, regionA, regionB)); + + // Fetch the 3rd item and then try to lock region A, which will fail + // because it is already locked. This procedure will go into waiting. 
+ // (this stuff will be explicit until we get rid of the zk-lock) + Procedure procA = queue.poll(); + assertEquals(3, procA.getProcId()); + assertEquals(false, queue.waitRegions(procA, tableName, regionA)); + + // Fetch the 4th item, same story as the 3rd + Procedure procB = queue.poll(); + assertEquals(4, procB.getProcId()); + assertEquals(false, queue.waitRegions(procB, tableName, regionB)); + + // Fetch the 5th item, since it is a non-locked region we are able to execute it + Procedure procC = queue.poll(); + assertEquals(5, procC.getProcId()); + assertEquals(true, queue.waitRegions(procC, tableName, regionC)); + + // 3rd and 4th are in the region suspended queue + assertEquals(null, queue.poll(0)); + + // Release region A-B from merge operation (procId=2) + queue.wakeRegions(mergeProc, tableName, regionA, regionB); + + // Fetch the 3rd item, now the lock on the region is available + procA = queue.poll(); + assertEquals(3, procA.getProcId()); + assertEquals(true, queue.waitRegions(procA, tableName, regionA)); + + // Fetch the 4th item, now the lock on the region is available + procB = queue.poll(); + assertEquals(4, procB.getProcId()); + assertEquals(true, queue.waitRegions(procB, tableName, regionB)); + + // release the locks on the regions + queue.wakeRegions(procA, tableName, regionA); + queue.wakeRegions(procB, tableName, regionB); + queue.wakeRegions(procC, tableName, regionC); + } + + @Test + public void testVerifySubProcRegionLocks() throws Exception { + final TableName tableName = TableName.valueOf("testVerifySubProcRegionLocks"); + final HRegionInfo regionA = new HRegionInfo(tableName, Bytes.toBytes("a"), Bytes.toBytes("b")); + final HRegionInfo regionB = new HRegionInfo(tableName, Bytes.toBytes("b"), Bytes.toBytes("c")); + final HRegionInfo regionC = new HRegionInfo(tableName, Bytes.toBytes("c"), Bytes.toBytes("d")); + + queue.addBack(new TestTableProcedure(1, tableName, + TableProcedureInterface.TableOperationType.ENABLE)); + + // Fetch the 1st item 
from the queue, "the root procedure" and take the table lock + Procedure rootProc = queue.poll(); + assertEquals(1, rootProc.getProcId()); + assertEquals(true, queue.tryAcquireTableExclusiveLock(rootProc, tableName)); + assertEquals(null, queue.poll(0)); + + // Execute the 1st step of the root-proc. + // we should get 3 sub-proc back, one for each region. + // (this step is done by the executor/rootProc, we are simulating it) + Procedure[] subProcs = new Procedure[] { + new TestRegionProcedure(1, 2, tableName, + TableProcedureInterface.TableOperationType.ASSIGN, regionA), + new TestRegionProcedure(1, 3, tableName, + TableProcedureInterface.TableOperationType.ASSIGN, regionB), + new TestRegionProcedure(1, 4, tableName, + TableProcedureInterface.TableOperationType.ASSIGN, regionC), + }; + + // at this point the rootProc is going in a waiting state + // and the sub-procedures will be added in the queue. + // (this step is done by the executor, we are simulating it) + for (int i = subProcs.length - 1; i >= 0; --i) { + queue.addFront(subProcs[i]); + } + assertEquals(subProcs.length, queue.size()); + + // we should be able to fetch and execute all the sub-procs, + // since they are operating on different regions + for (int i = 0; i < subProcs.length; ++i) { + TestRegionProcedure regionProc = (TestRegionProcedure)queue.poll(0); + assertEquals(subProcs[i].getProcId(), regionProc.getProcId()); + assertEquals(true, queue.waitRegions(regionProc, tableName, regionProc.getRegionInfo())); + } + + // nothing else in the queue + assertEquals(null, queue.poll(0)); + + // release all the region locks + for (int i = 0; i < subProcs.length; ++i) { + TestRegionProcedure regionProc = (TestRegionProcedure)subProcs[i]; + queue.wakeRegions(regionProc, tableName, regionProc.getRegionInfo()); + } + + // nothing else in the queue + assertEquals(null, queue.poll(0)); + + // release the table lock (for the root procedure) + queue.releaseTableExclusiveLock(rootProc, tableName); + } + + @Test + 
public void testSuspendedTableQueue() throws Exception { + final TableName tableName = TableName.valueOf("testSuspendedQueue"); + + queue.addBack(new TestTableProcedure(1, tableName, + TableProcedureInterface.TableOperationType.EDIT)); + queue.addBack(new TestTableProcedure(2, tableName, + TableProcedureInterface.TableOperationType.EDIT)); + + Procedure proc = queue.poll(); + assertEquals(1, proc.getProcId()); + assertTrue(queue.tryAcquireTableExclusiveLock(proc, tableName)); + + // Suspend + // TODO: If we want to keep the zk-lock we need to retain the lock on suspend + ProcedureEvent event = new ProcedureEvent("testSuspendedTableQueueEvent"); + queue.waitEvent(event, proc, true); + queue.releaseTableExclusiveLock(proc, tableName); + assertEquals(null, queue.poll(0)); + + // Resume + queue.wake(event); + + proc = queue.poll(); + assertTrue(queue.tryAcquireTableExclusiveLock(proc, tableName)); + assertEquals(1, proc.getProcId()); + queue.releaseTableExclusiveLock(proc, tableName); + + proc = queue.poll(); + assertTrue(queue.tryAcquireTableExclusiveLock(proc, tableName)); + assertEquals(2, proc.getProcId()); + queue.releaseTableExclusiveLock(proc, tableName); + } + + @Test + public void testSuspendedProcedure() throws Exception { + final TableName tableName = TableName.valueOf("testSuspendedProcedure"); + + queue.addBack(new TestTableProcedure(1, tableName, + TableProcedureInterface.TableOperationType.READ)); + queue.addBack(new TestTableProcedure(2, tableName, + TableProcedureInterface.TableOperationType.READ)); + + Procedure proc = queue.poll(); + assertEquals(1, proc.getProcId()); + + // suspend + ProcedureEvent event = new ProcedureEvent("testSuspendedProcedureEvent"); + queue.waitEvent(event, proc); + + proc = queue.poll(); + assertEquals(2, proc.getProcId()); + assertEquals(null, queue.poll(0)); + + // resume + queue.wake(event); + + proc = queue.poll(); + assertEquals(1, proc.getProcId()); + assertEquals(null, queue.poll(0)); + } + /** * Verify that "write" 
operations for a single table are serialized, * but different tables can be executed in parallel. @@ -522,6 +720,32 @@ public class TestMasterProcedureScheduler { } } + public static class TestRegionProcedure extends TestTableProcedure { + private final HRegionInfo[] regionInfo; + + public TestRegionProcedure() { + throw new UnsupportedOperationException("recovery should not be triggered here"); + } + + public TestRegionProcedure(long procId, TableName tableName, TableOperationType opType, + HRegionInfo... regionInfo) { + this(-1, procId, tableName, opType, regionInfo); + } + + public TestRegionProcedure(long parentProcId, long procId, TableName tableName, + TableOperationType opType, HRegionInfo... regionInfo) { + super(procId, tableName, opType); + this.regionInfo = regionInfo; + if (parentProcId > 0) { + setParentProcId(parentProcId); + } + } + + public HRegionInfo[] getRegionInfo() { + return regionInfo; + } + } + public static class TestNamespaceProcedure extends TestProcedure implements TableProcedureInterface { private final TableOperationType opType;