HBASE-15107 Procedure v2 - Procedure Queue with Region locks

This commit is contained in:
Matteo Bertozzi 2016-06-08 12:52:58 -07:00
parent d05a3722c8
commit d5d9b7d500
7 changed files with 580 additions and 26 deletions
hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2
hbase-server/src
main/java/org/apache/hadoop/hbase/master/procedure
test/java/org/apache/hadoop/hbase/master/procedure

View File

@ -79,6 +79,9 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
private int childrenLatch = 0;
private long lastUpdate;
// TODO: it will be nice having pointers to allow the scheduler doing suspend/resume tricks
private boolean suspended = false;
private RemoteProcedureException exception = null;
private byte[] result = null;
@ -94,7 +97,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
* @throws InterruptedException the procedure will be added back to the queue and retried later
*/
protected abstract Procedure[] execute(TEnvironment env)
throws ProcedureYieldException, InterruptedException;
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException;
/**
* The code to undo what done by the execute() code.
@ -276,6 +279,9 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
*/
protected void toStringState(StringBuilder builder) {
builder.append(getState());
if (isSuspended()) {
builder.append("|SUSPENDED");
}
}
/**
@ -319,7 +325,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
}
public long getParentProcId() {
return parentProcId;
return parentProcId.longValue();
}
public NonceKey getNonceKey() {
@ -371,6 +377,23 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
return false;
}
/**
* @return true if the procedure is in a suspended state,
* waiting for the resources required to execute the procedure will become available.
*/
public synchronized boolean isSuspended() {
return suspended;
}
public synchronized void suspend() {
suspended = true;
}
public synchronized void resume() {
assert isSuspended() : this + " expected suspended state, got " + state;
suspended = false;
}
public synchronized RemoteProcedureException getException() {
return exception;
}
@ -398,7 +421,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
* @return the timeout in msec
*/
public int getTimeout() {
return timeout;
return timeout.intValue();
}
/**
@ -494,7 +517,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
*/
@InterfaceAudience.Private
protected Procedure[] doExecute(final TEnvironment env)
throws ProcedureYieldException, InterruptedException {
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
try {
updateTimestamp();
return execute(env);

View File

@ -505,15 +505,25 @@ public class ProcedureExecutor<TEnvironment> {
}
};
long st, et;
// Acquire the store lease.
st = EnvironmentEdgeManager.currentTime();
store.recoverLease();
et = EnvironmentEdgeManager.currentTime();
LOG.info(String.format("recover procedure store (%s) lease: %s",
store.getClass().getSimpleName(), StringUtils.humanTimeDiff(et - st)));
// TODO: Split in two steps.
// TODO: Handle corrupted procedures (currently just a warn)
// The first one will make sure that we have the latest id,
// so we can start the threads and accept new procedures.
// The second step will do the actual load of old procedures.
st = EnvironmentEdgeManager.currentTime();
load(abortOnCorruption);
et = EnvironmentEdgeManager.currentTime();
LOG.info(String.format("load procedure store (%s): %s",
store.getClass().getSimpleName(), StringUtils.humanTimeDiff(et - st)));
// Start the executors. Here we must have the lastProcId set.
for (int i = 0; i < threads.length; ++i) {
@ -840,7 +850,7 @@ public class ProcedureExecutor<TEnvironment> {
}
// Execute the procedure
assert proc.getState() == ProcedureState.RUNNABLE;
assert proc.getState() == ProcedureState.RUNNABLE : proc;
if (proc.acquireLock(getEnvironment())) {
execProcedure(procStack, proc);
proc.releaseLock(getEnvironment());
@ -1042,6 +1052,7 @@ public class ProcedureExecutor<TEnvironment> {
Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE);
// Execute the procedure
boolean isSuspended = false;
boolean reExecute = false;
Procedure[] subprocs = null;
do {
@ -1051,6 +1062,8 @@ public class ProcedureExecutor<TEnvironment> {
if (subprocs != null && subprocs.length == 0) {
subprocs = null;
}
} catch (ProcedureSuspendedException e) {
isSuspended = true;
} catch (ProcedureYieldException e) {
if (LOG.isTraceEnabled()) {
LOG.trace("Yield procedure: " + procedure + ": " + e.getMessage());
@ -1086,7 +1099,7 @@ public class ProcedureExecutor<TEnvironment> {
break;
}
assert subproc.getState() == ProcedureState.INITIALIZING;
assert subproc.getState() == ProcedureState.INITIALIZING : subproc;
subproc.setParentProcId(procedure.getProcId());
subproc.setProcId(nextProcId());
}
@ -1107,7 +1120,7 @@ public class ProcedureExecutor<TEnvironment> {
}
} else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
waitingTimeout.add(procedure);
} else {
} else if (!isSuspended) {
// No subtask, so we are done
procedure.setState(ProcedureState.FINISHED);
}

View File

@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
@InterfaceAudience.Private
@InterfaceStability.Stable
public class ProcedureSuspendedException extends ProcedureException {
/** default constructor */
public ProcedureSuspendedException() {
super();
}
/**
* Constructor
* @param s message
*/
public ProcedureSuspendedException(String s) {
super(s);
}
}

View File

@ -42,7 +42,7 @@ public abstract class SequentialProcedure<TEnvironment> extends Procedure<TEnvir
@Override
protected Procedure[] doExecute(final TEnvironment env)
throws ProcedureYieldException, InterruptedException {
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
updateTimestamp();
try {
Procedure[] children = !executed ? execute(env) : null;

View File

@ -20,12 +20,15 @@ package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.HashMap;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
@ -103,6 +106,10 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
}
private void doAdd(final Procedure proc, final boolean addFront) {
doAdd(proc, addFront, true);
}
private void doAdd(final Procedure proc, final boolean addFront, final boolean notify) {
schedLock.lock();
try {
if (isTableProcedure(proc)) {
@ -117,7 +124,9 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
throw new UnsupportedOperationException(
"RQs for non-table/non-server procedures are not implemented yet");
}
schedWaitCond.signal();
if (notify) {
schedWaitCond.signal();
}
} finally {
schedLock.unlock();
}
@ -125,12 +134,28 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
private <T extends Comparable<T>> void doAdd(final FairQueue<T> fairq,
final Queue<T> queue, final Procedure proc, final boolean addFront) {
if (proc.isSuspended()) return;
queue.add(proc, addFront);
if (!(queue.isSuspended() || queue.hasExclusiveLock())) {
// the queue is not suspended or removed from the fairq (run-queue)
// because someone has an xlock on it.
// so, if the queue is not-linked we should add it
if (queue.size() == 1 && !IterableList.isLinked(queue)) {
fairq.add(queue);
}
queueSize++;
} else if (proc.hasParent() && queue.isLockOwner(proc.getParentProcId())) {
assert addFront : "expected to add a child in the front";
assert !queue.isSuspended() : "unexpected suspended state for the queue";
// our (proc) parent has the xlock,
// so the queue is not in the fairq (run-queue)
// add it back to let the child run (inherit the lock)
if (!IterableList.isLinked(queue)) {
fairq.add(queue);
}
queueSize++;
}
}
@ -140,7 +165,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
}
@edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
Procedure poll(long waitNsec) {
protected Procedure poll(long waitNsec) {
Procedure pollResult = null;
schedLock.lock();
try {
@ -185,7 +210,16 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
this.queueSize--;
if (rq.isEmpty() || rq.requireExclusiveLock(pollResult)) {
removeFromRunQueue(fairq, rq);
} else if (pollResult.hasParent() && rq.isLockOwner(pollResult.getParentProcId())) {
// if the rq is in the fairq because of runnable child
// check if the next procedure is still a child.
// if not, remove the rq from the fairq and go back to the xlock state
Procedure nextProc = rq.peek();
if (nextProc != null && nextProc.getParentProcId() != pollResult.getParentProcId()) {
removeFromRunQueue(fairq, rq);
}
}
return pollResult;
}
@ -300,18 +334,25 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
}
public boolean waitEvent(ProcedureEvent event, Procedure procedure, boolean suspendQueue) {
return waitEvent(event, /* lockEvent= */false, procedure, suspendQueue);
}
private boolean waitEvent(ProcedureEvent event, boolean lockEvent,
Procedure procedure, boolean suspendQueue) {
synchronized (event) {
if (event.isReady()) {
if (lockEvent) {
event.setReady(false);
}
return false;
}
// TODO: Suspend single procedure not implemented yet, fallback to suspending the queue
if (!suspendQueue) suspendQueue = true;
if (isTableProcedure(procedure)) {
waitTableEvent(event, procedure, suspendQueue);
if (!suspendQueue) {
suspendProcedure(event, procedure);
} else if (isTableProcedure(procedure)) {
waitTableEvent(event, procedure);
} else if (isServerProcedure(procedure)) {
waitServerEvent(event, procedure, suspendQueue);
waitServerEvent(event, procedure);
} else {
// TODO: at the moment we only have Table and Server procedures
// if you are implementing a non-table/non-server procedure, you have two options: create
@ -324,17 +365,16 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
return true;
}
private void waitTableEvent(ProcedureEvent event, Procedure procedure, boolean suspendQueue) {
private void waitTableEvent(ProcedureEvent event, Procedure procedure) {
final TableName tableName = getTableName(procedure);
final boolean isDebugEnabled = LOG.isDebugEnabled();
schedLock.lock();
try {
TableQueue queue = getTableQueue(tableName);
queue.addFront(procedure);
if (queue.isSuspended()) return;
// TODO: if !suspendQueue
if (isDebugEnabled) {
LOG.debug("Suspend table queue " + tableName);
}
@ -346,7 +386,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
}
}
private void waitServerEvent(ProcedureEvent event, Procedure procedure, boolean suspendQueue) {
private void waitServerEvent(ProcedureEvent event, Procedure procedure) {
final ServerName serverName = getServerName(procedure);
final boolean isDebugEnabled = LOG.isDebugEnabled();
@ -354,10 +394,9 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
try {
// TODO: This will change once we have the new AM
ServerQueue queue = getServerQueue(serverName);
queue.addFront(procedure);
if (queue.isSuspended()) return;
// TODO: if !suspendQueue
if (isDebugEnabled) {
LOG.debug("Suspend server queue " + serverName);
}
@ -399,6 +438,10 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
addToRunQueue(serverRunQueue, queue);
}
while (event.hasWaitingProcedures()) {
wakeProcedure(event.popWaitingProcedure(false));
}
if (queueSize > 1) {
schedWaitCond.signalAll();
} else if (queueSize > 0) {
@ -410,7 +453,41 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
}
}
public static class ProcedureEvent {
private void suspendProcedure(BaseProcedureEvent event, Procedure procedure) {
procedure.suspend();
event.suspendProcedure(procedure);
}
private void wakeProcedure(Procedure procedure) {
procedure.resume();
doAdd(procedure, /* addFront= */ true, /* notify= */false);
}
private static abstract class BaseProcedureEvent {
private ArrayDeque<Procedure> waitingProcedures = null;
protected void suspendProcedure(Procedure proc) {
if (waitingProcedures == null) {
waitingProcedures = new ArrayDeque<Procedure>();
}
waitingProcedures.addLast(proc);
}
protected boolean hasWaitingProcedures() {
return waitingProcedures != null;
}
protected Procedure popWaitingProcedure(boolean popFront) {
// it will be nice to use IterableList on a procedure and avoid allocations...
Procedure proc = popFront ? waitingProcedures.removeFirst() : waitingProcedures.removeLast();
if (waitingProcedures.isEmpty()) {
waitingProcedures = null;
}
return proc;
}
}
public static class ProcedureEvent extends BaseProcedureEvent {
private final String description;
private Queue<ServerName> waitingServers = null;
@ -585,9 +662,47 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
}
}
private static class RegionEvent extends BaseProcedureEvent {
private final HRegionInfo regionInfo;
private long exclusiveLockProcIdOwner = Long.MIN_VALUE;
public RegionEvent(HRegionInfo regionInfo) {
this.regionInfo = regionInfo;
}
public boolean hasExclusiveLock() {
return exclusiveLockProcIdOwner != Long.MIN_VALUE;
}
public boolean isLockOwner(long procId) {
return exclusiveLockProcIdOwner == procId;
}
public boolean tryExclusiveLock(long procIdOwner) {
assert procIdOwner != Long.MIN_VALUE;
if (hasExclusiveLock()) return false;
exclusiveLockProcIdOwner = procIdOwner;
return true;
}
private void releaseExclusiveLock() {
exclusiveLockProcIdOwner = Long.MIN_VALUE;
}
public HRegionInfo getRegionInfo() {
return regionInfo;
}
@Override
public String toString() {
return String.format("region %s event", regionInfo.getRegionNameAsString());
}
}
public static class TableQueue extends QueueImpl<TableName> {
private final NamespaceQueue namespaceQueue;
private HashMap<HRegionInfo, RegionEvent> regionEventMap;
private TableLock tableLock = null;
public TableQueue(TableName tableName, NamespaceQueue namespaceQueue, int priority) {
@ -601,7 +716,41 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
@Override
public synchronized boolean isAvailable() {
return super.isAvailable() && !namespaceQueue.hasExclusiveLock();
// if there are no items in the queue, or the namespace is locked.
// we can't execute operation on this table
if (isEmpty() || namespaceQueue.hasExclusiveLock()) {
return false;
}
if (hasExclusiveLock()) {
// if we have an exclusive lock already taken
// only child of the lock owner can be executed
Procedure availProc = peek();
return availProc != null && availProc.hasParent() &&
isLockOwner(availProc.getParentProcId());
}
// no xlock
return true;
}
public synchronized RegionEvent getRegionEvent(final HRegionInfo regionInfo) {
if (regionEventMap == null) {
regionEventMap = new HashMap<HRegionInfo, RegionEvent>();
}
RegionEvent event = regionEventMap.get(regionInfo);
if (event == null) {
event = new RegionEvent(regionInfo);
regionEventMap.put(regionInfo, event);
}
return event;
}
public synchronized void removeRegionEvent(final RegionEvent event) {
regionEventMap.remove(event.getRegionInfo());
if (regionEventMap.isEmpty()) {
regionEventMap = null;
}
}
// TODO: We can abort pending/in-progress operation if the new call is
@ -630,6 +779,13 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
return !tpi.getTableName().equals(TableName.NAMESPACE_TABLE_NAME);
case READ:
return false;
// region operations are using the shared-lock on the table
// and then they will grab an xlock on the region.
case SPLIT:
case MERGE:
case ASSIGN:
case UNASSIGN:
return false;
default:
break;
}
@ -882,6 +1038,100 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
return true;
}
// ============================================================================
// Region Locking Helpers
// ============================================================================
public boolean waitRegion(final Procedure procedure, final HRegionInfo regionInfo) {
return waitRegions(procedure, regionInfo.getTable(), regionInfo);
}
public boolean waitRegions(final Procedure procedure, final TableName table,
final HRegionInfo... regionInfo) {
Arrays.sort(regionInfo);
final TableQueue queue;
if (procedure.hasParent()) {
// the assumption is that the parent procedure have already the table xlock
queue = getTableQueueWithLock(table);
} else {
// acquire the table shared-lock
queue = tryAcquireTableQueueSharedLock(procedure, table);
if (queue == null) return false;
}
// acquire region xlocks or wait
boolean hasLock = true;
final RegionEvent[] event = new RegionEvent[regionInfo.length];
synchronized (queue) {
for (int i = 0; i < regionInfo.length; ++i) {
assert regionInfo[i].getTable().equals(table);
event[i] = queue.getRegionEvent(regionInfo[i]);
if (!event[i].tryExclusiveLock(procedure.getProcId())) {
suspendProcedure(event[i], procedure);
hasLock = false;
while (i-- > 0) {
event[i].releaseExclusiveLock();
}
break;
}
}
}
if (!hasLock && !procedure.hasParent()) {
releaseTableSharedLock(procedure, table);
}
return hasLock;
}
public void wakeRegion(final Procedure procedure, final HRegionInfo regionInfo) {
wakeRegions(procedure, regionInfo.getTable(), regionInfo);
}
public void wakeRegions(final Procedure procedure,final TableName table,
final HRegionInfo... regionInfo) {
Arrays.sort(regionInfo);
final TableQueue queue = getTableQueueWithLock(table);
int numProcs = 0;
final Procedure[] nextProcs = new Procedure[regionInfo.length];
synchronized (queue) {
for (int i = 0; i < regionInfo.length; ++i) {
assert regionInfo[i].getTable().equals(table);
RegionEvent event = queue.getRegionEvent(regionInfo[i]);
event.releaseExclusiveLock();
if (event.hasWaitingProcedures()) {
// release one procedure at the time since regions has an xlock
nextProcs[numProcs++] = event.popWaitingProcedure(true);
} else {
queue.removeRegionEvent(event);
}
}
}
// awake procedures if any
schedLock.lock();
try {
for (int i = numProcs - 1; i >= 0; --i) {
wakeProcedure(nextProcs[i]);
}
if (numProcs > 1) {
schedWaitCond.signalAll();
} else if (numProcs > 0) {
schedWaitCond.signal();
}
if (!procedure.hasParent()) {
// release the table shared-lock.
// (if we have a parent, it is holding an xlock so we didn't take the shared-lock)
releaseTableSharedLock(procedure, table);
}
} finally {
schedLock.unlock();
}
}
// ============================================================================
// Namespace Locking Helpers
// ============================================================================
@ -1080,6 +1330,10 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
return sharedLock == 1;
}
public synchronized boolean isLockOwner(long procId) {
return exclusiveLockProcIdOwner == procId;
}
public synchronized boolean tryExclusiveLock(long procIdOwner) {
assert procIdOwner != Long.MIN_VALUE;
if (isLocked()) return false;

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
public interface TableProcedureInterface {
public enum TableOperationType {
CREATE, DELETE, DISABLE, EDIT, ENABLE, READ,
SPLIT, MERGE, ASSIGN, UNASSIGN, /* region operations */
};
/**

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
@ -29,12 +28,15 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
@ -60,7 +62,8 @@ public class TestMasterProcedureScheduler {
@After
public void tearDown() throws IOException {
assertEquals(0, queue.size());
assertEquals("proc-queue expected to be empty", 0, queue.size());
queue.clear();
}
@Test
@ -346,6 +349,201 @@ public class TestMasterProcedureScheduler {
assertEquals(4, procId);
}
@Test
public void testVerifyRegionLocks() throws Exception {
final TableName tableName = TableName.valueOf("testtb");
final HRegionInfo regionA = new HRegionInfo(tableName, Bytes.toBytes("a"), Bytes.toBytes("b"));
final HRegionInfo regionB = new HRegionInfo(tableName, Bytes.toBytes("b"), Bytes.toBytes("c"));
final HRegionInfo regionC = new HRegionInfo(tableName, Bytes.toBytes("c"), Bytes.toBytes("d"));
queue.addBack(new TestTableProcedure(1, tableName,
TableProcedureInterface.TableOperationType.EDIT));
queue.addBack(new TestRegionProcedure(2, tableName,
TableProcedureInterface.TableOperationType.MERGE, regionA, regionB));
queue.addBack(new TestRegionProcedure(3, tableName,
TableProcedureInterface.TableOperationType.SPLIT, regionA));
queue.addBack(new TestRegionProcedure(4, tableName,
TableProcedureInterface.TableOperationType.SPLIT, regionB));
queue.addBack(new TestRegionProcedure(5, tableName,
TableProcedureInterface.TableOperationType.UNASSIGN, regionC));
// Fetch the 1st item and take the write lock
Procedure proc = queue.poll();
assertEquals(1, proc.getProcId());
assertEquals(true, queue.tryAcquireTableExclusiveLock(proc, tableName));
// everything is locked by the table operation
assertEquals(null, queue.poll(0));
// release the table lock
queue.releaseTableExclusiveLock(proc, tableName);
// Fetch the 2nd item and the the lock on regionA and regionB
Procedure mergeProc = queue.poll();
assertEquals(2, mergeProc.getProcId());
assertEquals(true, queue.waitRegions(mergeProc, tableName, regionA, regionB));
// Fetch the 3rd item and the try to lock region A which will fail
// because already locked. this procedure will go in waiting.
// (this stuff will be explicit until we get rid of the zk-lock)
Procedure procA = queue.poll();
assertEquals(3, procA.getProcId());
assertEquals(false, queue.waitRegions(procA, tableName, regionA));
// Fetch the 4th item, same story as the 3rd
Procedure procB = queue.poll();
assertEquals(4, procB.getProcId());
assertEquals(false, queue.waitRegions(procB, tableName, regionB));
// Fetch the 5th item, since it is a non-locked region we are able to execute it
Procedure procC = queue.poll();
assertEquals(5, procC.getProcId());
assertEquals(true, queue.waitRegions(procC, tableName, regionC));
// 3rd and 4th are in the region suspended queue
assertEquals(null, queue.poll(0));
// Release region A-B from merge operation (procId=2)
queue.wakeRegions(mergeProc, tableName, regionA, regionB);
// Fetch the 3rd item, now the lock on the region is available
procA = queue.poll();
assertEquals(3, procA.getProcId());
assertEquals(true, queue.waitRegions(procA, tableName, regionA));
// Fetch the 4th item, now the lock on the region is available
procB = queue.poll();
assertEquals(4, procB.getProcId());
assertEquals(true, queue.waitRegions(procB, tableName, regionB));
// release the locks on the regions
queue.wakeRegions(procA, tableName, regionA);
queue.wakeRegions(procB, tableName, regionB);
queue.wakeRegions(procC, tableName, regionC);
}
@Test
public void testVerifySubProcRegionLocks() throws Exception {
final TableName tableName = TableName.valueOf("testVerifySubProcRegionLocks");
final HRegionInfo regionA = new HRegionInfo(tableName, Bytes.toBytes("a"), Bytes.toBytes("b"));
final HRegionInfo regionB = new HRegionInfo(tableName, Bytes.toBytes("b"), Bytes.toBytes("c"));
final HRegionInfo regionC = new HRegionInfo(tableName, Bytes.toBytes("c"), Bytes.toBytes("d"));
queue.addBack(new TestTableProcedure(1, tableName,
TableProcedureInterface.TableOperationType.ENABLE));
// Fetch the 1st item from the queue, "the root procedure" and take the table lock
Procedure rootProc = queue.poll();
assertEquals(1, rootProc.getProcId());
assertEquals(true, queue.tryAcquireTableExclusiveLock(rootProc, tableName));
assertEquals(null, queue.poll(0));
// Execute the 1st step of the root-proc.
// we should get 3 sub-proc back, one for each region.
// (this step is done by the executor/rootProc, we are simulating it)
Procedure[] subProcs = new Procedure[] {
new TestRegionProcedure(1, 2, tableName,
TableProcedureInterface.TableOperationType.ASSIGN, regionA),
new TestRegionProcedure(1, 3, tableName,
TableProcedureInterface.TableOperationType.ASSIGN, regionB),
new TestRegionProcedure(1, 4, tableName,
TableProcedureInterface.TableOperationType.ASSIGN, regionC),
};
// at this point the rootProc is going in a waiting state
// and the sub-procedures will be added in the queue.
// (this step is done by the executor, we are simulating it)
for (int i = subProcs.length - 1; i >= 0; --i) {
queue.addFront(subProcs[i]);
}
assertEquals(subProcs.length, queue.size());
// we should be able to fetch and execute all the sub-procs,
// since they are operating on different regions
for (int i = 0; i < subProcs.length; ++i) {
TestRegionProcedure regionProc = (TestRegionProcedure)queue.poll(0);
assertEquals(subProcs[i].getProcId(), regionProc.getProcId());
assertEquals(true, queue.waitRegions(regionProc, tableName, regionProc.getRegionInfo()));
}
// nothing else in the queue
assertEquals(null, queue.poll(0));
// release all the region locks
for (int i = 0; i < subProcs.length; ++i) {
TestRegionProcedure regionProc = (TestRegionProcedure)subProcs[i];
queue.wakeRegions(regionProc, tableName, regionProc.getRegionInfo());
}
// nothing else in the queue
assertEquals(null, queue.poll(0));
// release the table lock (for the root procedure)
queue.releaseTableExclusiveLock(rootProc, tableName);
}
@Test
public void testSuspendedTableQueue() throws Exception {
final TableName tableName = TableName.valueOf("testSuspendedQueue");
queue.addBack(new TestTableProcedure(1, tableName,
TableProcedureInterface.TableOperationType.EDIT));
queue.addBack(new TestTableProcedure(2, tableName,
TableProcedureInterface.TableOperationType.EDIT));
Procedure proc = queue.poll();
assertEquals(1, proc.getProcId());
assertTrue(queue.tryAcquireTableExclusiveLock(proc, tableName));
// Suspend
// TODO: If we want to keep the zk-lock we need to retain the lock on suspend
ProcedureEvent event = new ProcedureEvent("testSuspendedTableQueueEvent");
queue.waitEvent(event, proc, true);
queue.releaseTableExclusiveLock(proc, tableName);
assertEquals(null, queue.poll(0));
// Resume
queue.wake(event);
proc = queue.poll();
assertTrue(queue.tryAcquireTableExclusiveLock(proc, tableName));
assertEquals(1, proc.getProcId());
queue.releaseTableExclusiveLock(proc, tableName);
proc = queue.poll();
assertTrue(queue.tryAcquireTableExclusiveLock(proc, tableName));
assertEquals(2, proc.getProcId());
queue.releaseTableExclusiveLock(proc, tableName);
}
@Test
public void testSuspendedProcedure() throws Exception {
final TableName tableName = TableName.valueOf("testSuspendedProcedure");
queue.addBack(new TestTableProcedure(1, tableName,
TableProcedureInterface.TableOperationType.READ));
queue.addBack(new TestTableProcedure(2, tableName,
TableProcedureInterface.TableOperationType.READ));
Procedure proc = queue.poll();
assertEquals(1, proc.getProcId());
// suspend
ProcedureEvent event = new ProcedureEvent("testSuspendedProcedureEvent");
queue.waitEvent(event, proc);
proc = queue.poll();
assertEquals(2, proc.getProcId());
assertEquals(null, queue.poll(0));
// resume
queue.wake(event);
proc = queue.poll();
assertEquals(1, proc.getProcId());
assertEquals(null, queue.poll(0));
}
/**
* Verify that "write" operations for a single table are serialized,
* but different tables can be executed in parallel.
@ -522,6 +720,32 @@ public class TestMasterProcedureScheduler {
}
}
public static class TestRegionProcedure extends TestTableProcedure {
private final HRegionInfo[] regionInfo;
public TestRegionProcedure() {
throw new UnsupportedOperationException("recovery should not be triggered here");
}
public TestRegionProcedure(long procId, TableName tableName, TableOperationType opType,
HRegionInfo... regionInfo) {
this(-1, procId, tableName, opType, regionInfo);
}
public TestRegionProcedure(long parentProcId, long procId, TableName tableName,
TableOperationType opType, HRegionInfo... regionInfo) {
super(procId, tableName, opType);
this.regionInfo = regionInfo;
if (parentProcId > 0) {
setParentProcId(parentProcId);
}
}
public HRegionInfo[] getRegionInfo() {
return regionInfo;
}
}
public static class TestNamespaceProcedure extends TestProcedure
implements TableProcedureInterface {
private final TableOperationType opType;