HBASE-16485 Procedure v2 - Add support to addChildProcedure() as last "step" in StateMachineProcedure
This commit is contained in:
parent
cb507b8cff
commit
91fee265cf
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.procedure2;
|
package org.apache.hadoop.hbase.procedure2;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -1290,6 +1291,11 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
return procId;
|
return procId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
protected long getLastProcId() {
|
||||||
|
return lastProcId.get();
|
||||||
|
}
|
||||||
|
|
||||||
private Long getRootProcedureId(Procedure proc) {
|
private Long getRootProcedureId(Procedure proc) {
|
||||||
return Procedure.getRootProcedureId(procedures, proc);
|
return Procedure.getRootProcedureId(procedures, proc);
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.StateMachinePr
|
||||||
@InterfaceStability.Evolving
|
@InterfaceStability.Evolving
|
||||||
public abstract class StateMachineProcedure<TEnvironment, TState>
|
public abstract class StateMachineProcedure<TEnvironment, TState>
|
||||||
extends Procedure<TEnvironment> {
|
extends Procedure<TEnvironment> {
|
||||||
|
private Flow stateFlow = Flow.HAS_MORE_STATE;
|
||||||
private int stateCount = 0;
|
private int stateCount = 0;
|
||||||
private int[] states = null;
|
private int[] states = null;
|
||||||
|
|
||||||
|
@ -60,7 +61,7 @@ public abstract class StateMachineProcedure<TEnvironment, TState>
|
||||||
* Flow.HAS_MORE_STATE if there is another step.
|
* Flow.HAS_MORE_STATE if there is another step.
|
||||||
*/
|
*/
|
||||||
protected abstract Flow executeFromState(TEnvironment env, TState state)
|
protected abstract Flow executeFromState(TEnvironment env, TState state)
|
||||||
throws ProcedureYieldException, InterruptedException;
|
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* called to perform the rollback of the specified state
|
* called to perform the rollback of the specified state
|
||||||
|
@ -114,26 +115,28 @@ public abstract class StateMachineProcedure<TEnvironment, TState>
|
||||||
* Add a child procedure to execute
|
* Add a child procedure to execute
|
||||||
* @param subProcedure the child procedure
|
* @param subProcedure the child procedure
|
||||||
*/
|
*/
|
||||||
protected void addChildProcedure(Procedure subProcedure) {
|
protected void addChildProcedure(Procedure... subProcedure) {
|
||||||
if (subProcList == null) {
|
if (subProcList == null) {
|
||||||
subProcList = new ArrayList<Procedure>();
|
subProcList = new ArrayList<Procedure>(subProcedure.length);
|
||||||
|
}
|
||||||
|
for (int i = 0; i < subProcedure.length; ++i) {
|
||||||
|
subProcList.add(subProcedure[i]);
|
||||||
}
|
}
|
||||||
subProcList.add(subProcedure);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Procedure[] execute(final TEnvironment env)
|
protected Procedure[] execute(final TEnvironment env)
|
||||||
throws ProcedureYieldException, InterruptedException {
|
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
|
||||||
updateTimestamp();
|
updateTimestamp();
|
||||||
try {
|
try {
|
||||||
|
if (!hasMoreState()) return null;
|
||||||
|
|
||||||
TState state = getCurrentState();
|
TState state = getCurrentState();
|
||||||
if (stateCount == 0) {
|
if (stateCount == 0) {
|
||||||
setNextState(getStateId(state));
|
setNextState(getStateId(state));
|
||||||
}
|
}
|
||||||
if (executeFromState(env, state) == Flow.NO_MORE_STATE) {
|
|
||||||
// completed
|
stateFlow = executeFromState(env, state);
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (subProcList != null && subProcList.size() != 0) {
|
if (subProcList != null && subProcList.size() != 0) {
|
||||||
Procedure[] subProcedures = subProcList.toArray(new Procedure[subProcList.size()]);
|
Procedure[] subProcedures = subProcList.toArray(new Procedure[subProcList.size()]);
|
||||||
|
@ -141,7 +144,7 @@ public abstract class StateMachineProcedure<TEnvironment, TState>
|
||||||
return subProcedures;
|
return subProcedures;
|
||||||
}
|
}
|
||||||
|
|
||||||
return (isWaiting() || isFailed()) ? null : new Procedure[] {this};
|
return (isWaiting() || isFailed() || !hasMoreState()) ? null : new Procedure[] {this};
|
||||||
} finally {
|
} finally {
|
||||||
updateTimestamp();
|
updateTimestamp();
|
||||||
}
|
}
|
||||||
|
@ -164,6 +167,10 @@ public abstract class StateMachineProcedure<TEnvironment, TState>
|
||||||
return isYieldBeforeExecuteFromState(env, getCurrentState());
|
return isYieldBeforeExecuteFromState(env, getCurrentState());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean hasMoreState() {
|
||||||
|
return stateFlow != Flow.NO_MORE_STATE;
|
||||||
|
}
|
||||||
|
|
||||||
private TState getCurrentState() {
|
private TState getCurrentState() {
|
||||||
return stateCount > 0 ? getState(states[stateCount-1]) : getInitialState();
|
return stateCount > 0 ? getState(states[stateCount-1]) : getInitialState();
|
||||||
}
|
}
|
||||||
|
|
|
@ -356,8 +356,7 @@ public class TestProcedureRecovery {
|
||||||
case STATE_2:
|
case STATE_2:
|
||||||
LOG.info("execute step 2 " + this);
|
LOG.info("execute step 2 " + this);
|
||||||
if (submitChildProc) {
|
if (submitChildProc) {
|
||||||
addChildProcedure(new TestStateMachineProcedure());
|
addChildProcedure(new TestStateMachineProcedure(), new TestStateMachineProcedure());
|
||||||
addChildProcedure(new TestStateMachineProcedure());
|
|
||||||
setNextState(State.DONE);
|
setNextState(State.DONE);
|
||||||
} else {
|
} else {
|
||||||
setNextState(State.STATE_3);
|
setNextState(State.STATE_3);
|
||||||
|
@ -376,6 +375,10 @@ public class TestProcedureRecovery {
|
||||||
iResult += 7;
|
iResult += 7;
|
||||||
break;
|
break;
|
||||||
case DONE:
|
case DONE:
|
||||||
|
if (submitChildProc) {
|
||||||
|
addChildProcedure(new TestStateMachineProcedure());
|
||||||
|
}
|
||||||
|
iResult += 11;
|
||||||
setResult(Bytes.toBytes(iResult));
|
setResult(Bytes.toBytes(iResult));
|
||||||
return Flow.NO_MORE_STATE;
|
return Flow.NO_MORE_STATE;
|
||||||
default:
|
default:
|
||||||
|
@ -442,7 +445,10 @@ public class TestProcedureRecovery {
|
||||||
long procId = procExecutor.submitProcedure(new TestStateMachineProcedure(true));
|
long procId = procExecutor.submitProcedure(new TestStateMachineProcedure(true));
|
||||||
// Wait the completion
|
// Wait the completion
|
||||||
ProcedureTestingUtility.waitProcedure(procExecutor, procId);
|
ProcedureTestingUtility.waitProcedure(procExecutor, procId);
|
||||||
ProcedureTestingUtility.assertProcNotFailed(procExecutor, procId);
|
ProcedureInfo result = procExecutor.getResult(procId);
|
||||||
|
ProcedureTestingUtility.assertProcNotFailed(result);
|
||||||
|
assertEquals(19, Bytes.toInt(result.getResult()));
|
||||||
|
assertEquals(4, procExecutor.getLastProcId());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout=30000)
|
@Test(timeout=30000)
|
||||||
|
@ -480,7 +486,7 @@ public class TestProcedureRecovery {
|
||||||
// The procedure is completed
|
// The procedure is completed
|
||||||
ProcedureInfo result = procExecutor.getResult(procId);
|
ProcedureInfo result = procExecutor.getResult(procId);
|
||||||
ProcedureTestingUtility.assertProcNotFailed(result);
|
ProcedureTestingUtility.assertProcNotFailed(result);
|
||||||
assertEquals(15, Bytes.toInt(result.getResult()));
|
assertEquals(26, Bytes.toInt(result.getResult()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout=30000)
|
@Test(timeout=30000)
|
||||||
|
|
Loading…
Reference in New Issue