HBASE-16485 Procedure v2 - Add support to addChildProcedure() as last "step" in StateMachineProcedure

This commit is contained in:
Matteo Bertozzi 2016-08-23 16:42:57 -07:00
parent a1e57d942e
commit 1e15fa57df
3 changed files with 32 additions and 13 deletions

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.procedure2; package org.apache.hadoop.hbase.procedure2;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import java.io.IOException; import java.io.IOException;
@ -1288,6 +1289,11 @@ public class ProcedureExecutor<TEnvironment> {
return procId; return procId;
} }
@VisibleForTesting
protected long getLastProcId() {
return lastProcId.get();
}
private Long getRootProcedureId(Procedure proc) { private Long getRootProcedureId(Procedure proc) {
return Procedure.getRootProcedureId(procedures, proc); return Procedure.getRootProcedureId(procedures, proc);
} }

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.StateMachinePr
@InterfaceStability.Evolving @InterfaceStability.Evolving
public abstract class StateMachineProcedure<TEnvironment, TState> public abstract class StateMachineProcedure<TEnvironment, TState>
extends Procedure<TEnvironment> { extends Procedure<TEnvironment> {
private Flow stateFlow = Flow.HAS_MORE_STATE;
private int stateCount = 0; private int stateCount = 0;
private int[] states = null; private int[] states = null;
@ -114,11 +115,13 @@ public abstract class StateMachineProcedure<TEnvironment, TState>
* Add a child procedure to execute * Add a child procedure to execute
* @param subProcedure the child procedure * @param subProcedure the child procedure
*/ */
protected void addChildProcedure(Procedure subProcedure) { protected void addChildProcedure(Procedure... subProcedure) {
if (subProcList == null) { if (subProcList == null) {
subProcList = new ArrayList<Procedure>(); subProcList = new ArrayList<Procedure>(subProcedure.length);
}
for (int i = 0; i < subProcedure.length; ++i) {
subProcList.add(subProcedure[i]);
} }
subProcList.add(subProcedure);
} }
@Override @Override
@ -126,14 +129,14 @@ public abstract class StateMachineProcedure<TEnvironment, TState>
throws ProcedureYieldException, InterruptedException { throws ProcedureYieldException, InterruptedException {
updateTimestamp(); updateTimestamp();
try { try {
if (!hasMoreState()) return null;
TState state = getCurrentState(); TState state = getCurrentState();
if (stateCount == 0) { if (stateCount == 0) {
setNextState(getStateId(state)); setNextState(getStateId(state));
} }
if (executeFromState(env, state) == Flow.NO_MORE_STATE) {
// completed stateFlow = executeFromState(env, state);
return null;
}
if (subProcList != null && subProcList.size() != 0) { if (subProcList != null && subProcList.size() != 0) {
Procedure[] subProcedures = subProcList.toArray(new Procedure[subProcList.size()]); Procedure[] subProcedures = subProcList.toArray(new Procedure[subProcList.size()]);
@ -141,7 +144,7 @@ public abstract class StateMachineProcedure<TEnvironment, TState>
return subProcedures; return subProcedures;
} }
return (isWaiting() || isFailed()) ? null : new Procedure[] {this}; return (isWaiting() || isFailed() || !hasMoreState()) ? null : new Procedure[] {this};
} finally { } finally {
updateTimestamp(); updateTimestamp();
} }
@ -164,6 +167,10 @@ public abstract class StateMachineProcedure<TEnvironment, TState>
return isYieldBeforeExecuteFromState(env, getCurrentState()); return isYieldBeforeExecuteFromState(env, getCurrentState());
} }
private boolean hasMoreState() {
return stateFlow != Flow.NO_MORE_STATE;
}
private TState getCurrentState() { private TState getCurrentState() {
return stateCount > 0 ? getState(states[stateCount-1]) : getInitialState(); return stateCount > 0 ? getState(states[stateCount-1]) : getInitialState();
} }

View File

@ -355,8 +355,7 @@ public class TestProcedureRecovery {
case STATE_2: case STATE_2:
LOG.info("execute step 2 " + this); LOG.info("execute step 2 " + this);
if (submitChildProc) { if (submitChildProc) {
addChildProcedure(new TestStateMachineProcedure()); addChildProcedure(new TestStateMachineProcedure(), new TestStateMachineProcedure());
addChildProcedure(new TestStateMachineProcedure());
setNextState(State.DONE); setNextState(State.DONE);
} else { } else {
setNextState(State.STATE_3); setNextState(State.STATE_3);
@ -375,6 +374,10 @@ public class TestProcedureRecovery {
iResult += 7; iResult += 7;
break; break;
case DONE: case DONE:
if (submitChildProc) {
addChildProcedure(new TestStateMachineProcedure());
}
iResult += 11;
setResult(Bytes.toBytes(iResult)); setResult(Bytes.toBytes(iResult));
return Flow.NO_MORE_STATE; return Flow.NO_MORE_STATE;
default: default:
@ -441,7 +444,10 @@ public class TestProcedureRecovery {
long procId = procExecutor.submitProcedure(new TestStateMachineProcedure(true)); long procId = procExecutor.submitProcedure(new TestStateMachineProcedure(true));
// Wait the completion // Wait the completion
ProcedureTestingUtility.waitProcedure(procExecutor, procId); ProcedureTestingUtility.waitProcedure(procExecutor, procId);
ProcedureTestingUtility.assertProcNotFailed(procExecutor, procId); ProcedureInfo result = procExecutor.getResult(procId);
ProcedureTestingUtility.assertProcNotFailed(result);
assertEquals(19, Bytes.toInt(result.getResult()));
assertEquals(4, procExecutor.getLastProcId());
} }
@Test(timeout=30000) @Test(timeout=30000)
@ -479,7 +485,7 @@ public class TestProcedureRecovery {
// The procedure is completed // The procedure is completed
ProcedureInfo result = procExecutor.getResult(procId); ProcedureInfo result = procExecutor.getResult(procId);
ProcedureTestingUtility.assertProcNotFailed(result); ProcedureTestingUtility.assertProcNotFailed(result);
assertEquals(15, Bytes.toInt(result.getResult())); assertEquals(26, Bytes.toInt(result.getResult()));
} }
@Test(timeout=30000) @Test(timeout=30000)