HBASE-15371: Procedure V2 - Completed support parent-child procedure (Stephen Yuan Jiang)

This commit is contained in:
Stephen Yuan Jiang 2016-03-01 19:37:56 -08:00
parent 12853268e1
commit d0c818630b
2 changed files with 48 additions and 1 deletions

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.procedure2;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@ -45,6 +46,8 @@ public abstract class StateMachineProcedure<TEnvironment, TState>
private int stateCount = 0;
private int[] states = null;
private ArrayList<Procedure> subProcList = null;
protected enum Flow {
HAS_MORE_STATE,
NO_MORE_STATE,
@ -107,6 +110,17 @@ public abstract class StateMachineProcedure<TEnvironment, TState>
return false;
}
/**
* Add a child procedure to execute
* @param subProcedure the child procedure
*/
protected void addChildProcedure(Procedure subProcedure) {
if (subProcList == null) {
subProcList = new ArrayList<Procedure>();
}
subProcList.add(subProcedure);
}
@Override
protected Procedure[] execute(final TEnvironment env)
throws ProcedureYieldException, InterruptedException {
@ -120,6 +134,13 @@ public abstract class StateMachineProcedure<TEnvironment, TState>
// completed
return null;
}
if (subProcList != null && subProcList.size() != 0) {
Procedure[] subProcedures = subProcList.toArray(new Procedure[subProcList.size()]);
subProcList = null;
return subProcedures;
}
return (isWaiting() || isFailed()) ? null : new Procedure[] {this};
} finally {
updateTimestamp();

View File

@ -336,8 +336,13 @@ public class TestProcedureRecovery {
public TestStateMachineProcedure() {}
public TestStateMachineProcedure(final boolean testSubmitChildProc) {
this.submitChildProc = testSubmitChildProc;
}
private AtomicBoolean aborted = new AtomicBoolean(false);
private int iResult = 0;
private boolean submitChildProc = false;
@Override
protected StateMachineProcedure.Flow executeFromState(TestProcEnv env, State state) {
@ -349,7 +354,13 @@ public class TestProcedureRecovery {
break;
case STATE_2:
LOG.info("execute step 2 " + this);
if (submitChildProc) {
addChildProcedure(new TestStateMachineProcedure());
addChildProcedure(new TestStateMachineProcedure());
setNextState(State.DONE);
} else {
setNextState(State.STATE_3);
}
iResult += 5;
break;
case STATE_3:
@ -362,6 +373,8 @@ public class TestProcedureRecovery {
}
setNextState(State.DONE);
iResult += 7;
break;
case DONE:
setResult(Bytes.toBytes(iResult));
return Flow.NO_MORE_STATE;
default:
@ -423,6 +436,14 @@ public class TestProcedureRecovery {
}
}
@Test(timeout=30000)
public void testStateMachineMultipleLevel() throws Exception {
long procId = procExecutor.submitProcedure(new TestStateMachineProcedure(true));
// Wait the completion
ProcedureTestingUtility.waitProcedure(procExecutor, procId);
ProcedureTestingUtility.assertProcNotFailed(procExecutor, procId);
}
@Test(timeout=30000)
public void testStateMachineRecovery() throws Exception {
ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, true);
@ -446,6 +467,11 @@ public class TestProcedureRecovery {
assertFalse(procExecutor.isRunning());
// Step 3 exec
restart();
waitProcedure(procId);
ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
assertFalse(procExecutor.isRunning());
restart();
waitProcedure(procId);
assertTrue(procExecutor.isRunning());