diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java index 24a448e363d..1a4845cf731 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.CountDownLatch; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -53,7 +54,8 @@ public class TestProcedureRecovery { private static final int PROCEDURE_EXECUTOR_SLOTS = 1; private static final Procedure NULL_PROC = null; - private static ProcedureExecutor procExecutor; + private static TestProcEnv procEnv; + private static ProcedureExecutor procExecutor; private static ProcedureStore procStore; private static int procSleepInterval; @@ -70,15 +72,13 @@ public class TestProcedureRecovery { assertTrue(testDir.depth() > 1); logDir = new Path(testDir, "proc-logs"); + procEnv = new TestProcEnv(); procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir); - procExecutor = new ProcedureExecutor(htu.getConfiguration(), null, procStore); + procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore); procExecutor.testing = new ProcedureExecutor.Testing(); procStore.start(PROCEDURE_EXECUTOR_SLOTS); procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true); procSleepInterval = 0; - - ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, false); - ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, false); } @After @@ -94,13 +94,14 @@ public class TestProcedureRecovery { dumpLogDirState(); } - public static class TestSingleStepProcedure extends SequentialProcedure { + public static class TestSingleStepProcedure extends SequentialProcedure { private int step = 0; public TestSingleStepProcedure() { } @Override - protected Procedure[] execute(Void env) { + protected Procedure[] execute(TestProcEnv env) throws InterruptedException { + env.waitOnLatch(); LOG.debug("execute procedure " + this + " step=" + step); step++; setResult(Bytes.toBytes(step)); @@ -108,18 +109,19 @@ public class TestProcedureRecovery { } @Override - protected void rollback(Void env) { } + protected void rollback(TestProcEnv env) { } @Override - protected boolean abort(Void env) { return true; } + protected boolean abort(TestProcEnv env) { return true; } } - public static class BaseTestStepProcedure extends SequentialProcedure { + public static class BaseTestStepProcedure extends SequentialProcedure { private AtomicBoolean abort = new AtomicBoolean(false); private int step = 0; @Override - protected Procedure[] execute(Void env) { + protected Procedure[] execute(TestProcEnv env) throws InterruptedException { + env.waitOnLatch(); LOG.debug("execute procedure " + this + " step=" + step); ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor); step++; @@ -134,14 +136,14 @@ public class TestProcedureRecovery { } @Override - protected void rollback(Void env) { + protected void rollback(TestProcEnv env) { LOG.debug("rollback procedure " + this + " step=" + step); ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor); step++; } @Override - protected boolean abort(Void env) { + protected boolean abort(TestProcEnv env) { abort.set(true); return true; } @@ -161,7 +163,7 @@ public class TestProcedureRecovery { public TestMultiStepProcedure() { } @Override - public Procedure[] execute(Void env) { + public Procedure[] execute(TestProcEnv env) throws InterruptedException { super.execute(env); return isFailed() ? null : new Procedure[] { new Step1Procedure() }; } @@ -170,7 +172,7 @@ public class TestProcedureRecovery { public Step1Procedure() { } @Override - protected Procedure[] execute(Void env) { + protected Procedure[] execute(TestProcEnv env) throws InterruptedException { super.execute(env); return isFailed() ? null : new Procedure[] { new Step2Procedure() }; } @@ -298,6 +300,8 @@ public class TestProcedureRecovery { // Restart restart(); + waitProcedure(procId); + Procedure proc2 = new TestSingleStepProcedure(); // Submit a procedure with the same nonce and expect the same procedure would return. long procId2 = ProcedureTestingUtility.submitAndWait(procExecutor, proc2, nonceGroup, nonce); @@ -314,17 +318,23 @@ public class TestProcedureRecovery { Procedure proc = new TestMultiStepProcedure(); long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc, nonceGroup, nonce); - // Restart + // Restart (use a latch to prevent the step execution until we submitted proc2) + CountDownLatch latch = new CountDownLatch(1); + procEnv.setWaitLatch(latch); restart(); - Procedure proc2 = new TestMultiStepProcedure(); // Submit a procedure with the same nonce and expect the same procedure would return. - long procId2 = ProcedureTestingUtility.submitAndWait(procExecutor, proc2, nonceGroup, nonce); + Procedure proc2 = new TestMultiStepProcedure(); + long procId2 = procExecutor.submitProcedure(proc2, nonceGroup, nonce); + latch.countDown(); + procEnv.setWaitLatch(null); + // The original proc is not completed and the new submission should have the same proc Id. assertTrue(procId == procId2); } + public static class TestStateMachineProcedure - extends StateMachineProcedure { + extends StateMachineProcedure { enum State { STATE_1, STATE_2, STATE_3, DONE } public TestStateMachineProcedure() {} @@ -333,7 +343,7 @@ public class TestProcedureRecovery { private int iResult = 0; @Override - protected StateMachineProcedure.Flow executeFromState(Void env, State state) { + protected StateMachineProcedure.Flow executeFromState(TestProcEnv env, State state) { switch (state) { case STATE_1: LOG.info("execute step 1 " + this); @@ -364,7 +374,7 @@ public class TestProcedureRecovery { } @Override - protected void rollbackState(Void env, final State state) { + protected void rollbackState(TestProcEnv env, final State state) { switch (state) { case STATE_1: LOG.info("rollback step 1 " + this); @@ -396,7 +406,7 @@ public class TestProcedureRecovery { } @Override - protected boolean abort(Void env) { + protected boolean abort(TestProcEnv env) { aborted.set(true); return true; } @@ -522,4 +532,21 @@ public class TestProcedureRecovery { LOG.warn("Unable to dump " + logDir, e); } } + + private static class TestProcEnv { + private CountDownLatch latch = null; + + /** + * set/unset a latch. every procedure execute() step will wait on the latch if any. + */ + public void setWaitLatch(CountDownLatch latch) { + this.latch = latch; + } + + public void waitOnLatch() throws InterruptedException { + if (latch != null) { + latch.await(); + } + } + } } \ No newline at end of file