From 91fee265cfdd59f49fc0e3132af74bdae58f6f8a Mon Sep 17 00:00:00 2001 From: Matteo Bertozzi Date: Tue, 23 Aug 2016 16:42:57 -0700 Subject: [PATCH] HBASE-16485 Procedure v2 - Add support to addChildProcedure() as last "step" in StateMachineProcedure --- .../hbase/procedure2/ProcedureExecutor.java | 6 +++++ .../procedure2/StateMachineProcedure.java | 27 ++++++++++++------- .../procedure2/TestProcedureRecovery.java | 14 +++++++--- 3 files changed, 33 insertions(+), 14 deletions(-) diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index 198623d8cb6..f71426034ad 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.procedure2; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import java.io.IOException; @@ -1290,6 +1291,11 @@ public class ProcedureExecutor { return procId; } + @VisibleForTesting + protected long getLastProcId() { + return lastProcId.get(); + } + private Long getRootProcedureId(Procedure proc) { return Procedure.getRootProcedureId(procedures, proc); } diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java index c2b4548249d..a403193c3de 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.StateMachinePr @InterfaceStability.Evolving public abstract class StateMachineProcedure extends Procedure { + private Flow stateFlow = Flow.HAS_MORE_STATE; private int stateCount = 0; private int[] states = null; @@ -60,7 +61,7 @@ public abstract class StateMachineProcedure * Flow.HAS_MORE_STATE if there is another step. */ protected abstract Flow executeFromState(TEnvironment env, TState state) - throws ProcedureYieldException, InterruptedException; + throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException; /** * called to perform the rollback of the specified state @@ -114,26 +115,28 @@ public abstract class StateMachineProcedure * Add a child procedure to execute * @param subProcedure the child procedure */ - protected void addChildProcedure(Procedure subProcedure) { + protected void addChildProcedure(Procedure... subProcedure) { if (subProcList == null) { - subProcList = new ArrayList(); + subProcList = new ArrayList(subProcedure.length); + } + for (int i = 0; i < subProcedure.length; ++i) { + subProcList.add(subProcedure[i]); } - subProcList.add(subProcedure); } @Override protected Procedure[] execute(final TEnvironment env) - throws ProcedureYieldException, InterruptedException { + throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { updateTimestamp(); try { + if (!hasMoreState()) return null; + TState state = getCurrentState(); if (stateCount == 0) { setNextState(getStateId(state)); } - if (executeFromState(env, state) == Flow.NO_MORE_STATE) { - // completed - return null; - } + + stateFlow = executeFromState(env, state); if (subProcList != null && subProcList.size() != 0) { Procedure[] subProcedures = subProcList.toArray(new Procedure[subProcList.size()]); @@ -141,7 +144,7 @@ public abstract class StateMachineProcedure return subProcedures; } - return (isWaiting() || isFailed()) ? null : new Procedure[] {this}; + return (isWaiting() || isFailed() || !hasMoreState()) ? null : new Procedure[] {this}; } finally { updateTimestamp(); } @@ -164,6 +167,10 @@ public abstract class StateMachineProcedure return isYieldBeforeExecuteFromState(env, getCurrentState()); } + private boolean hasMoreState() { + return stateFlow != Flow.NO_MORE_STATE; + } + private TState getCurrentState() { return stateCount > 0 ? getState(states[stateCount-1]) : getInitialState(); } diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java index 18df3988bc6..e3cacd259db 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java @@ -356,8 +356,7 @@ public class TestProcedureRecovery { case STATE_2: LOG.info("execute step 2 " + this); if (submitChildProc) { - addChildProcedure(new TestStateMachineProcedure()); - addChildProcedure(new TestStateMachineProcedure()); + addChildProcedure(new TestStateMachineProcedure(), new TestStateMachineProcedure()); setNextState(State.DONE); } else { setNextState(State.STATE_3); @@ -376,6 +375,10 @@ public class TestProcedureRecovery { iResult += 7; break; case DONE: + if (submitChildProc) { + addChildProcedure(new TestStateMachineProcedure()); + } + iResult += 11; setResult(Bytes.toBytes(iResult)); return Flow.NO_MORE_STATE; default: @@ -442,7 +445,10 @@ public class TestProcedureRecovery { long procId = procExecutor.submitProcedure(new TestStateMachineProcedure(true)); // Wait the completion ProcedureTestingUtility.waitProcedure(procExecutor, procId); - ProcedureTestingUtility.assertProcNotFailed(procExecutor, procId); + ProcedureInfo result = procExecutor.getResult(procId); + ProcedureTestingUtility.assertProcNotFailed(result); + assertEquals(19, Bytes.toInt(result.getResult())); + assertEquals(4, procExecutor.getLastProcId()); } @Test(timeout=30000) @@ -480,7 +486,7 @@ public class TestProcedureRecovery { // The procedure is completed ProcedureInfo result = procExecutor.getResult(procId); ProcedureTestingUtility.assertProcNotFailed(result); - assertEquals(15, Bytes.toInt(result.getResult())); + assertEquals(26, Bytes.toInt(result.getResult())); } @Test(timeout=30000)