HBASE-21437 Bypassed procedure throw IllegalArgumentException when its state is WAITING_TIMEOUT
Signed-off-by: Allan Yang <allan163@apache.org>
This commit is contained in:
parent
0875fa0634
commit
c6090d4f04
|
@ -1033,15 +1033,22 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
store.update(procedure);
|
||||
}
|
||||
|
||||
// If we don't have the lock, we can't re-submit the queue,
|
||||
// since it is already executing. To get rid of the stuck situation, we
|
||||
// need to restart the master. With the procedure set to bypass, the procedureExecutor
|
||||
// will bypass it and won't get stuck again.
|
||||
if (lockEntry != null) {
|
||||
// add the procedure to run queue,
|
||||
// If state of procedure is WAITING_TIMEOUT, we can directly submit it to the scheduler.
|
||||
// Instead we should remove it from timeout Executor queue and tranfer its state to RUNNABLE
|
||||
if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
|
||||
LOG.debug("transform procedure {} from WAITING_TIMEOUT to RUNNABLE", procedure);
|
||||
if (timeoutExecutor.remove(procedure)) {
|
||||
LOG.debug("removed procedure {} from timeoutExecutor", procedure);
|
||||
timeoutExecutor.executeTimedoutProcedure(procedure);
|
||||
}
|
||||
} else if (lockEntry != null) {
|
||||
scheduler.addFront(procedure);
|
||||
LOG.info("Bypassing {} and its ancestors successfully, adding to queue", procedure);
|
||||
} else {
|
||||
// If we don't have the lock, we can't re-submit the queue,
|
||||
// since it is already executing. To get rid of the stuck situation, we
|
||||
// need to restart the master. With the procedure set to bypass, the procedureExecutor
|
||||
// will bypass it and won't get stuck again.
|
||||
LOG.info("Bypassing {} and its ancestors successfully, but since it is already running, "
|
||||
+ "skipping add to queue", procedure);
|
||||
}
|
||||
|
|
|
@ -126,7 +126,7 @@ class TimeoutExecutorThread<TEnvironment> extends StoppableThread {
|
|||
}
|
||||
}
|
||||
|
||||
private void executeTimedoutProcedure(Procedure<TEnvironment> proc) {
|
||||
protected void executeTimedoutProcedure(Procedure<TEnvironment> proc) {
|
||||
// The procedure received a timeout. if the procedure itself does not handle it,
|
||||
// call abort() and add the procedure back in the queue for rollback.
|
||||
if (proc.setTimeoutFailure(executor.getEnvironment())) {
|
||||
|
|
|
@ -38,6 +38,7 @@ import org.junit.experimental.categories.Category;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
|
||||
|
||||
|
||||
@Category({MasterTests.class, SmallTests.class})
|
||||
|
@ -80,7 +81,7 @@ public class TestProcedureBypass {
|
|||
procStore);
|
||||
procStore.start(PROCEDURE_EXECUTOR_SLOTS);
|
||||
ProcedureTestingUtility
|
||||
.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
|
||||
.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, 0, false, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -149,6 +150,18 @@ public class TestProcedureBypass {
|
|||
LOG.info("{} finished", proc);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBypassingWaitingTimeoutProcedures() throws Exception {
|
||||
final WaitingTimeoutProcedure proc = new WaitingTimeoutProcedure();
|
||||
long id = procExecutor.submitProcedure(proc);
|
||||
Thread.sleep(500);
|
||||
// bypass the procedure
|
||||
assertTrue(procExecutor.bypassProcedure(id, 1000, true, false));
|
||||
|
||||
htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
|
||||
LOG.info("{} finished", proc);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws Exception {
|
||||
procExecutor.stop();
|
||||
|
@ -188,6 +201,29 @@ public class TestProcedureBypass {
|
|||
|
||||
}
|
||||
|
||||
public static class WaitingTimeoutProcedure
|
||||
extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> {
|
||||
public WaitingTimeoutProcedure() {
|
||||
super();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Procedure[] execute(final TestProcEnv env)
|
||||
throws ProcedureSuspendedException {
|
||||
// Always suspend the procedure
|
||||
setTimeout(50000);
|
||||
setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
|
||||
skipPersistence();
|
||||
throw new ProcedureSuspendedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected synchronized boolean setTimeoutFailure(TestProcEnv env) {
|
||||
setState(ProcedureProtos.ProcedureState.RUNNABLE);
|
||||
procExecutor.getScheduler().addFront(this);
|
||||
return false; // 'false' means that this procedure handled the timeout
|
||||
}
|
||||
}
|
||||
|
||||
public static class RootProcedure extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> {
|
||||
private boolean childSpwaned = false;
|
||||
|
|
Loading…
Reference in New Issue