From 320d657fb4caa86fdd8bfbfda02ae6e26ae1877f Mon Sep 17 00:00:00 2001 From: jingyuntian Date: Fri, 9 Nov 2018 22:59:30 +0800 Subject: [PATCH] HBASE-21437 Bypassed procedure throw IllegalArgumentException when its state is WAITING_TIMEOUT Signed-off-by: Allan Yang --- .../hbase/procedure2/ProcedureExecutor.java | 22 ++++++++---- .../procedure2/TimeoutExecutorThread.java | 2 +- .../hbase/procedure2/TestProcedureBypass.java | 36 +++++++++++++++++++ 3 files changed, 52 insertions(+), 8 deletions(-) diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index 084da21b83c..6c0549279ac 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -967,17 +967,25 @@ public class ProcedureExecutor { store.update(procedure); } - // If we don't have the lock, we can't re-submit the queue, - // since it is already executing. To get rid of the stuck situation, we - // need to restart the master. With the procedure set to bypass, the procedureExecutor - // will bypass it and won't get stuck again. - if (lockEntry != null) { - // add the procedure to run queue, + // If state of procedure is WAITING_TIMEOUT, we can directly submit it to the scheduler. + // Instead we should remove it from timeout Executor queue and tranfer its state to RUNNABLE + if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) { + LOG.debug("transform procedure {} from WAITING_TIMEOUT to RUNNABLE", procedure); + if (timeoutExecutor.remove(procedure)) { + LOG.debug("removed procedure {} from timeoutExecutor", procedure); + timeoutExecutor.executeTimedoutProcedure(procedure); + } + } else if (lockEntry != null) { scheduler.addFront(procedure); LOG.debug("Bypassing {} and its ancestors successfully, adding to queue", procedure); } else { + // If we don't have the lock, we can't re-submit the queue, + // since it is already executing. To get rid of the stuck situation, we + // need to restart the master. With the procedure set to bypass, the procedureExecutor + // will bypass it and won't get stuck again. LOG.debug("Bypassing {} and its ancestors successfully, but since it is already running, " - + "skipping add to queue", procedure); + + "skipping add to queue", + procedure); } return true; diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java index 9e050a28d30..4416177cfb9 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java @@ -126,7 +126,7 @@ class TimeoutExecutorThread extends StoppableThread { } } - private void executeTimedoutProcedure(Procedure proc) { + protected void executeTimedoutProcedure(Procedure proc) { // The procedure received a timeout. if the procedure itself does not handle it, // call abort() and add the procedure back in the queue for rollback. if (proc.setTimeoutFailure(executor.getEnvironment())) { diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureBypass.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureBypass.java index 7d587fd0ab5..976b718d79b 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureBypass.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureBypass.java @@ -38,6 +38,7 @@ import org.junit.experimental.categories.Category; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; @Category({MasterTests.class, SmallTests.class}) @@ -149,6 +150,18 @@ public class TestProcedureBypass { LOG.info("{} finished", proc); } + @Test + public void testBypassingWaitingTimeoutProcedures() throws Exception { + final WaitingTimeoutProcedure proc = new WaitingTimeoutProcedure(); + long id = procExecutor.submitProcedure(proc); + Thread.sleep(500); + // bypass the procedure + assertTrue(procExecutor.bypassProcedure(id, 1000, true, false)); + + htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass()); + LOG.info("{} finished", proc); + } + @AfterClass public static void tearDown() throws Exception { procExecutor.stop(); @@ -208,6 +221,29 @@ public class TestProcedureBypass { } } + public static class WaitingTimeoutProcedure + extends ProcedureTestingUtility.NoopProcedure { + public WaitingTimeoutProcedure() { + super(); + } + + @Override + protected Procedure[] execute(final TestProcEnv env) + throws ProcedureSuspendedException { + // Always suspend the procedure + setTimeout(50000); + setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); + skipPersistence(); + throw new ProcedureSuspendedException(); + } + + @Override + protected synchronized boolean setTimeoutFailure(TestProcEnv env) { + setState(ProcedureProtos.ProcedureState.RUNNABLE); + procExecutor.getScheduler().addFront(this); + return false; // 'false' means that this procedure handled the timeout + } + } public enum StuckStateMachineState { START, THEN, END