HBASE-15422 Procedure v2 - Avoid double yield
This commit is contained in:
parent
7e4e8dc3e9
commit
648cc5e823
|
@ -876,12 +876,6 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
}
|
||||
break;
|
||||
}
|
||||
|
||||
// if the procedure is kind enough to pass the slot to someone else, yield
|
||||
if (proc.isYieldAfterExecutionStep(getEnvironment())) {
|
||||
runnables.yield(proc);
|
||||
break;
|
||||
}
|
||||
} while (procStack.isFailed());
|
||||
}
|
||||
|
||||
|
@ -1159,7 +1153,9 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
}
|
||||
|
||||
// if the procedure is kind enough to pass the slot to someone else, yield
|
||||
if (reExecute && procedure.isYieldAfterExecutionStep(getEnvironment())) {
|
||||
if (procedure.getState() == ProcedureState.RUNNABLE &&
|
||||
procedure.isYieldAfterExecutionStep(getEnvironment())) {
|
||||
runnables.yield(procedure);
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
|
@ -19,6 +19,8 @@
|
|||
package org.apache.hadoop.hbase.procedure2;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
@ -48,6 +50,7 @@ public class TestYieldProcedures {
|
|||
private static final Procedure NULL_PROC = null;
|
||||
|
||||
private ProcedureExecutor<TestProcEnv> procExecutor;
|
||||
private TestRunQueue procRunnables;
|
||||
private ProcedureStore procStore;
|
||||
|
||||
private HBaseCommonTestingUtility htu;
|
||||
|
@ -64,7 +67,9 @@ public class TestYieldProcedures {
|
|||
|
||||
logDir = new Path(testDir, "proc-logs");
|
||||
procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
|
||||
procExecutor = new ProcedureExecutor(htu.getConfiguration(), new TestProcEnv(), procStore);
|
||||
procRunnables = new TestRunQueue();
|
||||
procExecutor = new ProcedureExecutor(htu.getConfiguration(), new TestProcEnv(),
|
||||
procStore, procRunnables);
|
||||
procStore.start(PROCEDURE_EXECUTOR_SLOTS);
|
||||
procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
|
||||
}
|
||||
|
@ -87,34 +92,32 @@ public class TestYieldProcedures {
|
|||
}
|
||||
ProcedureTestingUtility.waitNoProcedureRunning(procExecutor);
|
||||
|
||||
// verify yield during execute()
|
||||
long prevTimestamp = 0;
|
||||
for (int execStep = 0; execStep < NUM_STATES; ++execStep) {
|
||||
for (int i = 0; i < procs.length; ++i) {
|
||||
assertEquals(NUM_STATES * 2, procs[i].getExecutionInfo().size());
|
||||
TestStateMachineProcedure.ExecutionInfo info = procs[i].getExecutionInfo().get(execStep);
|
||||
LOG.info("i=" + i + " execStep=" + execStep + " timestamp=" + info.getTimestamp());
|
||||
for (int i = 0; i < procs.length; ++i) {
|
||||
assertEquals(NUM_STATES * 2, procs[i].getExecutionInfo().size());
|
||||
|
||||
// verify execution
|
||||
int index = 0;
|
||||
for (int execStep = 0; execStep < NUM_STATES; ++execStep) {
|
||||
TestStateMachineProcedure.ExecutionInfo info = procs[i].getExecutionInfo().get(index++);
|
||||
assertEquals(false, info.isRollback());
|
||||
assertEquals(execStep, info.getStep().ordinal());
|
||||
assertEquals(prevTimestamp + 1, info.getTimestamp());
|
||||
prevTimestamp++;
|
||||
}
|
||||
|
||||
// verify rollback
|
||||
for (int execStep = NUM_STATES - 1; execStep >= 0; --execStep) {
|
||||
TestStateMachineProcedure.ExecutionInfo info = procs[i].getExecutionInfo().get(index++);
|
||||
assertEquals(true, info.isRollback());
|
||||
assertEquals(execStep, info.getStep().ordinal());
|
||||
}
|
||||
}
|
||||
|
||||
// verify yield during rollback()
|
||||
int count = NUM_STATES;
|
||||
for (int execStep = NUM_STATES - 1; execStep >= 0; --execStep) {
|
||||
for (int i = 0; i < procs.length; ++i) {
|
||||
assertEquals(NUM_STATES * 2, procs[i].getExecutionInfo().size());
|
||||
TestStateMachineProcedure.ExecutionInfo info = procs[i].getExecutionInfo().get(count);
|
||||
LOG.info("i=" + i + " execStep=" + execStep + " timestamp=" + info.getTimestamp());
|
||||
assertEquals(true, info.isRollback());
|
||||
assertEquals(execStep, info.getStep().ordinal());
|
||||
assertEquals(prevTimestamp + 1, info.getTimestamp());
|
||||
prevTimestamp++;
|
||||
}
|
||||
count++;
|
||||
}
|
||||
// check runnable queue stats
|
||||
assertEquals(0, procRunnables.size());
|
||||
assertEquals(0, procRunnables.addFrontCalls);
|
||||
assertEquals(18, procRunnables.addBackCalls);
|
||||
assertEquals(15, procRunnables.yieldCalls);
|
||||
assertEquals(19, procRunnables.pollCalls);
|
||||
assertEquals(3, procRunnables.completionCalls);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -147,6 +150,29 @@ public class TestYieldProcedures {
|
|||
assertEquals(true, info.isRollback());
|
||||
assertEquals(i, info.getStep().ordinal());
|
||||
}
|
||||
|
||||
// check runnable queue stats
|
||||
assertEquals(0, procRunnables.size());
|
||||
assertEquals(0, procRunnables.addFrontCalls);
|
||||
assertEquals(12, procRunnables.addBackCalls);
|
||||
assertEquals(11, procRunnables.yieldCalls);
|
||||
assertEquals(13, procRunnables.pollCalls);
|
||||
assertEquals(1, procRunnables.completionCalls);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testYieldException() {
|
||||
TestYieldProcedure proc = new TestYieldProcedure();
|
||||
ProcedureTestingUtility.submitAndWait(procExecutor, proc);
|
||||
assertEquals(6, proc.step);
|
||||
|
||||
// check runnable queue stats
|
||||
assertEquals(0, procRunnables.size());
|
||||
assertEquals(0, procRunnables.addFrontCalls);
|
||||
assertEquals(6, procRunnables.addBackCalls);
|
||||
assertEquals(5, procRunnables.yieldCalls);
|
||||
assertEquals(7, procRunnables.pollCalls);
|
||||
assertEquals(1, procRunnables.completionCalls);
|
||||
}
|
||||
|
||||
private static class TestProcEnv {
|
||||
|
@ -199,8 +225,9 @@ public class TestYieldProcedures {
|
|||
@Override
|
||||
protected StateMachineProcedure.Flow executeFromState(TestProcEnv env, State state)
|
||||
throws InterruptedException {
|
||||
LOG.info("execute step " + state);
|
||||
executionInfo.add(new ExecutionInfo(env.nextTimestamp(), state, false));
|
||||
final long ts = env.nextTimestamp();
|
||||
LOG.info(getProcId() + " execute step " + state + " ts=" + ts);
|
||||
executionInfo.add(new ExecutionInfo(ts, state, false));
|
||||
Thread.sleep(150);
|
||||
|
||||
if (throwInterruptOnceOnEachStep && ((executionInfo.size() - 1) % 2) == 0) {
|
||||
|
@ -229,8 +256,9 @@ public class TestYieldProcedures {
|
|||
@Override
|
||||
protected void rollbackState(TestProcEnv env, final State state)
|
||||
throws InterruptedException {
|
||||
LOG.debug("rollback state " + state);
|
||||
executionInfo.add(new ExecutionInfo(env.nextTimestamp(), state, true));
|
||||
final long ts = env.nextTimestamp();
|
||||
LOG.debug(getProcId() + " rollback state " + state + " ts=" + ts);
|
||||
executionInfo.add(new ExecutionInfo(ts, state, true));
|
||||
Thread.sleep(150);
|
||||
|
||||
if (throwInterruptOnceOnEachStep && ((executionInfo.size() - 1) % 2) == 0) {
|
||||
|
@ -276,4 +304,80 @@ public class TestYieldProcedures {
|
|||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
public static class TestYieldProcedure extends Procedure<TestProcEnv> {
|
||||
private int step = 0;
|
||||
|
||||
public TestYieldProcedure() {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Procedure[] execute(final TestProcEnv env) throws ProcedureYieldException {
|
||||
LOG.info("execute step " + step);
|
||||
if (step++ < 5) {
|
||||
throw new ProcedureYieldException();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void rollback(TestProcEnv env) {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean abort(TestProcEnv env) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isYieldAfterExecutionStep(final TestProcEnv env) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serializeStateData(final OutputStream stream) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void deserializeStateData(final InputStream stream) throws IOException {
|
||||
}
|
||||
}
|
||||
|
||||
private static class TestRunQueue extends ProcedureSimpleRunQueue {
|
||||
private int completionCalls;
|
||||
private int addFrontCalls;
|
||||
private int addBackCalls;
|
||||
private int yieldCalls;
|
||||
private int pollCalls;
|
||||
|
||||
public TestRunQueue() {}
|
||||
|
||||
public void addFront(final Procedure proc) {
|
||||
addFrontCalls++;
|
||||
super.addFront(proc);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addBack(final Procedure proc) {
|
||||
addBackCalls++;
|
||||
super.addBack(proc);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void yield(final Procedure proc) {
|
||||
yieldCalls++;
|
||||
super.yield(proc);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Procedure poll() {
|
||||
pollCalls++;
|
||||
return super.poll();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void completionCleanup(Procedure proc) {
|
||||
completionCalls++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue