HBASE-21172 Reimplement the retry backoff logic for ReopenTableRegionsProcedure

This commit is contained in:
Duo Zhang 2018-09-10 17:59:31 +08:00
parent bea26e98e6
commit c59ecfb961
6 changed files with 215 additions and 41 deletions

View File

@ -766,14 +766,21 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
/**
* Called by the ProcedureExecutor when the timeout set by setTimeout() is expired.
* <p/>
* Another usage for this method is to implement retrying. A procedure can set the state to
* {@code WAITING_TIMEOUT} by calling {@code setState} method, and throw a
* {@link ProcedureSuspendedException} to halt the execution of the procedure, and do not forget a
* call {@link #setTimeout(int)} method to set the timeout. And you should also override this
* method to wake up the procedure, and also return false to tell the ProcedureExecutor that the
* timeout event has been handled.
* @return true to let the framework handle the timeout as abort, false in case the procedure
* handled the timeout itself.
*/
protected synchronized boolean setTimeoutFailure(TEnvironment env) {
if (state == ProcedureState.WAITING_TIMEOUT) {
long timeDiff = EnvironmentEdgeManager.currentTime() - lastUpdate;
setFailure("ProcedureExecutor", new TimeoutIOException(
"Operation timed out after " + StringUtils.humanTimeDiff(timeDiff)));
setFailure("ProcedureExecutor",
new TimeoutIOException("Operation timed out after " + StringUtils.humanTimeDiff(timeDiff)));
return true;
}
return false;

View File

@ -44,33 +44,35 @@ public final class ProcedureUtil {
// ==========================================================================
// Reflection helpers to create/validate a Procedure object
// ==========================================================================
public static Procedure newProcedure(final String className) throws BadProcedureException {
private static Procedure<?> newProcedure(String className) throws BadProcedureException {
try {
final Class<?> clazz = Class.forName(className);
Class<?> clazz = Class.forName(className);
if (!Modifier.isPublic(clazz.getModifiers())) {
throw new Exception("the " + clazz + " class is not public");
}
final Constructor<?> ctor = clazz.getConstructor();
@SuppressWarnings("rawtypes")
Constructor<? extends Procedure> ctor = clazz.asSubclass(Procedure.class).getConstructor();
assert ctor != null : "no constructor found";
if (!Modifier.isPublic(ctor.getModifiers())) {
throw new Exception("the " + clazz + " constructor is not public");
}
return (Procedure)ctor.newInstance();
return ctor.newInstance();
} catch (Exception e) {
throw new BadProcedureException("The procedure class " + className +
" must be accessible and have an empty constructor", e);
throw new BadProcedureException(
"The procedure class " + className + " must be accessible and have an empty constructor",
e);
}
}
public static void validateClass(final Procedure proc) throws BadProcedureException {
static void validateClass(Procedure<?> proc) throws BadProcedureException {
try {
final Class<?> clazz = proc.getClass();
Class<?> clazz = proc.getClass();
if (!Modifier.isPublic(clazz.getModifiers())) {
throw new Exception("the " + clazz + " class is not public");
}
final Constructor<?> ctor = clazz.getConstructor();
Constructor<?> ctor = clazz.getConstructor();
assert ctor != null;
if (!Modifier.isPublic(ctor.getModifiers())) {
throw new Exception("the " + clazz + " constructor is not public");
@ -150,9 +152,10 @@ public final class ProcedureUtil {
/**
* Helper to convert the procedure to protobuf.
* <p/>
* Used by ProcedureStore implementations.
*/
public static ProcedureProtos.Procedure convertToProtoProcedure(final Procedure proc)
public static ProcedureProtos.Procedure convertToProtoProcedure(Procedure<?> proc)
throws IOException {
Preconditions.checkArgument(proc != null);
validateClass(proc);
@ -214,16 +217,17 @@ public final class ProcedureUtil {
/**
* Helper to convert the protobuf procedure.
* <p/>
* Used by ProcedureStore implementations.
*
* TODO: OPTIMIZATION: some of the field never change during the execution
* (e.g. className, procId, parentId, ...).
* We can split in 'data' and 'state', and the store
* may take advantage of it by storing the data only on insert().
* <p/>
* TODO: OPTIMIZATION: some of the field never change during the execution (e.g. className,
* procId, parentId, ...). We can split in 'data' and 'state', and the store may take advantage of
* it by storing the data only on insert().
*/
public static Procedure convertToProcedure(final ProcedureProtos.Procedure proto) throws IOException {
public static Procedure<?> convertToProcedure(ProcedureProtos.Procedure proto)
throws IOException {
// Procedure from class name
final Procedure proc = newProcedure(proto.getClassName());
Procedure<?> proc = newProcedure(proto.getClassName());
// set fields
proc.setProcId(proto.getProcId());
@ -300,8 +304,7 @@ public final class ProcedureUtil {
}
public static LockServiceProtos.LockedResource convertToProtoLockedResource(
LockedResource lockedResource) throws IOException
{
LockedResource lockedResource) throws IOException {
LockServiceProtos.LockedResource.Builder builder =
LockServiceProtos.LockedResource.newBuilder();
@ -328,4 +331,18 @@ public final class ProcedureUtil {
return builder.build();
}
/**
* Get an exponential backoff time, in milliseconds. The base unit is 1 second, and the max
* backoff time is 10 minutes. This is the general backoff policy for most procedure
* implementation.
*/
public static long getBackoffTimeMs(int attempts) {
long maxBackoffTime = 10L * 60 * 1000; // Ten minutes, hard coded for now.
// avoid overflow
if (attempts >= 30) {
return maxBackoffTime;
}
return Math.min((long) (1000 * Math.pow(2, attempts)), maxBackoffTime);
}
}

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.procedure2;
import static org.junit.Assert.assertEquals;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
import org.apache.hadoop.hbase.testclassification.MasterTests;
@ -57,6 +58,16 @@ public class TestProcedureUtil {
assertEquals("Procedure protobuf does not match", proto1, proto2);
}
@Test
public void testGetBackoffTimeMs() {
for (int i = 30; i < 1000; i++) {
assertEquals(TimeUnit.MINUTES.toMillis(10), ProcedureUtil.getBackoffTimeMs(30));
}
assertEquals(1000, ProcedureUtil.getBackoffTimeMs(0));
assertEquals(2000, ProcedureUtil.getBackoffTimeMs(1));
assertEquals(32000, ProcedureUtil.getBackoffTimeMs(5));
}
public static class TestProcedureNoDefaultConstructor extends TestProcedure {
public TestProcedureNoDefaultConstructor(int x) {}
}

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@ -321,7 +322,7 @@ public class TransitRegionStateProcedure
throw new UnsupportedOperationException("unhandled state=" + state);
}
} catch (IOException e) {
long backoff = getBackoffTime(this.attempt++);
long backoff = ProcedureUtil.getBackoffTimeMs(this.attempt++);
LOG.warn(
"Failed transition, suspend {}secs {}; {}; waiting on rectified condition fixed " +
"by other Procedure or operator intervention",
@ -441,18 +442,11 @@ public class TransitRegionStateProcedure
}
}
private long getBackoffTime(int attempts) {
long backoffTime = (long) (1000 * Math.pow(2, attempts));
long maxBackoffTime = 60 * 60 * 1000; // An hour. Hard-coded for for now.
return backoffTime < maxBackoffTime ? backoffTime : maxBackoffTime;
}
private boolean incrementAndCheckMaxAttempts(MasterProcedureEnv env, RegionStateNode regionNode) {
int retries = env.getAssignmentManager().getRegionStates().addToFailedOpen(regionNode)
.incrementAndGetRetries();
int max = env.getAssignmentManager().getAssignMaxAttempts();
LOG.info(
"Retry=" + retries + " of max=" + max + "; " + this + "; " + regionNode.toShortString());
LOG.info("Retry={} of max={}; {}; {}", retries, max, this, regionNode.toShortString());
return retries >= max;
}

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@ -35,6 +36,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReopenTableRegionsState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReopenTableRegionsStateData;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
/**
* Used for reopening the regions for a table.
@ -49,6 +51,8 @@ public class ReopenTableRegionsProcedure
private List<HRegionLocation> regions = Collections.emptyList();
private int attempt;
public ReopenTableRegionsProcedure() {
}
@ -105,23 +109,34 @@ public class ReopenTableRegionsProcedure
return Flow.NO_MORE_STATE;
}
if (regions.stream().anyMatch(l -> l.getSeqNum() >= 0)) {
attempt = 0;
setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_REOPEN_REGIONS);
return Flow.HAS_MORE_STATE;
}
LOG.info("There are still {} region(s) which need to be reopened for table {} are in " +
"OPENING state, try again later", regions.size(), tableName);
// All the regions need to reopen are in OPENING state which means we can not schedule any
// MRPs. Then sleep for one second, and yield the procedure to let other procedures run
// first and hope next time we can get some regions in other state to make progress.
// TODO: add a delay for ProcedureYieldException so that we do not need to sleep here which
// blocks a procedure worker.
Thread.sleep(1000);
throw new ProcedureYieldException();
// MRPs.
long backoff = ProcedureUtil.getBackoffTimeMs(this.attempt++);
LOG.info(
"There are still {} region(s) which need to be reopened for table {} are in " +
"OPENING state, suspend {}secs and try again later",
regions.size(), tableName, backoff / 1000);
setTimeout(Math.toIntExact(backoff));
setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
throw new ProcedureSuspendedException();
default:
throw new UnsupportedOperationException("unhandled state=" + state);
}
}
/**
* At end of timeout, wake ourselves up so we run again.
*/
@Override
protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
setState(ProcedureProtos.ProcedureState.RUNNABLE);
env.getProcedureScheduler().addFront(this);
return false; // 'false' means that this procedure handled the timeout
}
@Override
protected void rollbackState(MasterProcedureEnv env, ReopenTableRegionsState state)
throws IOException, InterruptedException {

View File

@ -0,0 +1,130 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.procedure;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
/**
* Confirm that we will do backoff when retrying on reopening table regions, to avoid consuming all
* the CPUs.
*/
@Category({ MasterTests.class, MediumTests.class })
public class TestReopenTableRegionsProcedureBackoff {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestReopenTableRegionsProcedureBackoff.class);
private static final Logger LOG =
LoggerFactory.getLogger(TestReopenTableRegionsProcedureBackoff.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static TableName TABLE_NAME = TableName.valueOf("Backoff");
private static byte[] CF = Bytes.toBytes("cf");
@BeforeClass
public static void setUp() throws Exception {
UTIL.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
UTIL.startMiniCluster(1);
UTIL.createTable(TABLE_NAME, CF);
}
@AfterClass
public static void tearDown() throws Exception {
UTIL.shutdownMiniCluster();
}
@Test
public void testRetryBackoff() throws IOException, InterruptedException {
AssignmentManager am = UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager();
ProcedureExecutor<MasterProcedureEnv> procExec =
UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
RegionInfo regionInfo = UTIL.getAdmin().getRegions(TABLE_NAME).get(0);
RegionStateNode regionNode = am.getRegionStates().getRegionStateNode(regionInfo);
// just a dummy one
TransitRegionStateProcedure trsp =
TransitRegionStateProcedure.unassign(procExec.getEnvironment(), regionInfo);
long openSeqNum;
regionNode.lock();
try {
openSeqNum = regionNode.getOpenSeqNum();
// make a fake state to let the procedure wait.
regionNode.setState(State.OPENING);
regionNode.setOpenSeqNum(-1L);
regionNode.setProcedure(trsp);
} finally {
regionNode.unlock();
}
ReopenTableRegionsProcedure proc = new ReopenTableRegionsProcedure(TABLE_NAME);
procExec.submitProcedure(proc);
UTIL.waitFor(10000, () -> proc.getState() == ProcedureState.WAITING_TIMEOUT);
long oldTimeout = 0;
int timeoutIncrements = 0;
for (;;) {
long timeout = proc.getTimeout();
if (timeout > oldTimeout) {
LOG.info("Timeout incremented, was {}, now is {}, increments={}", timeout, oldTimeout,
timeoutIncrements);
oldTimeout = timeout;
timeoutIncrements++;
if (timeoutIncrements > 3) {
// If we incremented at least twice, break; the backoff is working.
break;
}
}
Thread.sleep(1000);
}
regionNode.lock();
try {
// make a fake state to let the procedure wait.
regionNode.setState(State.OPEN);
regionNode.setOpenSeqNum(openSeqNum);
regionNode.unsetProcedure(trsp);
} finally {
regionNode.unlock();
}
ProcedureSyncWait.waitForProcedureToComplete(procExec, proc, 60000);
assertTrue(regionNode.getOpenSeqNum() > openSeqNum);
}
}