diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index c4fffa8633f..a5b66a04752 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -1033,15 +1033,22 @@ public class ProcedureExecutor { store.update(procedure); } - // If we don't have the lock, we can't re-submit the queue, - // since it is already executing. To get rid of the stuck situation, we - // need to restart the master. With the procedure set to bypass, the procedureExecutor - // will bypass it and won't get stuck again. - if (lockEntry != null) { - // add the procedure to run queue, + // If state of procedure is WAITING_TIMEOUT, we cannot directly submit it to the scheduler. + // Instead we should remove it from timeout Executor queue and transfer its state to RUNNABLE + if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) { + LOG.debug("transform procedure {} from WAITING_TIMEOUT to RUNNABLE", procedure); + if (timeoutExecutor.remove(procedure)) { + LOG.debug("removed procedure {} from timeoutExecutor", procedure); + timeoutExecutor.executeTimedoutProcedure(procedure); + } + } else if (lockEntry != null) { scheduler.addFront(procedure); LOG.info("Bypassing {} and its ancestors successfully, adding to queue", procedure); } else { + // If we don't have the lock, we can't re-submit the queue, + // since it is already executing. To get rid of the stuck situation, we + // need to restart the master. With the procedure set to bypass, the procedureExecutor + // will bypass it and won't get stuck again. 
LOG.info("Bypassing {} and its ancestors successfully, but since it is already running, " + "skipping add to queue", procedure); } diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java index 9e050a28d30..4416177cfb9 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java @@ -126,7 +126,7 @@ class TimeoutExecutorThread extends StoppableThread { } } - private void executeTimedoutProcedure(Procedure proc) { + protected void executeTimedoutProcedure(Procedure proc) { // The procedure received a timeout. if the procedure itself does not handle it, // call abort() and add the procedure back in the queue for rollback. if (proc.setTimeoutFailure(executor.getEnvironment())) { diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureBypass.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureBypass.java index fa406310d8c..de7a0a17266 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureBypass.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureBypass.java @@ -38,6 +38,7 @@ import org.junit.experimental.categories.Category; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; @Category({MasterTests.class, SmallTests.class}) @@ -80,7 +81,7 @@ public class TestProcedureBypass { procStore); procStore.start(PROCEDURE_EXECUTOR_SLOTS); ProcedureTestingUtility - .initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true); + .initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, 0, false, true); } @Test @@ -149,6 +150,18 @@ public class TestProcedureBypass { 
LOG.info("{} finished", proc); } + @Test + public void testBypassingWaitingTimeoutProcedures() throws Exception { + final WaitingTimeoutProcedure proc = new WaitingTimeoutProcedure(); + long id = procExecutor.submitProcedure(proc); + Thread.sleep(500); + // bypass the procedure + assertTrue(procExecutor.bypassProcedure(id, 1000, true, false)); + + htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass()); + LOG.info("{} finished", proc); + } + @AfterClass public static void tearDown() throws Exception { procExecutor.stop(); @@ -188,6 +201,29 @@ public class TestProcedureBypass { } + public static class WaitingTimeoutProcedure + extends ProcedureTestingUtility.NoopProcedure { + public WaitingTimeoutProcedure() { + super(); + } + + @Override + protected Procedure[] execute(final TestProcEnv env) + throws ProcedureSuspendedException { + // Always suspend the procedure + setTimeout(50000); + setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); + skipPersistence(); + throw new ProcedureSuspendedException(); + } + + @Override + protected synchronized boolean setTimeoutFailure(TestProcEnv env) { + setState(ProcedureProtos.ProcedureState.RUNNABLE); + procExecutor.getScheduler().addFront(this); + return false; // 'false' means that this procedure handled the timeout + } + } public static class RootProcedure extends ProcedureTestingUtility.NoopProcedure { private boolean childSpwaned = false;