HBASE-27905 Directly schedule procedures that do not need to acquire locks
This commit is contained in:
parent
7b571ca9e4
commit
867c44ce1c
|
@ -272,6 +272,10 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
|
|||
return false;
|
||||
}
|
||||
|
||||
public boolean needLock() {
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* The user should override this method if they need a lock on an Entity. A lock can be anything,
|
||||
* and it is up to the implementor. The Procedure Framework will call this method just before it
|
||||
|
@ -974,6 +978,9 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
|
|||
if (waitInitialized(env)) {
|
||||
return LockState.LOCK_EVENT_WAIT;
|
||||
}
|
||||
if (!needLock()) {
|
||||
return LockState.LOCK_ACQUIRED;
|
||||
}
|
||||
if (lockedWhenLoading) {
|
||||
// reset it so we will not consider it anymore
|
||||
lockedWhenLoading = false;
|
||||
|
@ -1000,6 +1007,10 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
|
|||
* Internal method called by the ProcedureExecutor that starts the user-level code releaseLock().
|
||||
*/
|
||||
final void doReleaseLock(TEnvironment env, ProcedureStore store) {
|
||||
if (!needLock()) {
|
||||
return;
|
||||
}
|
||||
|
||||
locked = false;
|
||||
// persist that we have released the lock. This must be done before we actually release the
|
||||
// lock. Another procedure may take this lock immediately after we release the lock, and if we
|
||||
|
|
|
@ -149,19 +149,22 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
Procedure<?> proc, boolean addFront) {
|
||||
queue.add(proc, addFront);
|
||||
// For the following conditions, we will put the queue back into execution
|
||||
// 1. The procedure has already held the lock, or the lock has been restored when restarting,
|
||||
// 1. The procedure does not need any lock at all.
|
||||
// 2. The procedure has already held the lock, or the lock has been restored when restarting,
|
||||
// which means it can be executed immediately.
|
||||
// 2. The exclusive lock for this queue has not been held.
|
||||
// 3. The given procedure has the exclusive lock permission for this queue.
|
||||
// 3. The exclusive lock for this queue has not been held.
|
||||
// 4. The given procedure has the exclusive lock permission for this queue.
|
||||
Supplier<String> reason = null;
|
||||
if (proc.hasLock()) {
|
||||
if (!proc.needLock()) {
|
||||
reason = () -> proc + " does not need any lock";
|
||||
} else if (proc.needLock() && proc.hasLock()) {
|
||||
reason = () -> proc + " has lock";
|
||||
} else if (proc.isLockedWhenLoading()) {
|
||||
reason = () -> proc + " restores lock when restarting";
|
||||
} else if (!queue.getLockStatus().hasExclusiveLock()) {
|
||||
reason = () -> "the exclusive lock is not held by anyone when adding " + proc;
|
||||
} else if (queue.getLockStatus().hasLockAccess(proc)) {
|
||||
reason = () -> proc + " has the excusive lock access";
|
||||
reason = () -> proc + " has the exclusive lock access";
|
||||
}
|
||||
if (reason != null) {
|
||||
addToRunQueue(fairq, queue, reason);
|
||||
|
@ -219,6 +222,9 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
// procedures, then we give up and remove the queue from run queue.
|
||||
for (int i = 0, n = rq.size(); i < n; i++) {
|
||||
Procedure<?> proc = rq.poll();
|
||||
if (!proc.needLock()) {
|
||||
return proc;
|
||||
}
|
||||
if (isLockReady(proc, rq)) {
|
||||
// the queue is empty, remove from run queue
|
||||
if (rq.isEmpty()) {
|
||||
|
@ -368,6 +374,13 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
|
||||
private static <T extends Comparable<T>> void removeFromRunQueue(FairQueue<T> fairq,
|
||||
Queue<T> queue, Supplier<String> reason) {
|
||||
if (hasNoLockNeededProcedure(queue)) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("DO NOT remove {} from run queue because There are still procedures in the "
|
||||
+ "queue that do not need to acquire locks", queue);
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Remove {} from run queue because: {}", queue, reason.get());
|
||||
}
|
||||
|
@ -376,6 +389,19 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
}
|
||||
}
|
||||
|
||||
private static <T extends Comparable<T>> boolean hasNoLockNeededProcedure(Queue<T> q) {
|
||||
boolean ret = false;
|
||||
// TODO: Iterate Queue in a more efficient way ?
|
||||
for (int i = 0, n = q.size(); i < n; i++) {
|
||||
Procedure<?> proc = q.poll();
|
||||
if (!proc.needLock()) {
|
||||
ret = true;
|
||||
}
|
||||
q.add(proc, false);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Table Queue Lookup Helpers
|
||||
// ============================================================================
|
||||
|
|
|
@ -1200,4 +1200,28 @@ public class TestMasterProcedureScheduler {
|
|||
queue.wakeRegion(proc, regionInfo);
|
||||
queue.wakeTableExclusiveLock(parentProc, tableName);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDirectlyScheduleProcedureThatDoesNotNeedLock() {
|
||||
TableName tableName = TableName.valueOf(name.getMethodName());
|
||||
TestTableProcedure xLockNeededProc =
|
||||
new TestTableProcedure(1, tableName, TableOperationType.DELETE);
|
||||
TestTableProcedure noLockNeededProc =
|
||||
new TestTableProcedure(2, tableName, TableOperationType.READ) {
|
||||
@Override
|
||||
public boolean needLock() {
|
||||
return false;
|
||||
}
|
||||
};
|
||||
queue.addBack(xLockNeededProc);
|
||||
queue.addBack(noLockNeededProc);
|
||||
|
||||
assertSame(xLockNeededProc, queue.poll());
|
||||
assertEquals(1, queue.size());
|
||||
|
||||
// now the table exclusive lock has been acquired
|
||||
assertFalse(queue.waitTableExclusiveLock(xLockNeededProc, tableName));
|
||||
|
||||
assertSame(noLockNeededProc, queue.poll());
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue