HBASE-21364 Procedure holds the lock should put to front of the queue after restart

This commit is contained in:
Allan Yang 2018-10-25 12:05:28 +08:00
parent 5dde5b7878
commit 614612a9d8
4 changed files with 38 additions and 1 deletions

View File

@ -85,6 +85,11 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler {
push(procedure, true, true);
}
@Override
public void addFront(final Procedure procedure, boolean notify) {
push(procedure, true, notify);
}
@Override
public void addFront(Iterator<Procedure> procedureIterator) {
schedLock();
@ -109,6 +114,11 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler {
push(procedure, false, true);
}
@Override
public void addBack(final Procedure procedure, boolean notify) {
push(procedure, false, notify);
}
protected void push(final Procedure procedure, final boolean addFront, final boolean notify) {
schedLock();
try {

View File

@ -717,6 +717,10 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
this.lockedWhenLoading = true;
}
public boolean isLockedWhenLoading() {
return lockedWhenLoading;
}
// ==============================================================================================
// Runtime state, updated every operation by the ProcedureExecutor
//

View File

@ -652,8 +652,17 @@ public class ProcedureExecutor<TEnvironment> {
if (!p.hasParent()) {
sendProcedureLoadedNotification(p.getProcId());
}
scheduler.addBack(p);
// If the procedure holds the lock, put the procedure in front
if (p.isLockedWhenLoading()) {
scheduler.addFront(p, false);
} else {
// if it was not, it can wait.
scheduler.addBack(p, false);
}
});
// After all procedures put into the queue, signal the worker threads.
// Otherwise, there is a race condition. See HBASE-21364.
scheduler.signalAll();
}
/**

View File

@ -52,6 +52,13 @@ public interface ProcedureScheduler {
*/
void addFront(Procedure proc);
/**
* Inserts the specified element at the front of this queue.
* @param proc the Procedure to add
* @param notify whether need to notify worker
*/
void addFront(Procedure proc, boolean notify);
/**
* Inserts all elements in the iterator at the front of this queue.
*/
@ -63,6 +70,13 @@ public interface ProcedureScheduler {
*/
void addBack(Procedure proc);
/**
* Inserts the specified element at the end of this queue.
* @param proc the Procedure to add
* @param notify whether need to notify worker
*/
void addBack(Procedure proc, boolean notify);
/**
* The procedure can't run at the moment.
* add it back to the queue, giving priority to someone else.