HBASE-21172 Reimplement the retry backoff logic for ReopenTableRegionsProcedure
This commit is contained in:
parent
52d50cc426
commit
55e1297d96
|
@ -769,14 +769,21 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Called by the ProcedureExecutor when the timeout set by setTimeout() is expired.
|
* Called by the ProcedureExecutor when the timeout set by setTimeout() is expired.
|
||||||
|
* <p/>
|
||||||
|
* Another usage for this method is to implement retrying. A procedure can set the state to
|
||||||
|
* {@code WAITING_TIMEOUT} by calling {@code setState} method, and throw a
|
||||||
|
* {@link ProcedureSuspendedException} to halt the execution of the procedure, and do not forget to
|
||||||
|
* call {@link #setTimeout(int)} method to set the timeout. And you should also override this
|
||||||
|
* method to wake up the procedure, and also return false to tell the ProcedureExecutor that the
|
||||||
|
* timeout event has been handled.
|
||||||
* @return true to let the framework handle the timeout as abort, false in case the procedure
|
* @return true to let the framework handle the timeout as abort, false in case the procedure
|
||||||
* handled the timeout itself.
|
* handled the timeout itself.
|
||||||
*/
|
*/
|
||||||
protected synchronized boolean setTimeoutFailure(TEnvironment env) {
|
protected synchronized boolean setTimeoutFailure(TEnvironment env) {
|
||||||
if (state == ProcedureState.WAITING_TIMEOUT) {
|
if (state == ProcedureState.WAITING_TIMEOUT) {
|
||||||
long timeDiff = EnvironmentEdgeManager.currentTime() - lastUpdate;
|
long timeDiff = EnvironmentEdgeManager.currentTime() - lastUpdate;
|
||||||
setFailure("ProcedureExecutor", new TimeoutIOException(
|
setFailure("ProcedureExecutor",
|
||||||
"Operation timed out after " + StringUtils.humanTimeDiff(timeDiff)));
|
new TimeoutIOException("Operation timed out after " + StringUtils.humanTimeDiff(timeDiff)));
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -44,40 +44,42 @@ public final class ProcedureUtil {
|
||||||
// ==========================================================================
|
// ==========================================================================
|
||||||
// Reflection helpers to create/validate a Procedure object
|
// Reflection helpers to create/validate a Procedure object
|
||||||
// ==========================================================================
|
// ==========================================================================
|
||||||
public static Procedure newProcedure(final String className) throws BadProcedureException {
|
private static Procedure<?> newProcedure(String className) throws BadProcedureException {
|
||||||
try {
|
try {
|
||||||
final Class<?> clazz = Class.forName(className);
|
Class<?> clazz = Class.forName(className);
|
||||||
if (!Modifier.isPublic(clazz.getModifiers())) {
|
if (!Modifier.isPublic(clazz.getModifiers())) {
|
||||||
throw new Exception("the " + clazz + " class is not public");
|
throw new Exception("the " + clazz + " class is not public");
|
||||||
}
|
}
|
||||||
|
|
||||||
final Constructor<?> ctor = clazz.getConstructor();
|
@SuppressWarnings("rawtypes")
|
||||||
|
Constructor<? extends Procedure> ctor = clazz.asSubclass(Procedure.class).getConstructor();
|
||||||
assert ctor != null : "no constructor found";
|
assert ctor != null : "no constructor found";
|
||||||
if (!Modifier.isPublic(ctor.getModifiers())) {
|
if (!Modifier.isPublic(ctor.getModifiers())) {
|
||||||
throw new Exception("the " + clazz + " constructor is not public");
|
throw new Exception("the " + clazz + " constructor is not public");
|
||||||
}
|
}
|
||||||
return (Procedure)ctor.newInstance();
|
return ctor.newInstance();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new BadProcedureException("The procedure class " + className +
|
throw new BadProcedureException(
|
||||||
" must be accessible and have an empty constructor", e);
|
"The procedure class " + className + " must be accessible and have an empty constructor",
|
||||||
|
e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void validateClass(final Procedure proc) throws BadProcedureException {
|
static void validateClass(Procedure<?> proc) throws BadProcedureException {
|
||||||
try {
|
try {
|
||||||
final Class<?> clazz = proc.getClass();
|
Class<?> clazz = proc.getClass();
|
||||||
if (!Modifier.isPublic(clazz.getModifiers())) {
|
if (!Modifier.isPublic(clazz.getModifiers())) {
|
||||||
throw new Exception("the " + clazz + " class is not public");
|
throw new Exception("the " + clazz + " class is not public");
|
||||||
}
|
}
|
||||||
|
|
||||||
final Constructor<?> ctor = clazz.getConstructor();
|
Constructor<?> ctor = clazz.getConstructor();
|
||||||
assert ctor != null;
|
assert ctor != null;
|
||||||
if (!Modifier.isPublic(ctor.getModifiers())) {
|
if (!Modifier.isPublic(ctor.getModifiers())) {
|
||||||
throw new Exception("the " + clazz + " constructor is not public");
|
throw new Exception("the " + clazz + " constructor is not public");
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new BadProcedureException("The procedure class " + proc.getClass().getName() +
|
throw new BadProcedureException("The procedure class " + proc.getClass().getName() +
|
||||||
" must be accessible and have an empty constructor", e);
|
" must be accessible and have an empty constructor", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -150,9 +152,10 @@ public final class ProcedureUtil {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Helper to convert the procedure to protobuf.
|
* Helper to convert the procedure to protobuf.
|
||||||
|
* <p/>
|
||||||
* Used by ProcedureStore implementations.
|
* Used by ProcedureStore implementations.
|
||||||
*/
|
*/
|
||||||
public static ProcedureProtos.Procedure convertToProtoProcedure(final Procedure proc)
|
public static ProcedureProtos.Procedure convertToProtoProcedure(Procedure<?> proc)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
Preconditions.checkArgument(proc != null);
|
Preconditions.checkArgument(proc != null);
|
||||||
validateClass(proc);
|
validateClass(proc);
|
||||||
|
@ -214,16 +217,17 @@ public final class ProcedureUtil {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Helper to convert the protobuf procedure.
|
* Helper to convert the protobuf procedure.
|
||||||
|
* <p/>
|
||||||
* Used by ProcedureStore implementations.
|
* Used by ProcedureStore implementations.
|
||||||
*
|
* <p/>
|
||||||
* TODO: OPTIMIZATION: some of the field never change during the execution
|
* TODO: OPTIMIZATION: some of the field never change during the execution (e.g. className,
|
||||||
* (e.g. className, procId, parentId, ...).
|
* procId, parentId, ...). We can split in 'data' and 'state', and the store may take advantage of
|
||||||
* We can split in 'data' and 'state', and the store
|
* it by storing the data only on insert().
|
||||||
* may take advantage of it by storing the data only on insert().
|
|
||||||
*/
|
*/
|
||||||
public static Procedure convertToProcedure(final ProcedureProtos.Procedure proto) throws IOException {
|
public static Procedure<?> convertToProcedure(ProcedureProtos.Procedure proto)
|
||||||
|
throws IOException {
|
||||||
// Procedure from class name
|
// Procedure from class name
|
||||||
final Procedure proc = newProcedure(proto.getClassName());
|
Procedure<?> proc = newProcedure(proto.getClassName());
|
||||||
|
|
||||||
// set fields
|
// set fields
|
||||||
proc.setProcId(proto.getProcId());
|
proc.setProcId(proto.getProcId());
|
||||||
|
@ -300,8 +304,7 @@ public final class ProcedureUtil {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static LockServiceProtos.LockedResource convertToProtoLockedResource(
|
public static LockServiceProtos.LockedResource convertToProtoLockedResource(
|
||||||
LockedResource lockedResource) throws IOException
|
LockedResource lockedResource) throws IOException {
|
||||||
{
|
|
||||||
LockServiceProtos.LockedResource.Builder builder =
|
LockServiceProtos.LockedResource.Builder builder =
|
||||||
LockServiceProtos.LockedResource.newBuilder();
|
LockServiceProtos.LockedResource.newBuilder();
|
||||||
|
|
||||||
|
@ -328,4 +331,18 @@ public final class ProcedureUtil {
|
||||||
|
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get an exponential backoff time, in milliseconds. The base unit is 1 second, and the max
|
||||||
|
* backoff time is 10 minutes. This is the general backoff policy for most procedure
|
||||||
|
* implementation.
|
||||||
|
*/
|
||||||
|
public static long getBackoffTimeMs(int attempts) {
|
||||||
|
long maxBackoffTime = 10L * 60 * 1000; // Ten minutes, hard coded for now.
|
||||||
|
// avoid overflow
|
||||||
|
if (attempts >= 30) {
|
||||||
|
return maxBackoffTime;
|
||||||
|
}
|
||||||
|
return Math.min((long) (1000 * Math.pow(2, attempts)), maxBackoffTime);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.procedure2;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
|
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
|
||||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||||
|
@ -29,12 +30,12 @@ import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
|
||||||
|
|
||||||
@Category({MasterTests.class, SmallTests.class})
|
@Category({ MasterTests.class, SmallTests.class })
|
||||||
public class TestProcedureUtil {
|
public class TestProcedureUtil {
|
||||||
|
|
||||||
@ClassRule
|
@ClassRule
|
||||||
public static final HBaseClassTestRule CLASS_RULE =
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
HBaseClassTestRule.forClass(TestProcedureUtil.class);
|
HBaseClassTestRule.forClass(TestProcedureUtil.class);
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testValidation() throws Exception {
|
public void testValidation() throws Exception {
|
||||||
|
@ -57,6 +58,16 @@ public class TestProcedureUtil {
|
||||||
assertEquals("Procedure protobuf does not match", proto1, proto2);
|
assertEquals("Procedure protobuf does not match", proto1, proto2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetBackoffTimeMs() {
|
||||||
|
for (int i = 30; i < 1000; i++) {
|
||||||
|
assertEquals(TimeUnit.MINUTES.toMillis(10), ProcedureUtil.getBackoffTimeMs(30));
|
||||||
|
}
|
||||||
|
assertEquals(1000, ProcedureUtil.getBackoffTimeMs(0));
|
||||||
|
assertEquals(2000, ProcedureUtil.getBackoffTimeMs(1));
|
||||||
|
assertEquals(32000, ProcedureUtil.getBackoffTimeMs(5));
|
||||||
|
}
|
||||||
|
|
||||||
public static class TestProcedureNoDefaultConstructor extends TestProcedure {
|
public static class TestProcedureNoDefaultConstructor extends TestProcedure {
|
||||||
public TestProcedureNoDefaultConstructor(int x) {}
|
public TestProcedureNoDefaultConstructor(int x) {}
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||||
import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
|
import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
|
||||||
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
||||||
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
|
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
|
||||||
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
|
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -321,7 +322,7 @@ public class TransitRegionStateProcedure
|
||||||
throw new UnsupportedOperationException("unhandled state=" + state);
|
throw new UnsupportedOperationException("unhandled state=" + state);
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
long backoff = getBackoffTime(this.attempt++);
|
long backoff = ProcedureUtil.getBackoffTimeMs(this.attempt++);
|
||||||
LOG.warn(
|
LOG.warn(
|
||||||
"Failed transition, suspend {}secs {}; {}; waiting on rectified condition fixed " +
|
"Failed transition, suspend {}secs {}; {}; waiting on rectified condition fixed " +
|
||||||
"by other Procedure or operator intervention",
|
"by other Procedure or operator intervention",
|
||||||
|
@ -441,18 +442,11 @@ public class TransitRegionStateProcedure
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private long getBackoffTime(int attempts) {
|
|
||||||
long backoffTime = (long) (1000 * Math.pow(2, attempts));
|
|
||||||
long maxBackoffTime = 60 * 60 * 1000; // An hour. Hard-coded for for now.
|
|
||||||
return backoffTime < maxBackoffTime ? backoffTime : maxBackoffTime;
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean incrementAndCheckMaxAttempts(MasterProcedureEnv env, RegionStateNode regionNode) {
|
private boolean incrementAndCheckMaxAttempts(MasterProcedureEnv env, RegionStateNode regionNode) {
|
||||||
int retries = env.getAssignmentManager().getRegionStates().addToFailedOpen(regionNode)
|
int retries = env.getAssignmentManager().getRegionStates().addToFailedOpen(regionNode)
|
||||||
.incrementAndGetRetries();
|
.incrementAndGetRetries();
|
||||||
int max = env.getAssignmentManager().getAssignMaxAttempts();
|
int max = env.getAssignmentManager().getAssignMaxAttempts();
|
||||||
LOG.info(
|
LOG.info("Retry={} of max={}; {}; {}", retries, max, this, regionNode.toShortString());
|
||||||
"Retry=" + retries + " of max=" + max + "; " + this + "; " + regionNode.toShortString());
|
|
||||||
return retries >= max;
|
return retries >= max;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
|
||||||
import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
|
import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
|
||||||
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
||||||
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
|
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
|
||||||
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
|
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -35,6 +36,7 @@ import org.slf4j.LoggerFactory;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReopenTableRegionsState;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReopenTableRegionsState;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReopenTableRegionsStateData;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReopenTableRegionsStateData;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Used for reopening the regions for a table.
|
* Used for reopening the regions for a table.
|
||||||
|
@ -49,6 +51,8 @@ public class ReopenTableRegionsProcedure
|
||||||
|
|
||||||
private List<HRegionLocation> regions = Collections.emptyList();
|
private List<HRegionLocation> regions = Collections.emptyList();
|
||||||
|
|
||||||
|
private int attempt;
|
||||||
|
|
||||||
public ReopenTableRegionsProcedure() {
|
public ReopenTableRegionsProcedure() {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -105,23 +109,34 @@ public class ReopenTableRegionsProcedure
|
||||||
return Flow.NO_MORE_STATE;
|
return Flow.NO_MORE_STATE;
|
||||||
}
|
}
|
||||||
if (regions.stream().anyMatch(l -> l.getSeqNum() >= 0)) {
|
if (regions.stream().anyMatch(l -> l.getSeqNum() >= 0)) {
|
||||||
|
attempt = 0;
|
||||||
setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_REOPEN_REGIONS);
|
setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_REOPEN_REGIONS);
|
||||||
return Flow.HAS_MORE_STATE;
|
return Flow.HAS_MORE_STATE;
|
||||||
}
|
}
|
||||||
LOG.info("There are still {} region(s) which need to be reopened for table {} are in " +
|
|
||||||
"OPENING state, try again later", regions.size(), tableName);
|
|
||||||
// All the regions need to reopen are in OPENING state which means we can not schedule any
|
// All the regions need to reopen are in OPENING state which means we can not schedule any
|
||||||
// MRPs. Then sleep for one second, and yield the procedure to let other procedures run
|
// MRPs.
|
||||||
// first and hope next time we can get some regions in other state to make progress.
|
long backoff = ProcedureUtil.getBackoffTimeMs(this.attempt++);
|
||||||
// TODO: add a delay for ProcedureYieldException so that we do not need to sleep here which
|
LOG.info(
|
||||||
// blocks a procedure worker.
|
"There are still {} region(s) which need to be reopened for table {} are in " +
|
||||||
Thread.sleep(1000);
|
"OPENING state, suspend {}secs and try again later",
|
||||||
throw new ProcedureYieldException();
|
regions.size(), tableName, backoff / 1000);
|
||||||
|
setTimeout(Math.toIntExact(backoff));
|
||||||
|
setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
|
||||||
|
throw new ProcedureSuspendedException();
|
||||||
default:
|
default:
|
||||||
throw new UnsupportedOperationException("unhandled state=" + state);
|
throw new UnsupportedOperationException("unhandled state=" + state);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* At end of timeout, wake ourselves up so we run again.
* <p/>
* Sets the state back to {@code RUNNABLE} and puts this procedure at the front of the procedure
* scheduler obtained from the environment.
* @param env the master procedure environment, used to reach the procedure scheduler
* @return always false, meaning this procedure handled the timeout itself
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
|
||||||
|
setState(ProcedureProtos.ProcedureState.RUNNABLE);
|
||||||
|
env.getProcedureScheduler().addFront(this);
|
||||||
|
return false; // 'false' means that this procedure handled the timeout
|
||||||
|
}
|
||||||
@Override
|
@Override
|
||||||
protected void rollbackState(MasterProcedureEnv env, ReopenTableRegionsState state)
|
protected void rollbackState(MasterProcedureEnv env, ReopenTableRegionsState state)
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
|
|
|
@ -0,0 +1,130 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.master.procedure;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
|
import org.apache.hadoop.hbase.master.RegionState.State;
|
||||||
|
import org.apache.hadoop.hbase.master.ServerManager;
|
||||||
|
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
|
||||||
|
import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
|
||||||
|
import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Confirm that we will do backoff when retrying on reopening table regions, to avoid consuming all
|
||||||
|
* the CPUs.
|
||||||
|
*/
|
||||||
|
@Category({ MasterTests.class, MediumTests.class })
|
||||||
|
public class TestReopenTableRegionsProcedureBackoff {
|
||||||
|
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestReopenTableRegionsProcedureBackoff.class);
|
||||||
|
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(TestReopenTableRegionsProcedureBackoff.class);
|
||||||
|
|
||||||
|
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||||
|
|
||||||
|
private static TableName TABLE_NAME = TableName.valueOf("Backoff");
|
||||||
|
|
||||||
|
private static byte[] CF = Bytes.toBytes("cf");
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUp() throws Exception {
|
||||||
|
UTIL.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
|
||||||
|
UTIL.startMiniCluster(1);
|
||||||
|
UTIL.createTable(TABLE_NAME, CF);
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDown() throws Exception {
|
||||||
|
UTIL.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRetryBackoff() throws IOException, InterruptedException {
|
||||||
|
AssignmentManager am = UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager();
|
||||||
|
ProcedureExecutor<MasterProcedureEnv> procExec =
|
||||||
|
UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
|
||||||
|
RegionInfo regionInfo = UTIL.getAdmin().getRegions(TABLE_NAME).get(0);
|
||||||
|
RegionStateNode regionNode = am.getRegionStates().getRegionStateNode(regionInfo);
|
||||||
|
// just a dummy one
|
||||||
|
TransitRegionStateProcedure trsp =
|
||||||
|
TransitRegionStateProcedure.unassign(procExec.getEnvironment(), regionInfo);
|
||||||
|
long openSeqNum;
|
||||||
|
regionNode.lock();
|
||||||
|
try {
|
||||||
|
openSeqNum = regionNode.getOpenSeqNum();
|
||||||
|
// make a fake state to let the procedure wait.
|
||||||
|
regionNode.setState(State.OPENING);
|
||||||
|
regionNode.setOpenSeqNum(-1L);
|
||||||
|
regionNode.setProcedure(trsp);
|
||||||
|
} finally {
|
||||||
|
regionNode.unlock();
|
||||||
|
}
|
||||||
|
ReopenTableRegionsProcedure proc = new ReopenTableRegionsProcedure(TABLE_NAME);
|
||||||
|
procExec.submitProcedure(proc);
|
||||||
|
UTIL.waitFor(10000, () -> proc.getState() == ProcedureState.WAITING_TIMEOUT);
|
||||||
|
long oldTimeout = 0;
|
||||||
|
int timeoutIncrements = 0;
|
||||||
|
for (;;) {
|
||||||
|
long timeout = proc.getTimeout();
|
||||||
|
if (timeout > oldTimeout) {
|
||||||
|
LOG.info("Timeout incremented, was {}, now is {}, increments={}", timeout, oldTimeout,
|
||||||
|
timeoutIncrements);
|
||||||
|
oldTimeout = timeout;
|
||||||
|
timeoutIncrements++;
|
||||||
|
if (timeoutIncrements > 3) {
|
||||||
|
// If we incremented at least twice, break; the backoff is working.
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Thread.sleep(1000);
|
||||||
|
}
|
||||||
|
regionNode.lock();
|
||||||
|
try {
|
||||||
|
// make a fake state to let the procedure wait.
|
||||||
|
regionNode.setState(State.OPEN);
|
||||||
|
regionNode.setOpenSeqNum(openSeqNum);
|
||||||
|
regionNode.unsetProcedure(trsp);
|
||||||
|
} finally {
|
||||||
|
regionNode.unlock();
|
||||||
|
}
|
||||||
|
ProcedureSyncWait.waitForProcedureToComplete(procExec, proc, 60000);
|
||||||
|
assertTrue(regionNode.getOpenSeqNum() > openSeqNum);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue