HBASE-21291 Add a test for bypassing stuck state-machine procedures
Signed-off-by: Allan Yang <allan163@apache.org>
This commit is contained in:
parent
fa652cc610
commit
821e4d7de2
|
@ -1054,6 +1054,7 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean bypassProcedure(long pid, long lockWait, boolean force) throws IOException {
|
boolean bypassProcedure(long pid, long lockWait, boolean force) throws IOException {
|
||||||
|
Preconditions.checkArgument(lockWait > 0, "lockWait should be positive");
|
||||||
Procedure<TEnvironment> procedure = getProcedure(pid);
|
Procedure<TEnvironment> procedure = getProcedure(pid);
|
||||||
if (procedure == null) {
|
if (procedure == null) {
|
||||||
LOG.debug("Procedure with id={} does not exist, skipping bypass", pid);
|
LOG.debug("Procedure with id={} does not exist, skipping bypass", pid);
|
||||||
|
|
|
@ -400,6 +400,46 @@ public class ProcedureTestingUtility {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static class NoopStateMachineProcedure<TEnv, TState>
|
||||||
|
extends StateMachineProcedure<TEnv, TState> {
|
||||||
|
private TState initialState;
|
||||||
|
private TEnv env;
|
||||||
|
|
||||||
|
public NoopStateMachineProcedure() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public NoopStateMachineProcedure(TEnv env, TState initialState) {
|
||||||
|
this.env = env;
|
||||||
|
this.initialState = initialState;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Flow executeFromState(TEnv env, TState tState)
|
||||||
|
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void rollbackState(TEnv env, TState tState) throws IOException, InterruptedException {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected TState getState(int stateId) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected int getStateId(TState tState) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected TState getInitialState() {
|
||||||
|
return initialState;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public static class TestProcedure extends NoopProcedure<Void> {
|
public static class TestProcedure extends NoopProcedure<Void> {
|
||||||
private byte[] data = null;
|
private byte[] data = null;
|
||||||
|
|
||||||
|
|
|
@ -17,8 +17,10 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.procedure2;
|
package org.apache.hadoop.hbase.procedure2;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
@ -119,6 +121,20 @@ public class TestProcedureBypass {
|
||||||
LOG.info("{} finished", proc);
|
LOG.info("{} finished", proc);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBypassingStuckStateMachineProcedure() throws Exception {
|
||||||
|
final StuckStateMachineProcedure proc =
|
||||||
|
new StuckStateMachineProcedure(procEnv, StuckStateMachineState.START);
|
||||||
|
long id = procExecutor.submitProcedure(proc);
|
||||||
|
Thread.sleep(500);
|
||||||
|
// bypass the procedure
|
||||||
|
assertFalse(procExecutor.bypassProcedure(id, 1000, false));
|
||||||
|
assertTrue(procExecutor.bypassProcedure(id, 1000, true));
|
||||||
|
|
||||||
|
htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
|
||||||
|
LOG.info("{} finished", proc);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
|
@ -181,5 +197,52 @@ public class TestProcedureBypass {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public enum StuckStateMachineState {
|
||||||
|
START, THEN, END
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class StuckStateMachineProcedure extends
|
||||||
|
ProcedureTestingUtility.NoopStateMachineProcedure<TestProcEnv, StuckStateMachineState> {
|
||||||
|
private AtomicBoolean stop = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
public StuckStateMachineProcedure() {
|
||||||
|
super();
|
||||||
|
}
|
||||||
|
|
||||||
|
public StuckStateMachineProcedure(TestProcEnv env, StuckStateMachineState initialState) {
|
||||||
|
super(env, initialState);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Flow executeFromState(TestProcEnv env, StuckStateMachineState tState)
|
||||||
|
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
|
||||||
|
switch (tState) {
|
||||||
|
case START:
|
||||||
|
LOG.info("PHASE 1: START");
|
||||||
|
setNextState(StuckStateMachineState.THEN);
|
||||||
|
return Flow.HAS_MORE_STATE;
|
||||||
|
case THEN:
|
||||||
|
if (stop.get()) {
|
||||||
|
setNextState(StuckStateMachineState.END);
|
||||||
|
}
|
||||||
|
return Flow.HAS_MORE_STATE;
|
||||||
|
case END:
|
||||||
|
return Flow.NO_MORE_STATE;
|
||||||
|
default:
|
||||||
|
throw new UnsupportedOperationException("unhandled state=" + tState);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected StuckStateMachineState getState(int stateId) {
|
||||||
|
return StuckStateMachineState.values()[stateId];
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected int getStateId(StuckStateMachineState tState) {
|
||||||
|
return tState.ordinal();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue