HBASE-21172 Reimplement the retry backoff logic for ReopenTableRegionsProcedure
This commit is contained in:
parent
ea4194039e
commit
2da6dbe563
|
@ -769,14 +769,21 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
|
|||
|
||||
/**
|
||||
* Called by the ProcedureExecutor when the timeout set by setTimeout() is expired.
|
||||
* <p/>
|
||||
* Another usage for this method is to implement retrying. A procedure can set the state to
|
||||
* {@code WAITING_TIMEOUT} by calling {@code setState} method, and throw a
|
||||
* {@link ProcedureSuspendedException} to halt the execution of the procedure, and do not forget a
|
||||
* call {@link #setTimeout(int)} method to set the timeout. And you should also override this
|
||||
* method to wake up the procedure, and also return false to tell the ProcedureExecutor that the
|
||||
* timeout event has been handled.
|
||||
* @return true to let the framework handle the timeout as abort, false in case the procedure
|
||||
* handled the timeout itself.
|
||||
*/
|
||||
protected synchronized boolean setTimeoutFailure(TEnvironment env) {
|
||||
if (state == ProcedureState.WAITING_TIMEOUT) {
|
||||
long timeDiff = EnvironmentEdgeManager.currentTime() - lastUpdate;
|
||||
setFailure("ProcedureExecutor", new TimeoutIOException(
|
||||
"Operation timed out after " + StringUtils.humanTimeDiff(timeDiff)));
|
||||
setFailure("ProcedureExecutor",
|
||||
new TimeoutIOException("Operation timed out after " + StringUtils.humanTimeDiff(timeDiff)));
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
|
|
|
@ -44,40 +44,42 @@ public final class ProcedureUtil {
|
|||
// ==========================================================================
|
||||
// Reflection helpers to create/validate a Procedure object
|
||||
// ==========================================================================
|
||||
public static Procedure newProcedure(final String className) throws BadProcedureException {
|
||||
private static Procedure<?> newProcedure(String className) throws BadProcedureException {
|
||||
try {
|
||||
final Class<?> clazz = Class.forName(className);
|
||||
Class<?> clazz = Class.forName(className);
|
||||
if (!Modifier.isPublic(clazz.getModifiers())) {
|
||||
throw new Exception("the " + clazz + " class is not public");
|
||||
}
|
||||
|
||||
final Constructor<?> ctor = clazz.getConstructor();
|
||||
@SuppressWarnings("rawtypes")
|
||||
Constructor<? extends Procedure> ctor = clazz.asSubclass(Procedure.class).getConstructor();
|
||||
assert ctor != null : "no constructor found";
|
||||
if (!Modifier.isPublic(ctor.getModifiers())) {
|
||||
throw new Exception("the " + clazz + " constructor is not public");
|
||||
}
|
||||
return (Procedure)ctor.newInstance();
|
||||
return ctor.newInstance();
|
||||
} catch (Exception e) {
|
||||
throw new BadProcedureException("The procedure class " + className +
|
||||
" must be accessible and have an empty constructor", e);
|
||||
throw new BadProcedureException(
|
||||
"The procedure class " + className + " must be accessible and have an empty constructor",
|
||||
e);
|
||||
}
|
||||
}
|
||||
|
||||
public static void validateClass(final Procedure proc) throws BadProcedureException {
|
||||
static void validateClass(Procedure<?> proc) throws BadProcedureException {
|
||||
try {
|
||||
final Class<?> clazz = proc.getClass();
|
||||
Class<?> clazz = proc.getClass();
|
||||
if (!Modifier.isPublic(clazz.getModifiers())) {
|
||||
throw new Exception("the " + clazz + " class is not public");
|
||||
}
|
||||
|
||||
final Constructor<?> ctor = clazz.getConstructor();
|
||||
Constructor<?> ctor = clazz.getConstructor();
|
||||
assert ctor != null;
|
||||
if (!Modifier.isPublic(ctor.getModifiers())) {
|
||||
throw new Exception("the " + clazz + " constructor is not public");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new BadProcedureException("The procedure class " + proc.getClass().getName() +
|
||||
" must be accessible and have an empty constructor", e);
|
||||
" must be accessible and have an empty constructor", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -150,9 +152,10 @@ public final class ProcedureUtil {
|
|||
|
||||
/**
|
||||
* Helper to convert the procedure to protobuf.
|
||||
* <p/>
|
||||
* Used by ProcedureStore implementations.
|
||||
*/
|
||||
public static ProcedureProtos.Procedure convertToProtoProcedure(final Procedure proc)
|
||||
public static ProcedureProtos.Procedure convertToProtoProcedure(Procedure<?> proc)
|
||||
throws IOException {
|
||||
Preconditions.checkArgument(proc != null);
|
||||
validateClass(proc);
|
||||
|
@ -214,16 +217,17 @@ public final class ProcedureUtil {
|
|||
|
||||
/**
|
||||
* Helper to convert the protobuf procedure.
|
||||
* <p/>
|
||||
* Used by ProcedureStore implementations.
|
||||
*
|
||||
* TODO: OPTIMIZATION: some of the field never change during the execution
|
||||
* (e.g. className, procId, parentId, ...).
|
||||
* We can split in 'data' and 'state', and the store
|
||||
* may take advantage of it by storing the data only on insert().
|
||||
* <p/>
|
||||
* TODO: OPTIMIZATION: some of the field never change during the execution (e.g. className,
|
||||
* procId, parentId, ...). We can split in 'data' and 'state', and the store may take advantage of
|
||||
* it by storing the data only on insert().
|
||||
*/
|
||||
public static Procedure convertToProcedure(final ProcedureProtos.Procedure proto) throws IOException {
|
||||
public static Procedure<?> convertToProcedure(ProcedureProtos.Procedure proto)
|
||||
throws IOException {
|
||||
// Procedure from class name
|
||||
final Procedure proc = newProcedure(proto.getClassName());
|
||||
Procedure<?> proc = newProcedure(proto.getClassName());
|
||||
|
||||
// set fields
|
||||
proc.setProcId(proto.getProcId());
|
||||
|
@ -300,8 +304,7 @@ public final class ProcedureUtil {
|
|||
}
|
||||
|
||||
public static LockServiceProtos.LockedResource convertToProtoLockedResource(
|
||||
LockedResource lockedResource) throws IOException
|
||||
{
|
||||
LockedResource lockedResource) throws IOException {
|
||||
LockServiceProtos.LockedResource.Builder builder =
|
||||
LockServiceProtos.LockedResource.newBuilder();
|
||||
|
||||
|
@ -328,4 +331,18 @@ public final class ProcedureUtil {
|
|||
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get an exponential backoff time, in milliseconds. The base unit is 1 second, and the max
|
||||
* backoff time is 10 minutes. This is the general backoff policy for most procedure
|
||||
* implementation.
|
||||
*/
|
||||
public static long getBackoffTimeMs(int attempts) {
|
||||
long maxBackoffTime = 10L * 60 * 1000; // Ten minutes, hard coded for now.
|
||||
// avoid overflow
|
||||
if (attempts >= 30) {
|
||||
return maxBackoffTime;
|
||||
}
|
||||
return Math.min((long) (1000 * Math.pow(2, attempts)), maxBackoffTime);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.procedure2;
|
|||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
|
@ -29,12 +30,12 @@ import org.junit.experimental.categories.Category;
|
|||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
|
||||
|
||||
@Category({MasterTests.class, SmallTests.class})
|
||||
@Category({ MasterTests.class, SmallTests.class })
|
||||
public class TestProcedureUtil {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestProcedureUtil.class);
|
||||
HBaseClassTestRule.forClass(TestProcedureUtil.class);
|
||||
|
||||
@Test
|
||||
public void testValidation() throws Exception {
|
||||
|
@ -57,6 +58,16 @@ public class TestProcedureUtil {
|
|||
assertEquals("Procedure protobuf does not match", proto1, proto2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetBackoffTimeMs() {
|
||||
for (int i = 30; i < 1000; i++) {
|
||||
assertEquals(TimeUnit.MINUTES.toMillis(10), ProcedureUtil.getBackoffTimeMs(30));
|
||||
}
|
||||
assertEquals(1000, ProcedureUtil.getBackoffTimeMs(0));
|
||||
assertEquals(2000, ProcedureUtil.getBackoffTimeMs(1));
|
||||
assertEquals(32000, ProcedureUtil.getBackoffTimeMs(5));
|
||||
}
|
||||
|
||||
public static class TestProcedureNoDefaultConstructor extends TestProcedure {
|
||||
public TestProcedureNoDefaultConstructor(int x) {}
|
||||
}
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException;
|
|||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
|
||||
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
|
||||
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
|
||||
import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
|
||||
|
@ -360,7 +361,7 @@ public abstract class RegionTransitionProcedure
|
|||
// If here, success so clear out the attempt counter so we start fresh each time we get stuck.
|
||||
this.attempt = 0;
|
||||
} catch (IOException e) {
|
||||
long backoff = getBackoffTime(this.attempt++);
|
||||
long backoff = ProcedureUtil.getBackoffTimeMs(this.attempt++);
|
||||
LOG.warn("Failed transition, suspend {}secs {}; {}; waiting on rectified condition fixed " +
|
||||
"by other Procedure or operator intervention", backoff / 1000, this,
|
||||
regionNode.toShortString(), e);
|
||||
|
@ -372,12 +373,6 @@ public abstract class RegionTransitionProcedure
|
|||
return new Procedure[] {this};
|
||||
}
|
||||
|
||||
private long getBackoffTime(int attempts) {
|
||||
long backoffTime = (long)(1000 * Math.pow(2, attempts));
|
||||
long maxBackoffTime = 60 * 60 * 1000; // An hour. Hard-coded for for now.
|
||||
return backoffTime < maxBackoffTime? backoffTime: maxBackoffTime;
|
||||
}
|
||||
|
||||
/**
|
||||
* At end of timeout, wake ourselves up so we run again.
|
||||
*/
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.master.RegionPlan;
|
|||
import org.apache.hadoop.hbase.master.assignment.MoveRegionProcedure;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -36,6 +37,7 @@ import org.slf4j.LoggerFactory;
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReopenTableRegionsState;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReopenTableRegionsStateData;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
|
||||
|
||||
/**
|
||||
* Used for reopening the regions for a table.
|
||||
|
@ -52,6 +54,8 @@ public class ReopenTableRegionsProcedure
|
|||
|
||||
private List<HRegionLocation> regions = Collections.emptyList();
|
||||
|
||||
private int attempt;
|
||||
|
||||
public ReopenTableRegionsProcedure() {
|
||||
}
|
||||
|
||||
|
@ -104,23 +108,34 @@ public class ReopenTableRegionsProcedure
|
|||
return Flow.NO_MORE_STATE;
|
||||
}
|
||||
if (regions.stream().anyMatch(l -> l.getSeqNum() >= 0)) {
|
||||
attempt = 0;
|
||||
setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_REOPEN_REGIONS);
|
||||
return Flow.HAS_MORE_STATE;
|
||||
}
|
||||
LOG.info("There are still {} region(s) which need to be reopened for table {} are in " +
|
||||
"OPENING state, try again later", regions.size(), tableName);
|
||||
// All the regions need to reopen are in OPENING state which means we can not schedule any
|
||||
// MRPs. Then sleep for one second, and yield the procedure to let other procedures run
|
||||
// first and hope next time we can get some regions in other state to make progress.
|
||||
// TODO: add a delay for ProcedureYieldException so that we do not need to sleep here which
|
||||
// blocks a procedure worker.
|
||||
Thread.sleep(1000);
|
||||
throw new ProcedureYieldException();
|
||||
// MRPs.
|
||||
long backoff = ProcedureUtil.getBackoffTimeMs(this.attempt++);
|
||||
LOG.info(
|
||||
"There are still {} region(s) which need to be reopened for table {} are in " +
|
||||
"OPENING state, suspend {}secs and try again later",
|
||||
regions.size(), tableName, backoff / 1000);
|
||||
setTimeout(Math.toIntExact(backoff));
|
||||
setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
|
||||
throw new ProcedureSuspendedException();
|
||||
default:
|
||||
throw new UnsupportedOperationException("unhandled state=" + state);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* At end of timeout, wake ourselves up so we run again.
|
||||
*/
|
||||
@Override
|
||||
protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
|
||||
setState(ProcedureProtos.ProcedureState.RUNNABLE);
|
||||
env.getProcedureScheduler().addFront(this);
|
||||
return false; // 'false' means that this procedure handled the timeout
|
||||
}
|
||||
@Override
|
||||
protected void rollbackState(MasterProcedureEnv env, ReopenTableRegionsState state)
|
||||
throws IOException, InterruptedException {
|
||||
|
|
|
@ -0,0 +1,118 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.procedure;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.master.RegionState.State;
|
||||
import org.apache.hadoop.hbase.master.ServerManager;
|
||||
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
|
||||
import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
|
||||
|
||||
/**
|
||||
* Confirm that we will do backoff when retrying on reopening table regions, to avoid consuming all
|
||||
* the CPUs.
|
||||
*/
|
||||
@Category({ MasterTests.class, MediumTests.class })
|
||||
public class TestReopenTableRegionsProcedureBackoff {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestReopenTableRegionsProcedureBackoff.class);
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(TestReopenTableRegionsProcedureBackoff.class);
|
||||
|
||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
|
||||
private static TableName TABLE_NAME = TableName.valueOf("Backoff");
|
||||
|
||||
private static byte[] CF = Bytes.toBytes("cf");
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() throws Exception {
|
||||
UTIL.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
|
||||
UTIL.startMiniCluster(1);
|
||||
UTIL.createTable(TABLE_NAME, CF);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws Exception {
|
||||
UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRetryBackoff() throws IOException, InterruptedException {
|
||||
AssignmentManager am = UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager();
|
||||
ProcedureExecutor<MasterProcedureEnv> procExec =
|
||||
UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
|
||||
RegionInfo regionInfo = UTIL.getAdmin().getRegions(TABLE_NAME).get(0);
|
||||
RegionStateNode regionNode = am.getRegionStates().getRegionStateNode(regionInfo);
|
||||
long openSeqNum;
|
||||
synchronized (regionNode) {
|
||||
openSeqNum = regionNode.getOpenSeqNum();
|
||||
// make a fake state to let the procedure wait.
|
||||
regionNode.setState(State.OPENING);
|
||||
regionNode.setOpenSeqNum(-1L);
|
||||
}
|
||||
ReopenTableRegionsProcedure proc = new ReopenTableRegionsProcedure(TABLE_NAME);
|
||||
procExec.submitProcedure(proc);
|
||||
UTIL.waitFor(10000, () -> proc.getState() == ProcedureState.WAITING_TIMEOUT);
|
||||
long oldTimeout = 0;
|
||||
int timeoutIncrements = 0;
|
||||
for (;;) {
|
||||
long timeout = proc.getTimeout();
|
||||
if (timeout > oldTimeout) {
|
||||
LOG.info("Timeout incremented, was {}, now is {}, increments={}", timeout, oldTimeout,
|
||||
timeoutIncrements);
|
||||
oldTimeout = timeout;
|
||||
timeoutIncrements++;
|
||||
if (timeoutIncrements > 3) {
|
||||
// If we incremented at least twice, break; the backoff is working.
|
||||
break;
|
||||
}
|
||||
}
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
synchronized (regionNode) {
|
||||
// reset to the correct state
|
||||
regionNode.setState(State.OPEN);
|
||||
regionNode.setOpenSeqNum(openSeqNum);
|
||||
}
|
||||
ProcedureSyncWait.waitForProcedureToComplete(procExec, proc, 60000);
|
||||
assertTrue(regionNode.getOpenSeqNum() > openSeqNum);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue