HBASE-21437 Bypassed procedure throws IllegalArgumentException when its state is WAITING_TIMEOUT
Signed-off-by: Allan Yang <allan163@apache.org>
This commit is contained in:
parent
fe2265fa4a
commit
ccabf7310d
|
@ -967,17 +967,25 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
store.update(procedure);
|
store.update(procedure);
|
||||||
}
|
}
|
||||||
|
|
||||||
// If we don't have the lock, we can't re-submit the queue,
|
// If state of procedure is WAITING_TIMEOUT, we can't directly submit it to the scheduler.
|
||||||
// since it is already executing. To get rid of the stuck situation, we
|
// Instead we should remove it from timeout Executor queue and transfer its state to RUNNABLE
|
||||||
// need to restart the master. With the procedure set to bypass, the procedureExecutor
|
if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
|
||||||
// will bypass it and won't get stuck again.
|
LOG.debug("transform procedure {} from WAITING_TIMEOUT to RUNNABLE", procedure);
|
||||||
if (lockEntry != null) {
|
if (timeoutExecutor.remove(procedure)) {
|
||||||
// add the procedure to run queue,
|
LOG.debug("removed procedure {} from timeoutExecutor", procedure);
|
||||||
|
timeoutExecutor.executeTimedoutProcedure(procedure);
|
||||||
|
}
|
||||||
|
} else if (lockEntry != null) {
|
||||||
scheduler.addFront(procedure);
|
scheduler.addFront(procedure);
|
||||||
LOG.debug("Bypassing {} and its ancestors successfully, adding to queue", procedure);
|
LOG.debug("Bypassing {} and its ancestors successfully, adding to queue", procedure);
|
||||||
} else {
|
} else {
|
||||||
|
// If we don't have the lock, we can't re-submit the queue,
|
||||||
|
// since it is already executing. To get rid of the stuck situation, we
|
||||||
|
// need to restart the master. With the procedure set to bypass, the procedureExecutor
|
||||||
|
// will bypass it and won't get stuck again.
|
||||||
LOG.debug("Bypassing {} and its ancestors successfully, but since it is already running, "
|
LOG.debug("Bypassing {} and its ancestors successfully, but since it is already running, "
|
||||||
+ "skipping add to queue", procedure);
|
+ "skipping add to queue",
|
||||||
|
procedure);
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
|
|
||||||
|
|
|
@ -126,7 +126,7 @@ class TimeoutExecutorThread<TEnvironment> extends StoppableThread {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void executeTimedoutProcedure(Procedure<TEnvironment> proc) {
|
protected void executeTimedoutProcedure(Procedure<TEnvironment> proc) {
|
||||||
// The procedure received a timeout. if the procedure itself does not handle it,
|
// The procedure received a timeout. if the procedure itself does not handle it,
|
||||||
// call abort() and add the procedure back in the queue for rollback.
|
// call abort() and add the procedure back in the queue for rollback.
|
||||||
if (proc.setTimeoutFailure(executor.getEnvironment())) {
|
if (proc.setTimeoutFailure(executor.getEnvironment())) {
|
||||||
|
|
|
@ -38,6 +38,7 @@ import org.junit.experimental.categories.Category;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
|
||||||
|
|
||||||
|
|
||||||
@Category({MasterTests.class, SmallTests.class})
|
@Category({MasterTests.class, SmallTests.class})
|
||||||
|
@ -149,6 +150,18 @@ public class TestProcedureBypass {
|
||||||
LOG.info("{} finished", proc);
|
LOG.info("{} finished", proc);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBypassingWaitingTimeoutProcedures() throws Exception {
|
||||||
|
final WaitingTimeoutProcedure proc = new WaitingTimeoutProcedure();
|
||||||
|
long id = procExecutor.submitProcedure(proc);
|
||||||
|
Thread.sleep(500);
|
||||||
|
// bypass the procedure
|
||||||
|
assertTrue(procExecutor.bypassProcedure(id, 1000, true, false));
|
||||||
|
|
||||||
|
htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
|
||||||
|
LOG.info("{} finished", proc);
|
||||||
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
public static void tearDown() throws Exception {
|
public static void tearDown() throws Exception {
|
||||||
procExecutor.stop();
|
procExecutor.stop();
|
||||||
|
@ -208,6 +221,29 @@ public class TestProcedureBypass {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static class WaitingTimeoutProcedure
|
||||||
|
extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> {
|
||||||
|
public WaitingTimeoutProcedure() {
|
||||||
|
super();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Procedure[] execute(final TestProcEnv env)
|
||||||
|
throws ProcedureSuspendedException {
|
||||||
|
// Always suspend the procedure
|
||||||
|
setTimeout(50000);
|
||||||
|
setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
|
||||||
|
skipPersistence();
|
||||||
|
throw new ProcedureSuspendedException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected synchronized boolean setTimeoutFailure(TestProcEnv env) {
|
||||||
|
setState(ProcedureProtos.ProcedureState.RUNNABLE);
|
||||||
|
procExecutor.getScheduler().addFront(this);
|
||||||
|
return false; // 'false' means that this procedure handled the timeout
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public enum StuckStateMachineState {
|
public enum StuckStateMachineState {
|
||||||
START, THEN, END
|
START, THEN, END
|
||||||
|
|
Loading…
Reference in New Issue