HBASE-14106 TestProcedureRecovery is flaky

This commit is contained in:
Matteo Bertozzi 2015-07-17 10:20:33 -07:00
parent 440e0bc6ea
commit 9135ad1bed
1 changed files with 49 additions and 22 deletions

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -52,7 +53,8 @@ public class TestProcedureRecovery {
private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
private static final Procedure NULL_PROC = null;
private static ProcedureExecutor<Void> procExecutor;
private static TestProcEnv procEnv;
private static ProcedureExecutor<TestProcEnv> procExecutor;
private static ProcedureStore procStore;
private static int procSleepInterval;
@ -69,15 +71,13 @@ public class TestProcedureRecovery {
assertTrue(testDir.depth() > 1);
logDir = new Path(testDir, "proc-logs");
procEnv = new TestProcEnv();
procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir);
procExecutor = new ProcedureExecutor(htu.getConfiguration(), null, procStore);
procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
procExecutor.testing = new ProcedureExecutor.Testing();
procStore.start(PROCEDURE_EXECUTOR_SLOTS);
procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
procSleepInterval = 0;
ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, false);
ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, false);
}
@After
@ -93,13 +93,14 @@ public class TestProcedureRecovery {
dumpLogDirState();
}
public static class TestSingleStepProcedure extends SequentialProcedure<Void> {
public static class TestSingleStepProcedure extends SequentialProcedure<TestProcEnv> {
private int step = 0;
public TestSingleStepProcedure() { }
@Override
protected Procedure[] execute(Void env) {
protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
env.waitOnLatch();
LOG.debug("execute procedure " + this + " step=" + step);
step++;
setResult(Bytes.toBytes(step));
@ -107,18 +108,19 @@ public class TestProcedureRecovery {
}
@Override
protected void rollback(Void env) { }
protected void rollback(TestProcEnv env) { }
@Override
protected boolean abort(Void env) { return true; }
protected boolean abort(TestProcEnv env) { return true; }
}
public static class BaseTestStepProcedure extends SequentialProcedure<Void> {
public static class BaseTestStepProcedure extends SequentialProcedure<TestProcEnv> {
private AtomicBoolean abort = new AtomicBoolean(false);
private int step = 0;
@Override
protected Procedure[] execute(Void env) {
protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
env.waitOnLatch();
LOG.debug("execute procedure " + this + " step=" + step);
ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor);
step++;
@ -133,14 +135,14 @@ public class TestProcedureRecovery {
}
@Override
protected void rollback(Void env) {
protected void rollback(TestProcEnv env) {
LOG.debug("rollback procedure " + this + " step=" + step);
ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor);
step++;
}
@Override
protected boolean abort(Void env) {
protected boolean abort(TestProcEnv env) {
abort.set(true);
return true;
}
@ -160,7 +162,7 @@ public class TestProcedureRecovery {
public TestMultiStepProcedure() { }
@Override
public Procedure[] execute(Void env) {
public Procedure[] execute(TestProcEnv env) throws InterruptedException {
super.execute(env);
return isFailed() ? null : new Procedure[] { new Step1Procedure() };
}
@ -169,7 +171,7 @@ public class TestProcedureRecovery {
public Step1Procedure() { }
@Override
protected Procedure[] execute(Void env) {
protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
super.execute(env);
return isFailed() ? null : new Procedure[] { new Step2Procedure() };
}
@ -297,6 +299,8 @@ public class TestProcedureRecovery {
// Restart
restart();
waitProcedure(procId);
Procedure proc2 = new TestSingleStepProcedure();
// Submit a procedure with the same nonce and expect the same procedure would return.
long procId2 = ProcedureTestingUtility.submitAndWait(procExecutor, proc2, nonceGroup, nonce);
@ -313,17 +317,23 @@ public class TestProcedureRecovery {
Procedure proc = new TestMultiStepProcedure();
long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc, nonceGroup, nonce);
// Restart
// Restart (use a latch to prevent the step execution until we submitted proc2)
CountDownLatch latch = new CountDownLatch(1);
procEnv.setWaitLatch(latch);
restart();
Procedure proc2 = new TestMultiStepProcedure();
// Submit a procedure with the same nonce and expect the same procedure would return.
long procId2 = ProcedureTestingUtility.submitAndWait(procExecutor, proc2, nonceGroup, nonce);
Procedure proc2 = new TestMultiStepProcedure();
long procId2 = procExecutor.submitProcedure(proc2, nonceGroup, nonce);
latch.countDown();
procEnv.setWaitLatch(null);
// The original proc is not completed and the new submission should have the same proc Id.
assertTrue(procId == procId2);
}
public static class TestStateMachineProcedure
extends StateMachineProcedure<Void, TestStateMachineProcedure.State> {
extends StateMachineProcedure<TestProcEnv, TestStateMachineProcedure.State> {
enum State { STATE_1, STATE_2, STATE_3, DONE }
public TestStateMachineProcedure() {}
@ -332,7 +342,7 @@ public class TestProcedureRecovery {
private int iResult = 0;
@Override
protected StateMachineProcedure.Flow executeFromState(Void env, State state) {
protected StateMachineProcedure.Flow executeFromState(TestProcEnv env, State state) {
switch (state) {
case STATE_1:
LOG.info("execute step 1 " + this);
@ -363,7 +373,7 @@ public class TestProcedureRecovery {
}
@Override
protected void rollbackState(Void env, final State state) {
protected void rollbackState(TestProcEnv env, final State state) {
switch (state) {
case STATE_1:
LOG.info("rollback step 1 " + this);
@ -395,7 +405,7 @@ public class TestProcedureRecovery {
}
@Override
protected boolean abort(Void env) {
protected boolean abort(TestProcEnv env) {
aborted.set(true);
return true;
}
@ -521,4 +531,21 @@ public class TestProcedureRecovery {
LOG.warn("Unable to dump " + logDir, e);
}
}
private static class TestProcEnv {
private CountDownLatch latch = null;
/**
* set/unset a latch. every procedure execute() step will wait on the latch if any.
*/
public void setWaitLatch(CountDownLatch latch) {
this.latch = latch;
}
public void waitOnLatch() throws InterruptedException {
if (latch != null) {
latch.await();
}
}
}
}