HBASE-15371: Procedure V2 - Completed support parent-child procedure (Stephen Yuan Jiang)
This commit is contained in:
parent
69e442efe4
commit
60b81dc848
|
@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.procedure2;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
@ -45,6 +46,8 @@ public abstract class StateMachineProcedure<TEnvironment, TState>
|
||||||
private int stateCount = 0;
|
private int stateCount = 0;
|
||||||
private int[] states = null;
|
private int[] states = null;
|
||||||
|
|
||||||
|
private ArrayList<Procedure> subProcList = null;
|
||||||
|
|
||||||
protected enum Flow {
|
protected enum Flow {
|
||||||
HAS_MORE_STATE,
|
HAS_MORE_STATE,
|
||||||
NO_MORE_STATE,
|
NO_MORE_STATE,
|
||||||
|
@ -107,6 +110,17 @@ public abstract class StateMachineProcedure<TEnvironment, TState>
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a child procedure to execute
|
||||||
|
* @param subProcedure the child procedure
|
||||||
|
*/
|
||||||
|
protected void addChildProcedure(Procedure subProcedure) {
|
||||||
|
if (subProcList == null) {
|
||||||
|
subProcList = new ArrayList<Procedure>();
|
||||||
|
}
|
||||||
|
subProcList.add(subProcedure);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Procedure[] execute(final TEnvironment env)
|
protected Procedure[] execute(final TEnvironment env)
|
||||||
throws ProcedureYieldException, InterruptedException {
|
throws ProcedureYieldException, InterruptedException {
|
||||||
|
@ -120,6 +134,13 @@ public abstract class StateMachineProcedure<TEnvironment, TState>
|
||||||
// completed
|
// completed
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (subProcList != null && subProcList.size() != 0) {
|
||||||
|
Procedure[] subProcedures = subProcList.toArray(new Procedure[subProcList.size()]);
|
||||||
|
subProcList = null;
|
||||||
|
return subProcedures;
|
||||||
|
}
|
||||||
|
|
||||||
return (isWaiting() || isFailed()) ? null : new Procedure[] {this};
|
return (isWaiting() || isFailed()) ? null : new Procedure[] {this};
|
||||||
} finally {
|
} finally {
|
||||||
updateTimestamp();
|
updateTimestamp();
|
||||||
|
|
|
@ -337,8 +337,13 @@ public class TestProcedureRecovery {
|
||||||
|
|
||||||
public TestStateMachineProcedure() {}
|
public TestStateMachineProcedure() {}
|
||||||
|
|
||||||
|
public TestStateMachineProcedure(final boolean testSubmitChildProc) {
|
||||||
|
this.submitChildProc = testSubmitChildProc;
|
||||||
|
}
|
||||||
|
|
||||||
private AtomicBoolean aborted = new AtomicBoolean(false);
|
private AtomicBoolean aborted = new AtomicBoolean(false);
|
||||||
private int iResult = 0;
|
private int iResult = 0;
|
||||||
|
private boolean submitChildProc = false;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected StateMachineProcedure.Flow executeFromState(TestProcEnv env, State state) {
|
protected StateMachineProcedure.Flow executeFromState(TestProcEnv env, State state) {
|
||||||
|
@ -350,7 +355,13 @@ public class TestProcedureRecovery {
|
||||||
break;
|
break;
|
||||||
case STATE_2:
|
case STATE_2:
|
||||||
LOG.info("execute step 2 " + this);
|
LOG.info("execute step 2 " + this);
|
||||||
|
if (submitChildProc) {
|
||||||
|
addChildProcedure(new TestStateMachineProcedure());
|
||||||
|
addChildProcedure(new TestStateMachineProcedure());
|
||||||
|
setNextState(State.DONE);
|
||||||
|
} else {
|
||||||
setNextState(State.STATE_3);
|
setNextState(State.STATE_3);
|
||||||
|
}
|
||||||
iResult += 5;
|
iResult += 5;
|
||||||
break;
|
break;
|
||||||
case STATE_3:
|
case STATE_3:
|
||||||
|
@ -363,6 +374,8 @@ public class TestProcedureRecovery {
|
||||||
}
|
}
|
||||||
setNextState(State.DONE);
|
setNextState(State.DONE);
|
||||||
iResult += 7;
|
iResult += 7;
|
||||||
|
break;
|
||||||
|
case DONE:
|
||||||
setResult(Bytes.toBytes(iResult));
|
setResult(Bytes.toBytes(iResult));
|
||||||
return Flow.NO_MORE_STATE;
|
return Flow.NO_MORE_STATE;
|
||||||
default:
|
default:
|
||||||
|
@ -424,6 +437,14 @@ public class TestProcedureRecovery {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout=30000)
|
||||||
|
public void testStateMachineMultipleLevel() throws Exception {
|
||||||
|
long procId = procExecutor.submitProcedure(new TestStateMachineProcedure(true));
|
||||||
|
// Wait the completion
|
||||||
|
ProcedureTestingUtility.waitProcedure(procExecutor, procId);
|
||||||
|
ProcedureTestingUtility.assertProcNotFailed(procExecutor, procId);
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout=30000)
|
@Test(timeout=30000)
|
||||||
public void testStateMachineRecovery() throws Exception {
|
public void testStateMachineRecovery() throws Exception {
|
||||||
ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, true);
|
ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, true);
|
||||||
|
@ -447,6 +468,11 @@ public class TestProcedureRecovery {
|
||||||
assertFalse(procExecutor.isRunning());
|
assertFalse(procExecutor.isRunning());
|
||||||
|
|
||||||
// Step 3 exec
|
// Step 3 exec
|
||||||
|
restart();
|
||||||
|
waitProcedure(procId);
|
||||||
|
ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
|
||||||
|
assertFalse(procExecutor.isRunning());
|
||||||
|
|
||||||
restart();
|
restart();
|
||||||
waitProcedure(procId);
|
waitProcedure(procId);
|
||||||
assertTrue(procExecutor.isRunning());
|
assertTrue(procExecutor.isRunning());
|
||||||
|
|
Loading…
Reference in New Issue