diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index d02ca6ea989..c18ca32a42c 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -967,17 +967,25 @@ public class ProcedureExecutor { store.update(procedure); } - // If we don't have the lock, we can't re-submit the queue, - // since it is already executing. To get rid of the stuck situation, we - // need to restart the master. With the procedure set to bypass, the procedureExecutor - // will bypass it and won't get stuck again. - if (lockEntry != null) { - // add the procedure to run queue, + // If state of procedure is WAITING_TIMEOUT, we can directly submit it to the scheduler. + // Instead we should remove it from timeout Executor queue and tranfer its state to RUNNABLE + if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) { + LOG.debug("transform procedure {} from WAITING_TIMEOUT to RUNNABLE", procedure); + if (timeoutExecutor.remove(procedure)) { + LOG.debug("removed procedure {} from timeoutExecutor", procedure); + timeoutExecutor.executeTimedoutProcedure(procedure); + } + } else if (lockEntry != null) { scheduler.addFront(procedure); LOG.debug("Bypassing {} and its ancestors successfully, adding to queue", procedure); } else { + // If we don't have the lock, we can't re-submit the queue, + // since it is already executing. To get rid of the stuck situation, we + // need to restart the master. With the procedure set to bypass, the procedureExecutor + // will bypass it and won't get stuck again. LOG.debug("Bypassing {} and its ancestors successfully, but since it is already running, " - + "skipping add to queue", procedure); + + "skipping add to queue", + procedure); } return true; diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java index 9e050a28d30..4416177cfb9 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java @@ -126,7 +126,7 @@ class TimeoutExecutorThread extends StoppableThread { } } - private void executeTimedoutProcedure(Procedure proc) { + protected void executeTimedoutProcedure(Procedure proc) { // The procedure received a timeout. if the procedure itself does not handle it, // call abort() and add the procedure back in the queue for rollback. if (proc.setTimeoutFailure(executor.getEnvironment())) { diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureBypass.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureBypass.java index 7d587fd0ab5..976b718d79b 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureBypass.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureBypass.java @@ -38,6 +38,7 @@ import org.junit.experimental.categories.Category; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; @Category({MasterTests.class, SmallTests.class}) @@ -149,6 +150,18 @@ public class TestProcedureBypass { LOG.info("{} finished", proc); } + @Test + public void testBypassingWaitingTimeoutProcedures() throws Exception { + final WaitingTimeoutProcedure proc = new WaitingTimeoutProcedure(); + long id = procExecutor.submitProcedure(proc); + Thread.sleep(500); + // bypass the procedure + assertTrue(procExecutor.bypassProcedure(id, 1000, true, false)); + + htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass()); + LOG.info("{} finished", proc); + } + @AfterClass public static void tearDown() throws Exception { procExecutor.stop(); @@ -208,6 +221,29 @@ public class TestProcedureBypass { } } + public static class WaitingTimeoutProcedure + extends ProcedureTestingUtility.NoopProcedure { + public WaitingTimeoutProcedure() { + super(); + } + + @Override + protected Procedure[] execute(final TestProcEnv env) + throws ProcedureSuspendedException { + // Always suspend the procedure + setTimeout(50000); + setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); + skipPersistence(); + throw new ProcedureSuspendedException(); + } + + @Override + protected synchronized boolean setTimeoutFailure(TestProcEnv env) { + setState(ProcedureProtos.ProcedureState.RUNNABLE); + procExecutor.getScheduler().addFront(this); + return false; // 'false' means that this procedure handled the timeout + } + } public enum StuckStateMachineState { START, THEN, END