HBASE-14106 TestProcedureRecovery is flaky

This commit is contained in:
Matteo Bertozzi 2015-07-17 10:20:33 -07:00
parent b98598f363
commit 7382f8e045
1 changed files with 49 additions and 22 deletions

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -53,7 +54,8 @@ public class TestProcedureRecovery {
private static final int PROCEDURE_EXECUTOR_SLOTS = 1; private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
private static final Procedure NULL_PROC = null; private static final Procedure NULL_PROC = null;
private static ProcedureExecutor<Void> procExecutor; private static TestProcEnv procEnv;
private static ProcedureExecutor<TestProcEnv> procExecutor;
private static ProcedureStore procStore; private static ProcedureStore procStore;
private static int procSleepInterval; private static int procSleepInterval;
@ -70,15 +72,13 @@ public class TestProcedureRecovery {
assertTrue(testDir.depth() > 1); assertTrue(testDir.depth() > 1);
logDir = new Path(testDir, "proc-logs"); logDir = new Path(testDir, "proc-logs");
procEnv = new TestProcEnv();
procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir); procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir);
procExecutor = new ProcedureExecutor(htu.getConfiguration(), null, procStore); procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
procExecutor.testing = new ProcedureExecutor.Testing(); procExecutor.testing = new ProcedureExecutor.Testing();
procStore.start(PROCEDURE_EXECUTOR_SLOTS); procStore.start(PROCEDURE_EXECUTOR_SLOTS);
procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true); procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
procSleepInterval = 0; procSleepInterval = 0;
ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, false);
ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, false);
} }
@After @After
@ -94,13 +94,14 @@ public class TestProcedureRecovery {
dumpLogDirState(); dumpLogDirState();
} }
public static class TestSingleStepProcedure extends SequentialProcedure<Void> { public static class TestSingleStepProcedure extends SequentialProcedure<TestProcEnv> {
private int step = 0; private int step = 0;
public TestSingleStepProcedure() { } public TestSingleStepProcedure() { }
@Override @Override
protected Procedure[] execute(Void env) { protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
env.waitOnLatch();
LOG.debug("execute procedure " + this + " step=" + step); LOG.debug("execute procedure " + this + " step=" + step);
step++; step++;
setResult(Bytes.toBytes(step)); setResult(Bytes.toBytes(step));
@ -108,18 +109,19 @@ public class TestProcedureRecovery {
} }
@Override @Override
protected void rollback(Void env) { } protected void rollback(TestProcEnv env) { }
@Override @Override
protected boolean abort(Void env) { return true; } protected boolean abort(TestProcEnv env) { return true; }
} }
public static class BaseTestStepProcedure extends SequentialProcedure<Void> { public static class BaseTestStepProcedure extends SequentialProcedure<TestProcEnv> {
private AtomicBoolean abort = new AtomicBoolean(false); private AtomicBoolean abort = new AtomicBoolean(false);
private int step = 0; private int step = 0;
@Override @Override
protected Procedure[] execute(Void env) { protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
env.waitOnLatch();
LOG.debug("execute procedure " + this + " step=" + step); LOG.debug("execute procedure " + this + " step=" + step);
ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor); ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor);
step++; step++;
@ -134,14 +136,14 @@ public class TestProcedureRecovery {
} }
@Override @Override
protected void rollback(Void env) { protected void rollback(TestProcEnv env) {
LOG.debug("rollback procedure " + this + " step=" + step); LOG.debug("rollback procedure " + this + " step=" + step);
ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor); ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor);
step++; step++;
} }
@Override @Override
protected boolean abort(Void env) { protected boolean abort(TestProcEnv env) {
abort.set(true); abort.set(true);
return true; return true;
} }
@ -161,7 +163,7 @@ public class TestProcedureRecovery {
public TestMultiStepProcedure() { } public TestMultiStepProcedure() { }
@Override @Override
public Procedure[] execute(Void env) { public Procedure[] execute(TestProcEnv env) throws InterruptedException {
super.execute(env); super.execute(env);
return isFailed() ? null : new Procedure[] { new Step1Procedure() }; return isFailed() ? null : new Procedure[] { new Step1Procedure() };
} }
@ -170,7 +172,7 @@ public class TestProcedureRecovery {
public Step1Procedure() { } public Step1Procedure() { }
@Override @Override
protected Procedure[] execute(Void env) { protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
super.execute(env); super.execute(env);
return isFailed() ? null : new Procedure[] { new Step2Procedure() }; return isFailed() ? null : new Procedure[] { new Step2Procedure() };
} }
@ -298,6 +300,8 @@ public class TestProcedureRecovery {
// Restart // Restart
restart(); restart();
waitProcedure(procId);
Procedure proc2 = new TestSingleStepProcedure(); Procedure proc2 = new TestSingleStepProcedure();
// Submit a procedure with the same nonce and expect the same procedure would return. // Submit a procedure with the same nonce and expect the same procedure would return.
long procId2 = ProcedureTestingUtility.submitAndWait(procExecutor, proc2, nonceGroup, nonce); long procId2 = ProcedureTestingUtility.submitAndWait(procExecutor, proc2, nonceGroup, nonce);
@ -314,17 +318,23 @@ public class TestProcedureRecovery {
Procedure proc = new TestMultiStepProcedure(); Procedure proc = new TestMultiStepProcedure();
long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc, nonceGroup, nonce); long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc, nonceGroup, nonce);
// Restart // Restart (use a latch to prevent the step execution until we submitted proc2)
CountDownLatch latch = new CountDownLatch(1);
procEnv.setWaitLatch(latch);
restart(); restart();
Procedure proc2 = new TestMultiStepProcedure();
// Submit a procedure with the same nonce and expect the same procedure would return. // Submit a procedure with the same nonce and expect the same procedure would return.
long procId2 = ProcedureTestingUtility.submitAndWait(procExecutor, proc2, nonceGroup, nonce); Procedure proc2 = new TestMultiStepProcedure();
long procId2 = procExecutor.submitProcedure(proc2, nonceGroup, nonce);
latch.countDown();
procEnv.setWaitLatch(null);
// The original proc is not completed and the new submission should have the same proc Id. // The original proc is not completed and the new submission should have the same proc Id.
assertTrue(procId == procId2); assertTrue(procId == procId2);
} }
public static class TestStateMachineProcedure public static class TestStateMachineProcedure
extends StateMachineProcedure<Void, TestStateMachineProcedure.State> { extends StateMachineProcedure<TestProcEnv, TestStateMachineProcedure.State> {
enum State { STATE_1, STATE_2, STATE_3, DONE } enum State { STATE_1, STATE_2, STATE_3, DONE }
public TestStateMachineProcedure() {} public TestStateMachineProcedure() {}
@ -333,7 +343,7 @@ public class TestProcedureRecovery {
private int iResult = 0; private int iResult = 0;
@Override @Override
protected StateMachineProcedure.Flow executeFromState(Void env, State state) { protected StateMachineProcedure.Flow executeFromState(TestProcEnv env, State state) {
switch (state) { switch (state) {
case STATE_1: case STATE_1:
LOG.info("execute step 1 " + this); LOG.info("execute step 1 " + this);
@ -364,7 +374,7 @@ public class TestProcedureRecovery {
} }
@Override @Override
protected void rollbackState(Void env, final State state) { protected void rollbackState(TestProcEnv env, final State state) {
switch (state) { switch (state) {
case STATE_1: case STATE_1:
LOG.info("rollback step 1 " + this); LOG.info("rollback step 1 " + this);
@ -396,7 +406,7 @@ public class TestProcedureRecovery {
} }
@Override @Override
protected boolean abort(Void env) { protected boolean abort(TestProcEnv env) {
aborted.set(true); aborted.set(true);
return true; return true;
} }
@ -522,4 +532,21 @@ public class TestProcedureRecovery {
LOG.warn("Unable to dump " + logDir, e); LOG.warn("Unable to dump " + logDir, e);
} }
} }
private static class TestProcEnv {
private CountDownLatch latch = null;
/**
* set/unset a latch. every procedure execute() step will wait on the latch if any.
*/
public void setWaitLatch(CountDownLatch latch) {
this.latch = latch;
}
public void waitOnLatch() throws InterruptedException {
if (latch != null) {
latch.await();
}
}
}
} }