diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java index b2685f611dc..a832c783fe4 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java @@ -769,14 +769,21 @@ public abstract class Procedure implements Comparable + * Another usage for this method is to implement retrying. A procedure can set the state to + * {@code WAITING_TIMEOUT} by calling the {@code setState} method, and throw a + * {@link ProcedureSuspendedException} to halt the execution of the procedure; do not forget to + * call the {@link #setTimeout(int)} method to set the timeout. You should also override this + * method to wake up the procedure, and return false to tell the ProcedureExecutor that the + * timeout event has been handled. * @return true to let the framework handle the timeout as abort, false in case the procedure * handled the timeout itself. 
*/ protected synchronized boolean setTimeoutFailure(TEnvironment env) { if (state == ProcedureState.WAITING_TIMEOUT) { long timeDiff = EnvironmentEdgeManager.currentTime() - lastUpdate; - setFailure("ProcedureExecutor", new TimeoutIOException( - "Operation timed out after " + StringUtils.humanTimeDiff(timeDiff))); + setFailure("ProcedureExecutor", + new TimeoutIOException("Operation timed out after " + StringUtils.humanTimeDiff(timeDiff))); return true; } return false; diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java index 8a438d4c8c5..8c8746e84bb 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java @@ -44,40 +44,42 @@ public final class ProcedureUtil { // ========================================================================== // Reflection helpers to create/validate a Procedure object // ========================================================================== - public static Procedure newProcedure(final String className) throws BadProcedureException { + private static Procedure newProcedure(String className) throws BadProcedureException { try { - final Class clazz = Class.forName(className); + Class clazz = Class.forName(className); if (!Modifier.isPublic(clazz.getModifiers())) { throw new Exception("the " + clazz + " class is not public"); } - final Constructor ctor = clazz.getConstructor(); + @SuppressWarnings("rawtypes") + Constructor ctor = clazz.asSubclass(Procedure.class).getConstructor(); assert ctor != null : "no constructor found"; if (!Modifier.isPublic(ctor.getModifiers())) { throw new Exception("the " + clazz + " constructor is not public"); } - return (Procedure)ctor.newInstance(); + return ctor.newInstance(); } catch (Exception e) { - throw new BadProcedureException("The procedure 
class " + className + - " must be accessible and have an empty constructor", e); + throw new BadProcedureException( + "The procedure class " + className + " must be accessible and have an empty constructor", + e); } } - public static void validateClass(final Procedure proc) throws BadProcedureException { + static void validateClass(Procedure proc) throws BadProcedureException { try { - final Class clazz = proc.getClass(); + Class clazz = proc.getClass(); if (!Modifier.isPublic(clazz.getModifiers())) { throw new Exception("the " + clazz + " class is not public"); } - final Constructor ctor = clazz.getConstructor(); + Constructor ctor = clazz.getConstructor(); assert ctor != null; if (!Modifier.isPublic(ctor.getModifiers())) { throw new Exception("the " + clazz + " constructor is not public"); } } catch (Exception e) { throw new BadProcedureException("The procedure class " + proc.getClass().getName() + - " must be accessible and have an empty constructor", e); + " must be accessible and have an empty constructor", e); } } @@ -150,9 +152,10 @@ public final class ProcedureUtil { /** * Helper to convert the procedure to protobuf. + *

* Used by ProcedureStore implementations. */ - public static ProcedureProtos.Procedure convertToProtoProcedure(final Procedure proc) + public static ProcedureProtos.Procedure convertToProtoProcedure(Procedure proc) throws IOException { Preconditions.checkArgument(proc != null); validateClass(proc); @@ -214,16 +217,17 @@ public final class ProcedureUtil { /** * Helper to convert the protobuf procedure. + *

* Used by ProcedureStore implementations. - * - * TODO: OPTIMIZATION: some of the field never change during the execution - * (e.g. className, procId, parentId, ...). - * We can split in 'data' and 'state', and the store - * may take advantage of it by storing the data only on insert(). + *

+ * TODO: OPTIMIZATION: some of the field never change during the execution (e.g. className, + * procId, parentId, ...). We can split in 'data' and 'state', and the store may take advantage of + * it by storing the data only on insert(). */ - public static Procedure convertToProcedure(final ProcedureProtos.Procedure proto) throws IOException { + public static Procedure convertToProcedure(ProcedureProtos.Procedure proto) + throws IOException { // Procedure from class name - final Procedure proc = newProcedure(proto.getClassName()); + Procedure proc = newProcedure(proto.getClassName()); // set fields proc.setProcId(proto.getProcId()); @@ -300,8 +304,7 @@ public final class ProcedureUtil { } public static LockServiceProtos.LockedResource convertToProtoLockedResource( - LockedResource lockedResource) throws IOException - { + LockedResource lockedResource) throws IOException { LockServiceProtos.LockedResource.Builder builder = LockServiceProtos.LockedResource.newBuilder(); @@ -328,4 +331,18 @@ public final class ProcedureUtil { return builder.build(); } + + /** + * Get an exponential backoff time, in milliseconds. The base unit is 1 second, and the max + * backoff time is 10 minutes. This is the general backoff policy for most procedure + * implementation. + */ + public static long getBackoffTimeMs(int attempts) { + long maxBackoffTime = 10L * 60 * 1000; // Ten minutes, hard coded for now. 
+ // avoid overflow + if (attempts >= 30) { + return maxBackoffTime; + } + return Math.min((long) (1000 * Math.pow(2, attempts)), maxBackoffTime); + } } diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureUtil.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureUtil.java index 0fcb4f4283e..6342beccae0 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureUtil.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureUtil.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.procedure2; import static org.junit.Assert.assertEquals; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure; import org.apache.hadoop.hbase.testclassification.MasterTests; @@ -29,12 +30,12 @@ import org.junit.experimental.categories.Category; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; -@Category({MasterTests.class, SmallTests.class}) +@Category({ MasterTests.class, SmallTests.class }) public class TestProcedureUtil { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestProcedureUtil.class); + HBaseClassTestRule.forClass(TestProcedureUtil.class); @Test public void testValidation() throws Exception { @@ -57,6 +58,16 @@ public class TestProcedureUtil { assertEquals("Procedure protobuf does not match", proto1, proto2); } + @Test + public void testGetBackoffTimeMs() { + for (int i = 30; i < 1000; i++) { + assertEquals(TimeUnit.MINUTES.toMillis(10), ProcedureUtil.getBackoffTimeMs(i)); + } + assertEquals(1000, ProcedureUtil.getBackoffTimeMs(0)); + assertEquals(2000, ProcedureUtil.getBackoffTimeMs(1)); + assertEquals(32000, ProcedureUtil.getBackoffTimeMs(5)); + } + public static class TestProcedureNoDefaultConstructor extends TestProcedure { public 
TestProcedureNoDefaultConstructor(int x) {} } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java index c10bf2d05da..3b0735e46e4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureUtil; import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation; import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure; import org.apache.hadoop.hbase.procedure2.RemoteProcedureException; @@ -360,7 +361,7 @@ public abstract class RegionTransitionProcedure // If here, success so clear out the attempt counter so we start fresh each time we get stuck. this.attempt = 0; } catch (IOException e) { - long backoff = getBackoffTime(this.attempt++); + long backoff = ProcedureUtil.getBackoffTimeMs(this.attempt++); LOG.warn("Failed transition, suspend {}secs {}; {}; waiting on rectified condition fixed " + "by other Procedure or operator intervention", backoff / 1000, this, regionNode.toShortString(), e); @@ -372,12 +373,6 @@ public abstract class RegionTransitionProcedure return new Procedure[] {this}; } - private long getBackoffTime(int attempts) { - long backoffTime = (long)(1000 * Math.pow(2, attempts)); - long maxBackoffTime = 60 * 60 * 1000; // An hour. Hard-coded for for now. - return backoffTime < maxBackoffTime? 
backoffTime: maxBackoffTime; - } - /** * At end of timeout, wake ourselves up so we run again. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ReopenTableRegionsProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ReopenTableRegionsProcedure.java index 8f3aa223576..8a3195300f6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ReopenTableRegionsProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ReopenTableRegionsProcedure.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.hadoop.hbase.master.assignment.MoveRegionProcedure; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureUtil; import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -36,6 +37,7 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReopenTableRegionsState; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReopenTableRegionsStateData; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; /** * Used for reopening the regions for a table. 
@@ -52,6 +54,8 @@ public class ReopenTableRegionsProcedure private List regions = Collections.emptyList(); + private int attempt; + public ReopenTableRegionsProcedure() { } @@ -104,23 +108,34 @@ public class ReopenTableRegionsProcedure return Flow.NO_MORE_STATE; } if (regions.stream().anyMatch(l -> l.getSeqNum() >= 0)) { + attempt = 0; setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_REOPEN_REGIONS); return Flow.HAS_MORE_STATE; } - LOG.info("There are still {} region(s) which need to be reopened for table {} are in " + - "OPENING state, try again later", regions.size(), tableName); // All the regions need to reopen are in OPENING state which means we can not schedule any - // MRPs. Then sleep for one second, and yield the procedure to let other procedures run - // first and hope next time we can get some regions in other state to make progress. - // TODO: add a delay for ProcedureYieldException so that we do not need to sleep here which - // blocks a procedure worker. - Thread.sleep(1000); - throw new ProcedureYieldException(); + // MRPs. + long backoff = ProcedureUtil.getBackoffTimeMs(this.attempt++); + LOG.info( + "There are still {} region(s) which need to be reopened for table {} are in " + + "OPENING state, suspend {}secs and try again later", + regions.size(), tableName, backoff / 1000); + setTimeout(Math.toIntExact(backoff)); + setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); + throw new ProcedureSuspendedException(); default: throw new UnsupportedOperationException("unhandled state=" + state); } } + /** + * At end of timeout, wake ourselves up so we run again. 
+ */ + @Override + protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) { + setState(ProcedureProtos.ProcedureState.RUNNABLE); + env.getProcedureScheduler().addFront(this); + return false; // 'false' means that this procedure handled the timeout + } @Override protected void rollbackState(MasterProcedureEnv env, ReopenTableRegionsState state) throws IOException, InterruptedException { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestReopenTableRegionsProcedureBackoff.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestReopenTableRegionsProcedureBackoff.java new file mode 100644 index 00000000000..015419135bc --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestReopenTableRegionsProcedureBackoff.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.procedure; + +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.master.RegionState.State; +import org.apache.hadoop.hbase.master.ServerManager; +import org.apache.hadoop.hbase.master.assignment.AssignmentManager; +import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; + +/** + * Confirm that we will do backoff when retrying on reopening table regions, to avoid consuming all + * the CPUs. 
+ */
+@Category({ MasterTests.class, MediumTests.class })
+public class TestReopenTableRegionsProcedureBackoff {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestReopenTableRegionsProcedureBackoff.class);
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(TestReopenTableRegionsProcedureBackoff.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static final TableName TABLE_NAME = TableName.valueOf("Backoff");
+
+  private static final byte[] CF = Bytes.toBytes("cf");
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
+    UTIL.startMiniCluster(1);
+    UTIL.createTable(TABLE_NAME, CF);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testRetryBackoff() throws IOException, InterruptedException {
+    AssignmentManager am = UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager();
+    ProcedureExecutor procExec =
+      UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
+    RegionInfo regionInfo = UTIL.getAdmin().getRegions(TABLE_NAME).get(0);
+    RegionStateNode regionNode = am.getRegionStates().getRegionStateNode(regionInfo);
+    long openSeqNum;
+    synchronized (regionNode) {
+      openSeqNum = regionNode.getOpenSeqNum(); // remembered so the real value can be restored
+      // Fake an OPENING region with a negative open sequence number to make the procedure wait.
+      regionNode.setState(State.OPENING);
+      regionNode.setOpenSeqNum(-1L);
+    }
+    ReopenTableRegionsProcedure proc = new ReopenTableRegionsProcedure(TABLE_NAME);
+    procExec.submitProcedure(proc);
+    UTIL.waitFor(10000, () -> proc.getState() == ProcedureState.WAITING_TIMEOUT);
+    long oldTimeout = 0; // largest timeout observed on the procedure so far
+    int timeoutIncrements = 0; // number of times the observed timeout grew
+    for (;;) {
+      long timeout = proc.getTimeout();
+      if (timeout > oldTimeout) {
+        LOG.info("Timeout incremented, was {}, now is {}, increments={}", oldTimeout, timeout,
+          timeoutIncrements);
+        oldTimeout = timeout;
+        timeoutIncrements++;
+        if (timeoutIncrements > 3) {
+          // Seen more than three increases of the timeout, break; the backoff is working.
+          break;
+        }
+      }
+      Thread.sleep(1000);
+    }
+    synchronized (regionNode) {
+      // Restore the real state and sequence number so the procedure can make progress.
+      regionNode.setState(State.OPEN);
+      regionNode.setOpenSeqNum(openSeqNum);
+    }
+    ProcedureSyncWait.waitForProcedureToComplete(procExec, proc, 60000);
+    assertTrue(regionNode.getOpenSeqNum() > openSeqNum);
+  }
+}