HBASE-16586 Procedure v2 - Cleanup sched wait/lock semantic
This commit is contained in:
parent
2597217ae5
commit
b6b72361b6
|
@ -253,7 +253,7 @@ public class DispatchMergingRegionsProcedure
|
|||
|
||||
@Override
|
||||
protected boolean acquireLock(final MasterProcedureEnv env) {
|
||||
return env.getProcedureQueue().waitRegions(
|
||||
return !env.getProcedureQueue().waitRegions(
|
||||
this, getTableName(), regionsToMerge[0], regionsToMerge[1]);
|
||||
}
|
||||
|
||||
|
|
|
@ -137,18 +137,18 @@ public class MasterProcedureEnv {
|
|||
}
|
||||
|
||||
public void wake(ProcedureEvent event) {
|
||||
procSched.wake(event);
|
||||
procSched.wakeEvent(event);
|
||||
}
|
||||
|
||||
public void suspend(ProcedureEvent event) {
|
||||
procSched.suspend(event);
|
||||
procSched.suspendEvent(event);
|
||||
}
|
||||
|
||||
public void setEventReady(ProcedureEvent event, boolean isReady) {
|
||||
if (isReady) {
|
||||
procSched.wake(event);
|
||||
procSched.wakeEvent(event);
|
||||
} else {
|
||||
procSched.suspend(event);
|
||||
procSched.suspendEvent(event);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -133,7 +133,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
|||
// a group for all the non-table/non-server procedures or try to find a key for your
|
||||
// non-table/non-server procedures and implement something similar to the TableRunQueue.
|
||||
throw new UnsupportedOperationException(
|
||||
"RQs for non-table/non-server procedures are not implemented yet");
|
||||
"RQs for non-table/non-server procedures are not implemented yet: " + proc);
|
||||
}
|
||||
if (notify) {
|
||||
schedWaitCond.signal();
|
||||
|
@ -148,7 +148,6 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
|||
if (proc.isSuspended()) return;
|
||||
|
||||
queue.add(proc, addFront);
|
||||
|
||||
if (!(queue.isSuspended() || queue.hasExclusiveLock())) {
|
||||
// the queue is not suspended or removed from the fairq (run-queue)
|
||||
// because someone has an xlock on it.
|
||||
|
@ -157,7 +156,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
|||
fairq.add(queue);
|
||||
}
|
||||
queueSize++;
|
||||
} else if (proc.hasParent() && queue.isLockOwner(proc.getParentProcId())) {
|
||||
} else if (queue.hasParentLock(proc)) {
|
||||
assert addFront : "expected to add a child in the front";
|
||||
assert !queue.isSuspended() : "unexpected suspended state for the queue";
|
||||
// our (proc) parent has the xlock,
|
||||
|
@ -211,17 +210,24 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
|||
}
|
||||
|
||||
private <T extends Comparable<T>> Procedure doPoll(final FairQueue<T> fairq) {
|
||||
Queue<T> rq = fairq.poll();
|
||||
final Queue<T> rq = fairq.poll();
|
||||
if (rq == null || !rq.isAvailable()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
assert !rq.isSuspended() : "rq=" + rq + " is suspended";
|
||||
Procedure pollResult = rq.poll();
|
||||
final Procedure pollResult = rq.peek();
|
||||
final boolean xlockReq = rq.requireExclusiveLock(pollResult);
|
||||
if (xlockReq && rq.isLocked() && !rq.hasParentLock(pollResult)) {
|
||||
// someone is already holding the lock (e.g. shared lock). avoid a yield
|
||||
return null;
|
||||
}
|
||||
|
||||
rq.poll();
|
||||
this.queueSize--;
|
||||
if (rq.isEmpty() || rq.requireExclusiveLock(pollResult)) {
|
||||
if (rq.isEmpty() || xlockReq) {
|
||||
removeFromRunQueue(fairq, rq);
|
||||
} else if (pollResult.hasParent() && rq.isLockOwner(pollResult.getParentProcId())) {
|
||||
} else if (rq.hasParentLock(pollResult)) {
|
||||
// if the rq is in the fairq because of runnable child
|
||||
// check if the next procedure is still a child.
|
||||
// if not, remove the rq from the fairq and go back to the xlock state
|
||||
|
@ -291,7 +297,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
|||
TableProcedureInterface iProcTable = (TableProcedureInterface)proc;
|
||||
boolean tableDeleted;
|
||||
if (proc.hasException()) {
|
||||
IOException procEx = proc.getException().unwrapRemoteException();
|
||||
IOException procEx = proc.getException().unwrapRemoteException();
|
||||
if (iProcTable.getTableOperationType() == TableOperationType.CREATE) {
|
||||
// create failed because the table already exist
|
||||
tableDeleted = !(procEx instanceof TableExistsException);
|
||||
|
@ -341,16 +347,30 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
|||
// ============================================================================
|
||||
// Event Helpers
|
||||
// ============================================================================
|
||||
public boolean waitEvent(ProcedureEvent event, Procedure procedure) {
|
||||
/**
|
||||
* Suspend the procedure if the event is not ready yet.
|
||||
* @param event the event to wait on
|
||||
* @param procedure the procedure waiting on the event
|
||||
* @return true if the procedure has to wait for the event to be ready, false otherwise.
|
||||
*/
|
||||
public boolean waitEvent(final ProcedureEvent event, final Procedure procedure) {
|
||||
return waitEvent(event, procedure, false);
|
||||
}
|
||||
|
||||
public boolean waitEvent(ProcedureEvent event, Procedure procedure, boolean suspendQueue) {
|
||||
/**
|
||||
* Suspend the procedure if the event is not ready yet.
|
||||
* @param event the event to wait on
|
||||
* @param procedure the procedure waiting on the event
|
||||
* @param suspendQueue true if the entire queue of the procedure should be suspended
|
||||
* @return true if the procedure has to wait for the event to be ready, false otherwise.
|
||||
*/
|
||||
public boolean waitEvent(final ProcedureEvent event, final Procedure procedure,
|
||||
final boolean suspendQueue) {
|
||||
return waitEvent(event, /* lockEvent= */false, procedure, suspendQueue);
|
||||
}
|
||||
|
||||
private boolean waitEvent(ProcedureEvent event, boolean lockEvent,
|
||||
Procedure procedure, boolean suspendQueue) {
|
||||
private boolean waitEvent(final ProcedureEvent event, final boolean lockEvent,
|
||||
final Procedure procedure, final boolean suspendQueue) {
|
||||
synchronized (event) {
|
||||
if (event.isReady()) {
|
||||
if (lockEvent) {
|
||||
|
@ -371,13 +391,13 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
|||
// a group for all the non-table/non-server procedures or try to find a key for your
|
||||
// non-table/non-server procedures and implement something similar to the TableRunQueue.
|
||||
throw new UnsupportedOperationException(
|
||||
"RQs for non-table/non-server procedures are not implemented yet");
|
||||
"RQs for non-table/non-server procedures are not implemented yet: " + procedure);
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private void waitTableEvent(ProcedureEvent event, Procedure procedure) {
|
||||
private void waitTableEvent(final ProcedureEvent event, final Procedure procedure) {
|
||||
final TableName tableName = getTableName(procedure);
|
||||
final boolean isDebugEnabled = LOG.isDebugEnabled();
|
||||
|
||||
|
@ -398,7 +418,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
|||
}
|
||||
}
|
||||
|
||||
private void waitServerEvent(ProcedureEvent event, Procedure procedure) {
|
||||
private void waitServerEvent(final ProcedureEvent event, final Procedure procedure) {
|
||||
final ServerName serverName = getServerName(procedure);
|
||||
final boolean isDebugEnabled = LOG.isDebugEnabled();
|
||||
|
||||
|
@ -420,39 +440,37 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
|||
}
|
||||
}
|
||||
|
||||
public void suspend(ProcedureEvent event) {
|
||||
final boolean isDebugEnabled = LOG.isDebugEnabled();
|
||||
/**
|
||||
* Mark the event has not ready.
|
||||
* procedures calling waitEvent() will be suspended.
|
||||
* @param event the event to mark as suspended/not ready
|
||||
*/
|
||||
public void suspendEvent(final ProcedureEvent event) {
|
||||
final boolean isTraceEnabled = LOG.isTraceEnabled();
|
||||
synchronized (event) {
|
||||
event.setReady(false);
|
||||
if (isDebugEnabled) {
|
||||
LOG.debug("Suspend event " + event);
|
||||
if (isTraceEnabled) {
|
||||
LOG.trace("Suspend event " + event);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void wake(ProcedureEvent event) {
|
||||
final boolean isDebugEnabled = LOG.isDebugEnabled();
|
||||
/**
|
||||
* Wake every procedure waiting for the specified event
|
||||
* (By design each event has only one "wake" caller)
|
||||
* @param event the event to wait
|
||||
*/
|
||||
public void wakeEvent(final ProcedureEvent event) {
|
||||
final boolean isTraceEnabled = LOG.isTraceEnabled();
|
||||
synchronized (event) {
|
||||
event.setReady(true);
|
||||
if (isDebugEnabled) {
|
||||
LOG.debug("Wake event " + event);
|
||||
if (isTraceEnabled) {
|
||||
LOG.trace("Wake event " + event);
|
||||
}
|
||||
|
||||
schedLock.lock();
|
||||
try {
|
||||
while (event.hasWaitingTables()) {
|
||||
Queue<TableName> queue = event.popWaitingTable();
|
||||
addToRunQueue(tableRunQueue, queue);
|
||||
}
|
||||
// TODO: This will change once we have the new AM
|
||||
while (event.hasWaitingServers()) {
|
||||
Queue<ServerName> queue = event.popWaitingServer();
|
||||
addToRunQueue(serverRunQueue, queue);
|
||||
}
|
||||
|
||||
while (event.hasWaitingProcedures()) {
|
||||
wakeProcedure(event.popWaitingProcedure(false));
|
||||
}
|
||||
popEventWaitingObjects(event);
|
||||
|
||||
if (queueSize > 1) {
|
||||
schedWaitCond.signalAll();
|
||||
|
@ -465,12 +483,61 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
|||
}
|
||||
}
|
||||
|
||||
private void suspendProcedure(BaseProcedureEvent event, Procedure procedure) {
|
||||
/**
|
||||
* Wake every procedure waiting for the specified events.
|
||||
* (By design each event has only one "wake" caller)
|
||||
* @param events the list of events to wake
|
||||
* @param count the number of events in the array to wake
|
||||
*/
|
||||
public void wakeEvents(final ProcedureEvent[] events, final int count) {
|
||||
final boolean isTraceEnabled = LOG.isTraceEnabled();
|
||||
schedLock.lock();
|
||||
try {
|
||||
for (int i = 0; i < count; ++i) {
|
||||
final ProcedureEvent event = events[i];
|
||||
synchronized (event) {
|
||||
event.setReady(true);
|
||||
if (isTraceEnabled) {
|
||||
LOG.trace("Wake event " + event);
|
||||
}
|
||||
popEventWaitingObjects(event);
|
||||
}
|
||||
}
|
||||
|
||||
if (queueSize > 1) {
|
||||
schedWaitCond.signalAll();
|
||||
} else if (queueSize > 0) {
|
||||
schedWaitCond.signal();
|
||||
}
|
||||
} finally {
|
||||
schedLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private void popEventWaitingObjects(final ProcedureEvent event) {
|
||||
while (event.hasWaitingTables()) {
|
||||
final Queue<TableName> queue = event.popWaitingTable();
|
||||
queue.setSuspended(false);
|
||||
addToRunQueue(tableRunQueue, queue);
|
||||
}
|
||||
// TODO: This will change once we have the new AM
|
||||
while (event.hasWaitingServers()) {
|
||||
final Queue<ServerName> queue = event.popWaitingServer();
|
||||
queue.setSuspended(false);
|
||||
addToRunQueue(serverRunQueue, queue);
|
||||
}
|
||||
|
||||
while (event.hasWaitingProcedures()) {
|
||||
wakeProcedure(event.popWaitingProcedure(false));
|
||||
}
|
||||
}
|
||||
|
||||
private void suspendProcedure(final BaseProcedureEvent event, final Procedure procedure) {
|
||||
procedure.suspend();
|
||||
event.suspendProcedure(procedure);
|
||||
}
|
||||
|
||||
private void wakeProcedure(Procedure procedure) {
|
||||
private void wakeProcedure(final Procedure procedure) {
|
||||
procedure.resume();
|
||||
doAdd(procedure, /* addFront= */ true, /* notify= */false);
|
||||
}
|
||||
|
@ -478,7 +545,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
|||
private static abstract class BaseProcedureEvent {
|
||||
private ArrayDeque<Procedure> waitingProcedures = null;
|
||||
|
||||
protected void suspendProcedure(Procedure proc) {
|
||||
protected void suspendProcedure(final Procedure proc) {
|
||||
if (waitingProcedures == null) {
|
||||
waitingProcedures = new ArrayDeque<Procedure>();
|
||||
}
|
||||
|
@ -489,7 +556,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
|||
return waitingProcedures != null;
|
||||
}
|
||||
|
||||
protected Procedure popWaitingProcedure(boolean popFront) {
|
||||
protected Procedure popWaitingProcedure(final boolean popFront) {
|
||||
// it will be nice to use IterableList on a procedure and avoid allocations...
|
||||
Procedure proc = popFront ? waitingProcedures.removeFirst() : waitingProcedures.removeLast();
|
||||
if (waitingProcedures.isEmpty()) {
|
||||
|
@ -506,7 +573,11 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
|||
private Queue<TableName> waitingTables = null;
|
||||
private boolean ready = false;
|
||||
|
||||
public ProcedureEvent(String description) {
|
||||
protected ProcedureEvent() {
|
||||
this(null);
|
||||
}
|
||||
|
||||
public ProcedureEvent(final String description) {
|
||||
this.description = description;
|
||||
}
|
||||
|
||||
|
@ -533,7 +604,6 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
|||
private Queue<TableName> popWaitingTable() {
|
||||
Queue<TableName> node = waitingTables;
|
||||
waitingTables = AvlIterableList.remove(waitingTables, node);
|
||||
node.setSuspended(false);
|
||||
return node;
|
||||
}
|
||||
|
||||
|
@ -544,13 +614,20 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
|||
private Queue<ServerName> popWaitingServer() {
|
||||
Queue<ServerName> node = waitingServers;
|
||||
waitingServers = AvlIterableList.remove(waitingServers, node);
|
||||
node.setSuspended(false);
|
||||
return node;
|
||||
}
|
||||
|
||||
protected String getDescription() {
|
||||
if (description == null) {
|
||||
// you should override this method if you are using the default constructor
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
return description;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.format("ProcedureEvent(%s)", description);
|
||||
return String.format("%s(%s)", getClass().getSimpleName(), getDescription());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -692,9 +769,9 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
|||
return exclusiveLockProcIdOwner == procId;
|
||||
}
|
||||
|
||||
public boolean tryExclusiveLock(long procIdOwner) {
|
||||
public boolean tryExclusiveLock(final long procIdOwner) {
|
||||
assert procIdOwner != Long.MIN_VALUE;
|
||||
if (hasExclusiveLock()) return false;
|
||||
if (hasExclusiveLock() && !isLockOwner(procIdOwner)) return false;
|
||||
exclusiveLockProcIdOwner = procIdOwner;
|
||||
return true;
|
||||
}
|
||||
|
@ -747,8 +824,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
|||
// if we have an exclusive lock already taken
|
||||
// only child of the lock owner can be executed
|
||||
Procedure availProc = peek();
|
||||
return availProc != null && availProc.hasParent() &&
|
||||
isLockOwner(availProc.getParentProcId());
|
||||
return availProc != null && hasParentLock(availProc);
|
||||
}
|
||||
|
||||
// no xlock
|
||||
|
@ -1077,10 +1153,23 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
|||
// ============================================================================
|
||||
// Region Locking Helpers
|
||||
// ============================================================================
|
||||
/**
|
||||
* Suspend the procedure if the specified region is already locked.
|
||||
* @param procedure the procedure trying to acquire the lock on the region
|
||||
* @param regionInfo the region we are trying to lock
|
||||
* @return true if the procedure has to wait for the regions to be available
|
||||
*/
|
||||
public boolean waitRegion(final Procedure procedure, final HRegionInfo regionInfo) {
|
||||
return waitRegions(procedure, regionInfo.getTable(), regionInfo);
|
||||
}
|
||||
|
||||
/**
|
||||
* Suspend the procedure if the specified set of regions are already locked.
|
||||
* @param procedure the procedure trying to acquire the lock on the regions
|
||||
* @param table the table name of the regions we are trying to lock
|
||||
* @param regionInfo the list of regions we are trying to lock
|
||||
* @return true if the procedure has to wait for the regions to be available
|
||||
*/
|
||||
public boolean waitRegions(final Procedure procedure, final TableName table,
|
||||
final HRegionInfo... regionInfo) {
|
||||
Arrays.sort(regionInfo);
|
||||
|
@ -1092,7 +1181,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
|||
} else {
|
||||
// acquire the table shared-lock
|
||||
queue = tryAcquireTableQueueSharedLock(procedure, table);
|
||||
if (queue == null) return false;
|
||||
if (queue == null) return true;
|
||||
}
|
||||
|
||||
// acquire region xlocks or wait
|
||||
|
@ -1101,6 +1190,8 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
|||
synchronized (queue) {
|
||||
for (int i = 0; i < regionInfo.length; ++i) {
|
||||
assert regionInfo[i].getTable().equals(table);
|
||||
assert i == 0 || regionInfo[i] != regionInfo[i-1] : "duplicate region: " + regionInfo[i];
|
||||
|
||||
event[i] = queue.getRegionEvent(regionInfo[i]);
|
||||
if (!event[i].tryExclusiveLock(procedure.getProcId())) {
|
||||
suspendProcedure(event[i], procedure);
|
||||
|
@ -1116,13 +1207,23 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
|||
if (!hasLock && !procedure.hasParent()) {
|
||||
releaseTableSharedLock(procedure, table);
|
||||
}
|
||||
return hasLock;
|
||||
return !hasLock;
|
||||
}
|
||||
|
||||
/**
|
||||
* Wake the procedures waiting for the specified region
|
||||
* @param procedure the procedure that was holding the region
|
||||
* @param regionInfo the region the procedure was holding
|
||||
*/
|
||||
public void wakeRegion(final Procedure procedure, final HRegionInfo regionInfo) {
|
||||
wakeRegions(procedure, regionInfo.getTable(), regionInfo);
|
||||
}
|
||||
|
||||
/**
|
||||
* Wake the procedures waiting for the specified regions
|
||||
* @param procedure the procedure that was holding the regions
|
||||
* @param regionInfo the list of regions the procedure was holding
|
||||
*/
|
||||
public void wakeRegions(final Procedure procedure,final TableName table,
|
||||
final HRegionInfo... regionInfo) {
|
||||
Arrays.sort(regionInfo);
|
||||
|
@ -1132,8 +1233,11 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
|||
int numProcs = 0;
|
||||
final Procedure[] nextProcs = new Procedure[regionInfo.length];
|
||||
synchronized (queue) {
|
||||
HRegionInfo prevRegion = null;
|
||||
for (int i = 0; i < regionInfo.length; ++i) {
|
||||
assert regionInfo[i].getTable().equals(table);
|
||||
assert i == 0 || regionInfo[i] != regionInfo[i-1] : "duplicate region: " + regionInfo[i];
|
||||
|
||||
RegionEvent event = queue.getRegionEvent(regionInfo[i]);
|
||||
event.releaseExclusiveLock();
|
||||
if (event.hasWaitingProcedures()) {
|
||||
|
@ -1365,9 +1469,13 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
|||
return exclusiveLockProcIdOwner == procId;
|
||||
}
|
||||
|
||||
public synchronized boolean tryExclusiveLock(long procIdOwner) {
|
||||
public synchronized boolean hasParentLock(final Procedure proc) {
|
||||
return proc.hasParent() && isLockOwner(proc.getParentProcId());
|
||||
}
|
||||
|
||||
public synchronized boolean tryExclusiveLock(final long procIdOwner) {
|
||||
assert procIdOwner != Long.MIN_VALUE;
|
||||
if (isLocked()) return false;
|
||||
if (isLocked() && !isLockOwner(procIdOwner)) return false;
|
||||
exclusiveLockProcIdOwner = procIdOwner;
|
||||
return true;
|
||||
}
|
||||
|
@ -1396,7 +1504,8 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
|||
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.format("%s(%s)", getClass().getSimpleName(), key);
|
||||
return String.format("%s(%s, suspended=%s xlock=%s sharedLock=%s size=%s)",
|
||||
getClass().getSimpleName(), key, isSuspended(), hasExclusiveLock(), sharedLock, size());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -268,12 +268,13 @@ public class TestMasterProcedureScheduler {
|
|||
assertEquals(true, queue.tryAcquireTableSharedLock(rdProc, tableName));
|
||||
|
||||
// Fetch the 3rd item and verify that the lock can't be acquired
|
||||
Procedure wrProc = queue.poll();
|
||||
assertEquals(3, wrProc.getProcId());
|
||||
assertEquals(false, queue.tryAcquireTableExclusiveLock(wrProc, tableName));
|
||||
assertEquals(null, queue.poll(0));
|
||||
|
||||
// release the rdlock of item 2 and take the wrlock for the 3d item
|
||||
queue.releaseTableSharedLock(rdProc, tableName);
|
||||
|
||||
// Fetch the 3rd item and take the write lock
|
||||
Procedure wrProc = queue.poll();
|
||||
assertEquals(true, queue.tryAcquireTableExclusiveLock(wrProc, tableName));
|
||||
|
||||
// Fetch 4th item and verify that the lock can't be acquired
|
||||
|
@ -385,6 +386,50 @@ public class TestMasterProcedureScheduler {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testXLockWaitingForExecutingSharedLockToRelease() {
|
||||
final TableName tableName = TableName.valueOf("testtb");
|
||||
final HRegionInfo regionA = new HRegionInfo(tableName, Bytes.toBytes("a"), Bytes.toBytes("b"));
|
||||
|
||||
queue.addBack(new TestRegionProcedure(1, tableName,
|
||||
TableProcedureInterface.TableOperationType.ASSIGN, regionA));
|
||||
queue.addBack(new TestTableProcedure(2, tableName,
|
||||
TableProcedureInterface.TableOperationType.EDIT));
|
||||
queue.addBack(new TestRegionProcedure(3, tableName,
|
||||
TableProcedureInterface.TableOperationType.UNASSIGN, regionA));
|
||||
|
||||
// Fetch the 1st item and take the shared lock
|
||||
Procedure proc = queue.poll();
|
||||
assertEquals(1, proc.getProcId());
|
||||
assertEquals(false, queue.waitRegion(proc, regionA));
|
||||
|
||||
// the xlock operation in the queue can't be executed
|
||||
assertEquals(null, queue.poll(0));
|
||||
|
||||
// release the shared lock
|
||||
queue.wakeRegion(proc, regionA);
|
||||
|
||||
// Fetch the 2nd item and take the xlock
|
||||
proc = queue.poll();
|
||||
assertEquals(2, proc.getProcId());
|
||||
assertEquals(true, queue.tryAcquireTableExclusiveLock(proc, tableName));
|
||||
|
||||
// everything is locked by the table operation
|
||||
assertEquals(null, queue.poll(0));
|
||||
|
||||
// release the table xlock
|
||||
queue.releaseTableExclusiveLock(proc, tableName);
|
||||
|
||||
// grab the last item in the queue
|
||||
proc = queue.poll();
|
||||
assertEquals(3, proc.getProcId());
|
||||
|
||||
// lock and unlock the region
|
||||
assertEquals(false, queue.waitRegion(proc, regionA));
|
||||
assertEquals(null, queue.poll(0));
|
||||
queue.wakeRegion(proc, regionA);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testVerifyRegionLocks() throws Exception {
|
||||
final TableName tableName = TableName.valueOf("testtb");
|
||||
|
@ -417,24 +462,24 @@ public class TestMasterProcedureScheduler {
|
|||
// Fetch the 2nd item and the the lock on regionA and regionB
|
||||
Procedure mergeProc = queue.poll();
|
||||
assertEquals(2, mergeProc.getProcId());
|
||||
assertEquals(true, queue.waitRegions(mergeProc, tableName, regionA, regionB));
|
||||
assertEquals(false, queue.waitRegions(mergeProc, tableName, regionA, regionB));
|
||||
|
||||
// Fetch the 3rd item and the try to lock region A which will fail
|
||||
// because already locked. this procedure will go in waiting.
|
||||
// (this stuff will be explicit until we get rid of the zk-lock)
|
||||
Procedure procA = queue.poll();
|
||||
assertEquals(3, procA.getProcId());
|
||||
assertEquals(false, queue.waitRegions(procA, tableName, regionA));
|
||||
assertEquals(true, queue.waitRegions(procA, tableName, regionA));
|
||||
|
||||
// Fetch the 4th item, same story as the 3rd
|
||||
Procedure procB = queue.poll();
|
||||
assertEquals(4, procB.getProcId());
|
||||
assertEquals(false, queue.waitRegions(procB, tableName, regionB));
|
||||
assertEquals(true, queue.waitRegions(procB, tableName, regionB));
|
||||
|
||||
// Fetch the 5th item, since it is a non-locked region we are able to execute it
|
||||
Procedure procC = queue.poll();
|
||||
assertEquals(5, procC.getProcId());
|
||||
assertEquals(true, queue.waitRegions(procC, tableName, regionC));
|
||||
assertEquals(false, queue.waitRegions(procC, tableName, regionC));
|
||||
|
||||
// 3rd and 4th are in the region suspended queue
|
||||
assertEquals(null, queue.poll(0));
|
||||
|
@ -445,12 +490,12 @@ public class TestMasterProcedureScheduler {
|
|||
// Fetch the 3rd item, now the lock on the region is available
|
||||
procA = queue.poll();
|
||||
assertEquals(3, procA.getProcId());
|
||||
assertEquals(true, queue.waitRegions(procA, tableName, regionA));
|
||||
assertEquals(false, queue.waitRegions(procA, tableName, regionA));
|
||||
|
||||
// Fetch the 4th item, now the lock on the region is available
|
||||
procB = queue.poll();
|
||||
assertEquals(4, procB.getProcId());
|
||||
assertEquals(true, queue.waitRegions(procB, tableName, regionB));
|
||||
assertEquals(false, queue.waitRegions(procB, tableName, regionB));
|
||||
|
||||
// release the locks on the regions
|
||||
queue.wakeRegions(procA, tableName, regionA);
|
||||
|
@ -499,7 +544,7 @@ public class TestMasterProcedureScheduler {
|
|||
for (int i = 0; i < subProcs.length; ++i) {
|
||||
TestRegionProcedure regionProc = (TestRegionProcedure)queue.poll(0);
|
||||
assertEquals(subProcs[i].getProcId(), regionProc.getProcId());
|
||||
assertEquals(true, queue.waitRegions(regionProc, tableName, regionProc.getRegionInfo()));
|
||||
assertEquals(false, queue.waitRegions(regionProc, tableName, regionProc.getRegionInfo()));
|
||||
}
|
||||
|
||||
// nothing else in the queue
|
||||
|
@ -534,12 +579,12 @@ public class TestMasterProcedureScheduler {
|
|||
// Suspend
|
||||
// TODO: If we want to keep the zk-lock we need to retain the lock on suspend
|
||||
ProcedureEvent event = new ProcedureEvent("testSuspendedTableQueueEvent");
|
||||
queue.waitEvent(event, proc, true);
|
||||
assertEquals(true, queue.waitEvent(event, proc, true));
|
||||
queue.releaseTableExclusiveLock(proc, tableName);
|
||||
assertEquals(null, queue.poll(0));
|
||||
|
||||
// Resume
|
||||
queue.wake(event);
|
||||
queue.wakeEvent(event);
|
||||
|
||||
proc = queue.poll();
|
||||
assertTrue(queue.tryAcquireTableExclusiveLock(proc, tableName));
|
||||
|
@ -566,14 +611,14 @@ public class TestMasterProcedureScheduler {
|
|||
|
||||
// suspend
|
||||
ProcedureEvent event = new ProcedureEvent("testSuspendedProcedureEvent");
|
||||
queue.waitEvent(event, proc);
|
||||
assertEquals(true, queue.waitEvent(event, proc));
|
||||
|
||||
proc = queue.poll();
|
||||
assertEquals(2, proc.getProcId());
|
||||
assertEquals(null, queue.poll(0));
|
||||
|
||||
// resume
|
||||
queue.wake(event);
|
||||
queue.wakeEvent(event);
|
||||
|
||||
proc = queue.poll();
|
||||
assertEquals(1, proc.getProcId());
|
||||
|
@ -740,11 +785,15 @@ public class TestMasterProcedureScheduler {
|
|||
}
|
||||
|
||||
public TestTableProcedure(long procId, TableName tableName, TableOperationType opType) {
|
||||
super(procId);
|
||||
this(-1, procId, tableName, opType);
|
||||
}
|
||||
|
||||
public TestTableProcedure(long parentProcId, long procId, TableName tableName,
|
||||
TableOperationType opType) {
|
||||
super(procId, parentProcId);
|
||||
this.tableName = tableName;
|
||||
this.opType = opType;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TableName getTableName() {
|
||||
return tableName;
|
||||
|
@ -754,6 +803,14 @@ public class TestMasterProcedureScheduler {
|
|||
public TableOperationType getTableOperationType() {
|
||||
return opType;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void toStringClassDetails(final StringBuilder sb) {
|
||||
sb.append(getClass().getSimpleName());
|
||||
sb.append(" (table=");
|
||||
sb.append(getTableName());
|
||||
sb.append(")");
|
||||
}
|
||||
}
|
||||
|
||||
public static class TestRegionProcedure extends TestTableProcedure {
|
||||
|
@ -770,16 +827,21 @@ public class TestMasterProcedureScheduler {
|
|||
|
||||
public TestRegionProcedure(long parentProcId, long procId, TableName tableName,
|
||||
TableOperationType opType, HRegionInfo... regionInfo) {
|
||||
super(procId, tableName, opType);
|
||||
super(parentProcId, procId, tableName, opType);
|
||||
this.regionInfo = regionInfo;
|
||||
if (parentProcId > 0) {
|
||||
setParentProcId(parentProcId);
|
||||
}
|
||||
}
|
||||
|
||||
public HRegionInfo[] getRegionInfo() {
|
||||
return regionInfo;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void toStringClassDetails(final StringBuilder sb) {
|
||||
sb.append(getClass().getSimpleName());
|
||||
sb.append(" (region=");
|
||||
sb.append(getRegionInfo());
|
||||
sb.append(")");
|
||||
}
|
||||
}
|
||||
|
||||
public static class TestNamespaceProcedure extends TestProcedure
|
||||
|
|
|
@ -558,7 +558,7 @@ public class TestAccessController extends SecureTestUtil {
|
|||
|
||||
@Override
|
||||
public TableOperationType getTableOperationType() {
|
||||
return null;
|
||||
return TableOperationType.EDIT;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue