diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index f167f4ab703..2e2dbdf1c12 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -82,6 +82,7 @@ public class ProcedureExecutor { Testing testing = null; public static class Testing { + protected boolean killIfSuspended = false; protected boolean killBeforeStoreUpdate = false; protected boolean toggleKillBeforeStoreUpdate = false; @@ -93,6 +94,10 @@ public class ProcedureExecutor { } return kill; } + + protected boolean shouldKillBeforeStoreUpdate(final boolean isSuspended) { + return (isSuspended && !killIfSuspended) ? false : shouldKillBeforeStoreUpdate(); + } } public interface ProcedureExecutorListener { @@ -343,7 +348,7 @@ public class ProcedureExecutor { } // 2. Initialize the stacks - ArrayList runnableList = new ArrayList(runnablesCount); + final ArrayList runnableList = new ArrayList(runnablesCount); HashSet waitingSet = null; procIter.reset(); while (procIter.hasNext()) { @@ -432,7 +437,15 @@ public class ProcedureExecutor { throw new IOException("found " + corruptedCount + " procedures on replay"); } - // 4. Push the scheduler + // 4. Push the procedures to the timeout executor + if (waitingSet != null && !waitingSet.isEmpty()) { + for (Procedure proc: waitingSet) { + proc.afterReplay(getEnvironment()); + timeoutExecutor.add(proc); + } + } + + // 5. Push the procedure to the scheduler if (!runnableList.isEmpty()) { // TODO: See ProcedureWALFormatReader#hasFastStartSupport // some procedure may be started way before this stuff. @@ -1192,7 +1205,7 @@ public class ProcedureExecutor { // allows to kill the executor before something is stored to the wal. // useful to test the procedure recovery. - if (testing != null && !isSuspended && testing.shouldKillBeforeStoreUpdate()) { + if (testing != null && testing.shouldKillBeforeStoreUpdate(isSuspended)) { LOG.debug("TESTING: Kill before store update: " + procedure); stop(); return; diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java index 968ccbf92df..25d6e2e3030 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java @@ -108,11 +108,21 @@ public class ProcedureTestingUtility { return loader; } - public static void setKillBeforeStoreUpdate(ProcedureExecutor procExecutor, - boolean value) { + private static void createExecutorTesting(final ProcedureExecutor procExecutor) { if (procExecutor.testing == null) { procExecutor.testing = new ProcedureExecutor.Testing(); } + } + + public static void setKillIfSuspended(ProcedureExecutor procExecutor, + boolean value) { + createExecutorTesting(procExecutor); + procExecutor.testing.killIfSuspended = value; + } + + public static void setKillBeforeStoreUpdate(ProcedureExecutor procExecutor, + boolean value) { + createExecutorTesting(procExecutor); procExecutor.testing.killBeforeStoreUpdate = value; LOG.warn("Set Kill before store update to: " + procExecutor.testing.killBeforeStoreUpdate); assertSingleExecutorForKillTests(procExecutor); @@ -120,17 +130,13 @@ public class ProcedureTestingUtility { public static void setToggleKillBeforeStoreUpdate(ProcedureExecutor procExecutor, boolean value) { - if (procExecutor.testing == null) { - procExecutor.testing = new ProcedureExecutor.Testing(); - } + createExecutorTesting(procExecutor); procExecutor.testing.toggleKillBeforeStoreUpdate = value; assertSingleExecutorForKillTests(procExecutor); } public static void toggleKillBeforeStoreUpdate(ProcedureExecutor procExecutor) { - if (procExecutor.testing == null) { - procExecutor.testing = new ProcedureExecutor.Testing(); - } + createExecutorTesting(procExecutor); procExecutor.testing.killBeforeStoreUpdate = !procExecutor.testing.killBeforeStoreUpdate; LOG.warn("Set Kill before store update to: " + procExecutor.testing.killBeforeStoreUpdate); assertSingleExecutorForKillTests(procExecutor); diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java index c4316466649..b81e0f90454 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java @@ -19,14 +19,19 @@ package org.apache.hadoop.hbase.procedure2; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseCommonTestingUtility; +import org.apache.hadoop.hbase.io.util.StreamUtils; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure; -import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.testclassification.MasterTests; @@ -45,17 +50,22 @@ public class TestProcedureEvents { private static final Log LOG = LogFactory.getLog(TestProcedureEvents.class); private TestProcEnv procEnv; - private NoopProcedureStore procStore; + private ProcedureStore procStore; private ProcedureExecutor procExecutor; private HBaseCommonTestingUtility htu; + private FileSystem fs; + private Path logDir; @Before public void setUp() throws IOException { htu = new HBaseCommonTestingUtility(); + Path testDir = htu.getDataTestDir(); + fs = testDir.getFileSystem(htu.getConfiguration()); + logDir = new Path(testDir, "proc-logs"); procEnv = new TestProcEnv(); - procStore = new NoopProcedureStore(); + procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir); procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore); procStore.start(1); procExecutor.start(1, true); @@ -66,13 +76,14 @@ public class TestProcedureEvents { procExecutor.stop(); procStore.stop(false); procExecutor.join(); + fs.delete(logDir, true); } @Test(timeout=30000) public void testTimeoutEventProcedure() throws Exception { final int NTIMEOUTS = 5; - TestTimeoutEventProcedure proc = new TestTimeoutEventProcedure(1000, NTIMEOUTS); + TestTimeoutEventProcedure proc = new TestTimeoutEventProcedure(500, NTIMEOUTS); procExecutor.submitProcedure(proc); ProcedureTestingUtility.waitProcedure(procExecutor, proc.getProcId()); @@ -80,6 +91,26 @@ public class TestProcedureEvents { assertEquals(NTIMEOUTS + 1, proc.getTimeoutsCount()); } + @Test(timeout=30000) + public void testTimeoutEventProcedureDoubleExecution() throws Exception { + testTimeoutEventProcedureDoubleExecution(false); + } + + @Test(timeout=30000) + public void testTimeoutEventProcedureDoubleExecutionKillIfSuspended() throws Exception { + testTimeoutEventProcedureDoubleExecution(true); + } + + private void testTimeoutEventProcedureDoubleExecution(final boolean killIfSuspended) + throws Exception { + TestTimeoutEventProcedure proc = new TestTimeoutEventProcedure(1000, 3); + ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExecutor, true); + ProcedureTestingUtility.setKillIfSuspended(procExecutor, killIfSuspended); + long procId = procExecutor.submitProcedure(proc); + ProcedureTestingUtility.testRecoveryAndDoubleExecution(procExecutor, procId, true); + ProcedureTestingUtility.assertIsAbortException(procExecutor.getResult(proc.getProcId())); + } + public static class TestTimeoutEventProcedure extends NoopProcedure { private final ProcedureEvent event = new ProcedureEvent("timeout-event"); @@ -122,6 +153,26 @@ public class TestProcedureEvents { env.getProcedureScheduler().wakeEvent(event); return false; } + + @Override + protected void afterReplay(final TestProcEnv env) { + if (getState() == ProcedureState.WAITING_TIMEOUT) { + env.getProcedureScheduler().suspendEvent(event); + env.getProcedureScheduler().waitEvent(event, this); + } + } + + @Override + protected void serializeStateData(final OutputStream stream) throws IOException { + StreamUtils.writeRawVInt32(stream, ntimeouts.get()); + StreamUtils.writeRawVInt32(stream, maxTimeouts); + } + + @Override + protected void deserializeStateData(final InputStream stream) throws IOException { + ntimeouts.set(StreamUtils.readRawVarint32(stream)); + maxTimeouts = StreamUtils.readRawVarint32(stream); + } } private class TestProcEnv {