From 4947e72f631fe720eab40641affb8d312721d980 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Thu, 27 Sep 2018 17:42:16 +0800 Subject: [PATCH] HBASE-21233 Allow the procedure implementation to skip persistence of the state after a execution --- .../hadoop/hbase/procedure2/Procedure.java | 26 +++ .../hbase/procedure2/ProcedureExecutor.java | 12 +- .../TestProcedureSkipPersistence.java | 163 ++++++++++++++++++ 3 files changed, 198 insertions(+), 3 deletions(-) create mode 100644 hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSkipPersistence.java diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java index a832c783fe4..08c7ce3babd 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java @@ -156,6 +156,20 @@ public abstract class Procedure implements Comparable + * This is useful when the procedure is in error and you want to retry later. The retry interval + * and the number of retries are usually not critical so skip the persistence can save some + * resources, and also speed up the restart processing. + *

+ * Notice that this value will be reset to true every time before execution. And when rolling back + * we do not test this value. + */ + private boolean persist = true; + public boolean isBypass() { return bypass; } @@ -170,6 +184,18 @@ public abstract class Procedure implements Comparable { Procedure[] subprocs = null; do { reExecute = false; + procedure.resetPersistence(); try { subprocs = procedure.doExecute(getEnvironment()); if (subprocs != null && subprocs.length == 0) { @@ -1746,7 +1745,9 @@ public class ProcedureExecutor { // // Commit the transaction even if a suspend (state may have changed). Note this append // can take a bunch of time to complete. - updateStoreOnExec(procStack, procedure, subprocs); + if (procedure.needPersistence()) { + updateStoreOnExec(procStack, procedure, subprocs); + } // if the store is not running we are aborting if (!store.isRunning()) { @@ -1941,6 +1942,11 @@ public class ProcedureExecutor { return rollbackStack.get(rootProcId); } + @VisibleForTesting + ProcedureScheduler getProcedureScheduler() { + return scheduler; + } + // ========================================================================== // Worker Thread // ========================================================================== diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSkipPersistence.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSkipPersistence.java new file mode 100644 index 00000000000..8293dcb4ed3 --- /dev/null +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSkipPersistence.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.procedure2; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseCommonTestingUtility; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; + +@Category({ MasterTests.class, MediumTests.class }) +public class TestProcedureSkipPersistence { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestProcedureSkipPersistence.class); + private ProcedureExecutor procExecutor; + private ProcedureStore procStore; + + private HBaseCommonTestingUtility htu; + private FileSystem fs; + private Path testDir; + private Path logDir; + + private static volatile int STEP = 0; + + public class ProcEnv { + + public ProcedureExecutor getProcedureExecutor() { + return procExecutor; + } + } + + public static class TestProcedure extends Procedure { + + // need to override this method, otherwise we will persist the release lock operation and the + // test will fail. + @Override + protected boolean holdLock(ProcEnv env) { + return true; + } + + @Override + protected Procedure[] execute(ProcEnv env) + throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { + if (STEP == 0) { + STEP = 1; + setTimeout(60 * 60 * 1000); + setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); + skipPersistence(); + throw new ProcedureSuspendedException(); + } else if (STEP == 1) { + STEP = 2; + if (hasTimeout()) { + setFailure("Should not persist the timeout value", + new IOException("Should not persist the timeout value")); + return null; + } + setTimeout(2 * 1000); + setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); + // used to confirm that we reset the persist flag before execution + throw new ProcedureSuspendedException(); + } else { + if (!hasTimeout()) { + setFailure("Should have persisted the timeout value", + new IOException("Should have persisted the timeout value")); + } + return null; + } + } + + @Override + protected synchronized boolean setTimeoutFailure(ProcEnv env) { + setState(ProcedureProtos.ProcedureState.RUNNABLE); + env.getProcedureExecutor().getProcedureScheduler().addFront(this); + return false; + } + + @Override + protected void rollback(ProcEnv env) throws IOException, InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + protected boolean abort(ProcEnv env) { + return false; + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + } + } + + @Before + public void setUp() throws IOException { + htu = new HBaseCommonTestingUtility(); + testDir = htu.getDataTestDir(); + fs = testDir.getFileSystem(htu.getConfiguration()); + assertTrue(testDir.depth() > 1); + + logDir = new Path(testDir, "proc-logs"); + procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); + procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), new ProcEnv(), procStore); + procStore.start(1); + ProcedureTestingUtility.initAndStartWorkers(procExecutor, 1, true); + } + + @After + public void tearDown() throws IOException { + procExecutor.stop(); + procStore.stop(false); + fs.delete(logDir, true); + } + + @Test + public void test() throws Exception { + TestProcedure proc = new TestProcedure(); + long procId = procExecutor.submitProcedure(proc); + htu.waitFor(30000, () -> proc.isWaiting()); + ProcedureTestingUtility.restart(procExecutor); + htu.waitFor(30000, () -> { + Procedure p = procExecutor.getProcedure(procId); + return p.isWaiting() || p.isFinished(); + }); + assertFalse(procExecutor.isFinished(procId)); + ProcedureTestingUtility.restart(procExecutor); + htu.waitFor(30000, () -> procExecutor.isFinished(procId)); + Procedure p = procExecutor.getResult(procId); + assertTrue(p.isSuccess()); + } +}