HBASE-16871 Procedure v2 - add waiting procs back to the queue after restart

This commit is contained in:
Matteo Bertozzi 2016-10-20 07:17:10 -07:00
parent 59857a41ea
commit 553373671b
3 changed files with 85 additions and 15 deletions

View File

@ -82,6 +82,7 @@ public class ProcedureExecutor<TEnvironment> {
Testing testing = null; Testing testing = null;
public static class Testing { public static class Testing {
protected boolean killIfSuspended = false;
protected boolean killBeforeStoreUpdate = false; protected boolean killBeforeStoreUpdate = false;
protected boolean toggleKillBeforeStoreUpdate = false; protected boolean toggleKillBeforeStoreUpdate = false;
@ -93,6 +94,10 @@ public class ProcedureExecutor<TEnvironment> {
} }
return kill; return kill;
} }
protected boolean shouldKillBeforeStoreUpdate(final boolean isSuspended) {
return (isSuspended && !killIfSuspended) ? false : shouldKillBeforeStoreUpdate();
}
} }
public interface ProcedureExecutorListener { public interface ProcedureExecutorListener {
@ -343,7 +348,7 @@ public class ProcedureExecutor<TEnvironment> {
} }
// 2. Initialize the stacks // 2. Initialize the stacks
ArrayList<Procedure> runnableList = new ArrayList(runnablesCount); final ArrayList<Procedure> runnableList = new ArrayList(runnablesCount);
HashSet<Procedure> waitingSet = null; HashSet<Procedure> waitingSet = null;
procIter.reset(); procIter.reset();
while (procIter.hasNext()) { while (procIter.hasNext()) {
@ -432,7 +437,15 @@ public class ProcedureExecutor<TEnvironment> {
throw new IOException("found " + corruptedCount + " procedures on replay"); throw new IOException("found " + corruptedCount + " procedures on replay");
} }
// 4. Push the scheduler // 4. Push the procedures to the timeout executor
if (waitingSet != null && !waitingSet.isEmpty()) {
for (Procedure proc: waitingSet) {
proc.afterReplay(getEnvironment());
timeoutExecutor.add(proc);
}
}
// 5. Push the procedure to the scheduler
if (!runnableList.isEmpty()) { if (!runnableList.isEmpty()) {
// TODO: See ProcedureWALFormatReader#hasFastStartSupport // TODO: See ProcedureWALFormatReader#hasFastStartSupport
// some procedure may be started way before this stuff. // some procedure may be started way before this stuff.
@ -1192,7 +1205,7 @@ public class ProcedureExecutor<TEnvironment> {
// allows to kill the executor before something is stored to the wal. // allows to kill the executor before something is stored to the wal.
// useful to test the procedure recovery. // useful to test the procedure recovery.
if (testing != null && !isSuspended && testing.shouldKillBeforeStoreUpdate()) { if (testing != null && testing.shouldKillBeforeStoreUpdate(isSuspended)) {
LOG.debug("TESTING: Kill before store update: " + procedure); LOG.debug("TESTING: Kill before store update: " + procedure);
stop(); stop();
return; return;

View File

@ -108,11 +108,21 @@ public class ProcedureTestingUtility {
return loader; return loader;
} }
public static <TEnv> void setKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor, private static <TEnv> void createExecutorTesting(final ProcedureExecutor<TEnv> procExecutor) {
boolean value) {
if (procExecutor.testing == null) { if (procExecutor.testing == null) {
procExecutor.testing = new ProcedureExecutor.Testing(); procExecutor.testing = new ProcedureExecutor.Testing();
} }
}
public static <TEnv> void setKillIfSuspended(ProcedureExecutor<TEnv> procExecutor,
boolean value) {
createExecutorTesting(procExecutor);
procExecutor.testing.killIfSuspended = value;
}
public static <TEnv> void setKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
boolean value) {
createExecutorTesting(procExecutor);
procExecutor.testing.killBeforeStoreUpdate = value; procExecutor.testing.killBeforeStoreUpdate = value;
LOG.warn("Set Kill before store update to: " + procExecutor.testing.killBeforeStoreUpdate); LOG.warn("Set Kill before store update to: " + procExecutor.testing.killBeforeStoreUpdate);
assertSingleExecutorForKillTests(procExecutor); assertSingleExecutorForKillTests(procExecutor);
@ -120,17 +130,13 @@ public class ProcedureTestingUtility {
public static <TEnv> void setToggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor, public static <TEnv> void setToggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
boolean value) { boolean value) {
if (procExecutor.testing == null) { createExecutorTesting(procExecutor);
procExecutor.testing = new ProcedureExecutor.Testing();
}
procExecutor.testing.toggleKillBeforeStoreUpdate = value; procExecutor.testing.toggleKillBeforeStoreUpdate = value;
assertSingleExecutorForKillTests(procExecutor); assertSingleExecutorForKillTests(procExecutor);
} }
public static <TEnv> void toggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor) { public static <TEnv> void toggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor) {
if (procExecutor.testing == null) { createExecutorTesting(procExecutor);
procExecutor.testing = new ProcedureExecutor.Testing();
}
procExecutor.testing.killBeforeStoreUpdate = !procExecutor.testing.killBeforeStoreUpdate; procExecutor.testing.killBeforeStoreUpdate = !procExecutor.testing.killBeforeStoreUpdate;
LOG.warn("Set Kill before store update to: " + procExecutor.testing.killBeforeStoreUpdate); LOG.warn("Set Kill before store update to: " + procExecutor.testing.killBeforeStoreUpdate);
assertSingleExecutorForKillTests(procExecutor); assertSingleExecutorForKillTests(procExecutor);

View File

@ -19,14 +19,19 @@
package org.apache.hadoop.hbase.procedure2; package org.apache.hadoop.hbase.procedure2;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility; import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MasterTests;
@ -45,17 +50,22 @@ public class TestProcedureEvents {
private static final Log LOG = LogFactory.getLog(TestProcedureEvents.class); private static final Log LOG = LogFactory.getLog(TestProcedureEvents.class);
private TestProcEnv procEnv; private TestProcEnv procEnv;
private NoopProcedureStore procStore; private ProcedureStore procStore;
private ProcedureExecutor<TestProcEnv> procExecutor; private ProcedureExecutor<TestProcEnv> procExecutor;
private HBaseCommonTestingUtility htu; private HBaseCommonTestingUtility htu;
private FileSystem fs;
private Path logDir;
@Before @Before
public void setUp() throws IOException { public void setUp() throws IOException {
htu = new HBaseCommonTestingUtility(); htu = new HBaseCommonTestingUtility();
Path testDir = htu.getDataTestDir();
fs = testDir.getFileSystem(htu.getConfiguration());
logDir = new Path(testDir, "proc-logs");
procEnv = new TestProcEnv(); procEnv = new TestProcEnv();
procStore = new NoopProcedureStore(); procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore); procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
procStore.start(1); procStore.start(1);
procExecutor.start(1, true); procExecutor.start(1, true);
@ -66,13 +76,14 @@ public class TestProcedureEvents {
procExecutor.stop(); procExecutor.stop();
procStore.stop(false); procStore.stop(false);
procExecutor.join(); procExecutor.join();
fs.delete(logDir, true);
} }
@Test(timeout=30000) @Test(timeout=30000)
public void testTimeoutEventProcedure() throws Exception { public void testTimeoutEventProcedure() throws Exception {
final int NTIMEOUTS = 5; final int NTIMEOUTS = 5;
TestTimeoutEventProcedure proc = new TestTimeoutEventProcedure(1000, NTIMEOUTS); TestTimeoutEventProcedure proc = new TestTimeoutEventProcedure(500, NTIMEOUTS);
procExecutor.submitProcedure(proc); procExecutor.submitProcedure(proc);
ProcedureTestingUtility.waitProcedure(procExecutor, proc.getProcId()); ProcedureTestingUtility.waitProcedure(procExecutor, proc.getProcId());
@ -80,6 +91,26 @@ public class TestProcedureEvents {
assertEquals(NTIMEOUTS + 1, proc.getTimeoutsCount()); assertEquals(NTIMEOUTS + 1, proc.getTimeoutsCount());
} }
@Test(timeout=30000)
public void testTimeoutEventProcedureDoubleExecution() throws Exception {
testTimeoutEventProcedureDoubleExecution(false);
}
@Test(timeout=30000)
public void testTimeoutEventProcedureDoubleExecutionKillIfSuspended() throws Exception {
testTimeoutEventProcedureDoubleExecution(true);
}
private void testTimeoutEventProcedureDoubleExecution(final boolean killIfSuspended)
throws Exception {
TestTimeoutEventProcedure proc = new TestTimeoutEventProcedure(1000, 3);
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExecutor, true);
ProcedureTestingUtility.setKillIfSuspended(procExecutor, killIfSuspended);
long procId = procExecutor.submitProcedure(proc);
ProcedureTestingUtility.testRecoveryAndDoubleExecution(procExecutor, procId, true);
ProcedureTestingUtility.assertIsAbortException(procExecutor.getResult(proc.getProcId()));
}
public static class TestTimeoutEventProcedure extends NoopProcedure<TestProcEnv> { public static class TestTimeoutEventProcedure extends NoopProcedure<TestProcEnv> {
private final ProcedureEvent event = new ProcedureEvent("timeout-event"); private final ProcedureEvent event = new ProcedureEvent("timeout-event");
@ -122,6 +153,26 @@ public class TestProcedureEvents {
env.getProcedureScheduler().wakeEvent(event); env.getProcedureScheduler().wakeEvent(event);
return false; return false;
} }
@Override
protected void afterReplay(final TestProcEnv env) {
if (getState() == ProcedureState.WAITING_TIMEOUT) {
env.getProcedureScheduler().suspendEvent(event);
env.getProcedureScheduler().waitEvent(event, this);
}
}
@Override
protected void serializeStateData(final OutputStream stream) throws IOException {
StreamUtils.writeRawVInt32(stream, ntimeouts.get());
StreamUtils.writeRawVInt32(stream, maxTimeouts);
}
@Override
protected void deserializeStateData(final InputStream stream) throws IOException {
ntimeouts.set(StreamUtils.readRawVarint32(stream));
maxTimeouts = StreamUtils.readRawVarint32(stream);
}
} }
private class TestProcEnv { private class TestProcEnv {