From 60b81dc84859901881918d7dc686bf210e20989b Mon Sep 17 00:00:00 2001 From: Stephen Yuan Jiang Date: Tue, 1 Mar 2016 19:37:56 -0800 Subject: [PATCH] HBASE-15371: Procedure V2 - Completed support parent-child procedure (Stephen Yuan Jiang) --- .../procedure2/StateMachineProcedure.java | 21 ++++++++++++++ .../procedure2/TestProcedureRecovery.java | 28 ++++++++++++++++++- 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java index 286a7bf8e83..c2b4548249d 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.procedure2; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.ArrayList; import java.util.Arrays; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -45,6 +46,8 @@ public abstract class StateMachineProcedure private int stateCount = 0; private int[] states = null; + private ArrayList subProcList = null; + protected enum Flow { HAS_MORE_STATE, NO_MORE_STATE, @@ -107,6 +110,17 @@ public abstract class StateMachineProcedure return false; } + /** + * Add a child procedure to execute + * @param subProcedure the child procedure + */ + protected void addChildProcedure(Procedure subProcedure) { + if (subProcList == null) { + subProcList = new ArrayList(); + } + subProcList.add(subProcedure); + } + @Override protected Procedure[] execute(final TEnvironment env) throws ProcedureYieldException, InterruptedException { @@ -120,6 +134,13 @@ public abstract class StateMachineProcedure // completed return null; } + + if (subProcList != null && subProcList.size() != 0) { + Procedure[] subProcedures = subProcList.toArray(new Procedure[subProcList.size()]); + subProcList = null; + return subProcedures; + } + return (isWaiting() || isFailed()) ? null : new Procedure[] {this}; } finally { updateTimestamp(); diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java index cae10ef7c60..18df3988bc6 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java @@ -337,8 +337,13 @@ public class TestProcedureRecovery { public TestStateMachineProcedure() {} + public TestStateMachineProcedure(final boolean testSubmitChildProc) { + this.submitChildProc = testSubmitChildProc; + } + private AtomicBoolean aborted = new AtomicBoolean(false); private int iResult = 0; + private boolean submitChildProc = false; @Override protected StateMachineProcedure.Flow executeFromState(TestProcEnv env, State state) { @@ -350,7 +355,13 @@ public class TestProcedureRecovery { break; case STATE_2: LOG.info("execute step 2 " + this); - setNextState(State.STATE_3); + if (submitChildProc) { + addChildProcedure(new TestStateMachineProcedure()); + addChildProcedure(new TestStateMachineProcedure()); + setNextState(State.DONE); + } else { + setNextState(State.STATE_3); + } iResult += 5; break; case STATE_3: @@ -363,6 +374,8 @@ public class TestProcedureRecovery { } setNextState(State.DONE); iResult += 7; + break; + case DONE: setResult(Bytes.toBytes(iResult)); return Flow.NO_MORE_STATE; default: @@ -424,6 +437,14 @@ public class TestProcedureRecovery { } } + @Test(timeout=30000) + public void testStateMachineMultipleLevel() throws Exception { + long procId = procExecutor.submitProcedure(new TestStateMachineProcedure(true)); + // Wait the completion + ProcedureTestingUtility.waitProcedure(procExecutor, procId); + ProcedureTestingUtility.assertProcNotFailed(procExecutor, procId); + } + @Test(timeout=30000) public void testStateMachineRecovery() throws Exception { ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, true); @@ -447,6 +468,11 @@ public class TestProcedureRecovery { assertFalse(procExecutor.isRunning()); // Step 3 exec + restart(); + waitProcedure(procId); + ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); + assertFalse(procExecutor.isRunning()); + restart(); waitProcedure(procId); assertTrue(procExecutor.isRunning());