diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java index 145f7f3564e..74e4675045a 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.CountDownLatch; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -52,7 +53,8 @@ public class TestProcedureRecovery { private static final int PROCEDURE_EXECUTOR_SLOTS = 1; private static final Procedure NULL_PROC = null; - private static ProcedureExecutor procExecutor; + private static TestProcEnv procEnv; + private static ProcedureExecutor procExecutor; private static ProcedureStore procStore; private static int procSleepInterval; @@ -69,15 +71,13 @@ public class TestProcedureRecovery { assertTrue(testDir.depth() > 1); logDir = new Path(testDir, "proc-logs"); + procEnv = new TestProcEnv(); procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir); - procExecutor = new ProcedureExecutor(htu.getConfiguration(), null, procStore); + procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore); procExecutor.testing = new ProcedureExecutor.Testing(); procStore.start(PROCEDURE_EXECUTOR_SLOTS); procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true); procSleepInterval = 0; - - ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, false); - ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, false); } @After @@ -93,13 +93,14 @@ public class TestProcedureRecovery { dumpLogDirState(); } - public static class TestSingleStepProcedure extends SequentialProcedure { + public static class TestSingleStepProcedure extends SequentialProcedure { private int step = 0; public TestSingleStepProcedure() { } @Override - protected Procedure[] execute(Void env) { + protected Procedure[] execute(TestProcEnv env) throws InterruptedException { + env.waitOnLatch(); LOG.debug("execute procedure " + this + " step=" + step); step++; setResult(Bytes.toBytes(step)); @@ -107,18 +108,19 @@ public class TestProcedureRecovery { } @Override - protected void rollback(Void env) { } + protected void rollback(TestProcEnv env) { } @Override - protected boolean abort(Void env) { return true; } + protected boolean abort(TestProcEnv env) { return true; } } - public static class BaseTestStepProcedure extends SequentialProcedure { + public static class BaseTestStepProcedure extends SequentialProcedure { private AtomicBoolean abort = new AtomicBoolean(false); private int step = 0; @Override - protected Procedure[] execute(Void env) { + protected Procedure[] execute(TestProcEnv env) throws InterruptedException { + env.waitOnLatch(); LOG.debug("execute procedure " + this + " step=" + step); ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor); step++; @@ -133,14 +135,14 @@ public class TestProcedureRecovery { } @Override - protected void rollback(Void env) { + protected void rollback(TestProcEnv env) { LOG.debug("rollback procedure " + this + " step=" + step); ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor); step++; } @Override - protected boolean abort(Void env) { + protected boolean abort(TestProcEnv env) { abort.set(true); return true; } @@ -160,7 +162,7 @@ public class TestProcedureRecovery { public TestMultiStepProcedure() { } @Override - public Procedure[] execute(Void env) { + public Procedure[] execute(TestProcEnv env) throws InterruptedException { super.execute(env); return isFailed() ? null : new Procedure[] { new Step1Procedure() }; } @@ -169,7 +171,7 @@ public class TestProcedureRecovery { public Step1Procedure() { } @Override - protected Procedure[] execute(Void env) { + protected Procedure[] execute(TestProcEnv env) throws InterruptedException { super.execute(env); return isFailed() ? null : new Procedure[] { new Step2Procedure() }; } @@ -297,6 +299,8 @@ public class TestProcedureRecovery { // Restart restart(); + waitProcedure(procId); + Procedure proc2 = new TestSingleStepProcedure(); // Submit a procedure with the same nonce and expect the same procedure would return. long procId2 = ProcedureTestingUtility.submitAndWait(procExecutor, proc2, nonceGroup, nonce); @@ -313,17 +317,23 @@ public class TestProcedureRecovery { Procedure proc = new TestMultiStepProcedure(); long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc, nonceGroup, nonce); - // Restart + // Restart (use a latch to prevent the step execution until we submitted proc2) + CountDownLatch latch = new CountDownLatch(1); + procEnv.setWaitLatch(latch); restart(); - Procedure proc2 = new TestMultiStepProcedure(); // Submit a procedure with the same nonce and expect the same procedure would return. - long procId2 = ProcedureTestingUtility.submitAndWait(procExecutor, proc2, nonceGroup, nonce); + Procedure proc2 = new TestMultiStepProcedure(); + long procId2 = procExecutor.submitProcedure(proc2, nonceGroup, nonce); + latch.countDown(); + procEnv.setWaitLatch(null); + // The original proc is not completed and the new submission should have the same proc Id. assertTrue(procId == procId2); } + public static class TestStateMachineProcedure - extends StateMachineProcedure { + extends StateMachineProcedure { enum State { STATE_1, STATE_2, STATE_3, DONE } public TestStateMachineProcedure() {} @@ -332,7 +342,7 @@ public class TestProcedureRecovery { private int iResult = 0; @Override - protected StateMachineProcedure.Flow executeFromState(Void env, State state) { + protected StateMachineProcedure.Flow executeFromState(TestProcEnv env, State state) { switch (state) { case STATE_1: LOG.info("execute step 1 " + this); @@ -363,7 +373,7 @@ public class TestProcedureRecovery { } @Override - protected void rollbackState(Void env, final State state) { + protected void rollbackState(TestProcEnv env, final State state) { switch (state) { case STATE_1: LOG.info("rollback step 1 " + this); @@ -395,7 +405,7 @@ public class TestProcedureRecovery { } @Override - protected boolean abort(Void env) { + protected boolean abort(TestProcEnv env) { aborted.set(true); return true; } @@ -521,4 +531,21 @@ public class TestProcedureRecovery { LOG.warn("Unable to dump " + logDir, e); } } + + private static class TestProcEnv { + private CountDownLatch latch = null; + + /** + * set/unset a latch. every procedure execute() step will wait on the latch if any. + */ + public void setWaitLatch(CountDownLatch latch) { + this.latch = latch; + } + + public void waitOnLatch() throws InterruptedException { + if (latch != null) { + latch.await(); + } + } + } }