HBASE-22343 Make procedure retry interval configurable in test

This commit is contained in:
zhangduo 2019-05-03 22:23:59 +08:00 committed by Apache9
parent 68f14c19ff
commit e884a25f8d
13 changed files with 165 additions and 147 deletions

View File

@ -21,9 +21,13 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.lang.reflect.Constructor; import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier; import java.lang.reflect.Modifier;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.NonceKey; import org.apache.hadoop.hbase.util.NonceKey;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounter.ExponentialBackoffPolicyWithLimit;
import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
@ -335,20 +339,35 @@ public final class ProcedureUtil {
return builder.build(); return builder.build();
} }
/** Configuration key for the sleep interval, in milliseconds, used when retrying a procedure. */
public static final String PROCEDURE_RETRY_SLEEP_INTERVAL_MS =
    "hbase.procedure.retry.sleep.interval.ms";

/** Default value for {@link #PROCEDURE_RETRY_SLEEP_INTERVAL_MS}: 1 second. */
public static final long DEFAULT_PROCEDURE_RETRY_SLEEP_INTERVAL_MS = 1000L;

/** Configuration key for the maximum sleep time, in milliseconds, between procedure retries. */
public static final String PROCEDURE_RETRY_MAX_SLEEP_TIME_MS =
    "hbase.procedure.retry.max.sleep.time.ms";

/** Default value for {@link #PROCEDURE_RETRY_MAX_SLEEP_TIME_MS}: 10 minutes. */
public static final long DEFAULT_PROCEDURE_RETRY_MAX_SLEEP_TIME_MS =
    TimeUnit.MINUTES.toMillis(10L);
/** /**
* Get an exponential backoff time, in milliseconds. The base unit is 1 second, and the max * Get a retry counter for getting the backoff time. We will use the
* backoff time is 10 minutes. This is the general backoff policy for most procedure * {@link ExponentialBackoffPolicyWithLimit} policy, and the base unit is 1 second, max sleep time
* implementation. * is 10 minutes by default.
* <p/>
* For UTs, you can set the {@link #PROCEDURE_RETRY_SLEEP_INTERVAL_MS} and
* {@link #PROCEDURE_RETRY_MAX_SLEEP_TIME_MS} to retry more frequently so that your UT does not
* time out.
*/ */
public static long getBackoffTimeMs(int attempts) { public static RetryCounter createRetryCounter(Configuration conf) {
long maxBackoffTime = 10L * 60 * 1000; // Ten minutes, hard coded for now. long sleepIntervalMs =
// avoid overflow conf.getLong(PROCEDURE_RETRY_SLEEP_INTERVAL_MS, DEFAULT_PROCEDURE_RETRY_SLEEP_INTERVAL_MS);
if (attempts >= 30) { long maxSleepTimeMs =
return maxBackoffTime; conf.getLong(PROCEDURE_RETRY_MAX_SLEEP_TIME_MS, DEFAULT_PROCEDURE_RETRY_MAX_SLEEP_TIME_MS);
} RetryConfig retryConfig = new RetryConfig().setSleepInterval(sleepIntervalMs)
long backoffTimeMs = Math.min((long) (1000 * Math.pow(2, attempts)), maxBackoffTime); .setMaxSleepTime(maxSleepTimeMs).setBackoffPolicy(new ExponentialBackoffPolicyWithLimit());
// 1% possible jitter return new RetryCounter(retryConfig);
long jitter = (long) (backoffTimeMs * ThreadLocalRandom.current().nextFloat() * 0.01f);
return backoffTimeMs + jitter;
} }
} }

View File

@ -18,9 +18,7 @@
package org.apache.hadoop.hbase.procedure2; package org.apache.hadoop.hbase.procedure2;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MasterTests;
@ -59,24 +57,6 @@ public class TestProcedureUtil {
assertEquals("Procedure protobuf does not match", proto1, proto2); assertEquals("Procedure protobuf does not match", proto1, proto2);
} }
/**
 * Verifies {@code ProcedureUtil.getBackoffTimeMs}: attempt counts of 30 and above must return
 * exactly the 10 minute cap, and small attempt counts must return {@code 1000 * 2^attempts}
 * milliseconds plus at most 1% jitter.
 */
@Test
public void testGetBackoffTimeMs() {
  // Pass the loop variable so that every attempt count in [30, 1000) is actually checked;
  // the capped branch returns the maximum without jitter, so an exact comparison is valid.
  for (int i = 30; i < 1000; i++) {
    assertEquals(TimeUnit.MINUTES.toMillis(10), ProcedureUtil.getBackoffTimeMs(i));
  }

  // Below the cap the result lies between the pure exponential value and that value plus 1%.
  long backoffTimeMs = ProcedureUtil.getBackoffTimeMs(0);
  assertTrue(backoffTimeMs >= 1000);
  assertTrue(backoffTimeMs <= 1000 * 1.01f);

  backoffTimeMs = ProcedureUtil.getBackoffTimeMs(1);
  assertTrue(backoffTimeMs >= 2000);
  assertTrue(backoffTimeMs <= 2000 * 1.01f);

  backoffTimeMs = ProcedureUtil.getBackoffTimeMs(5);
  assertTrue(backoffTimeMs >= 32000);
  assertTrue(backoffTimeMs <= 32000 * 1.01f);
}
public static class TestProcedureNoDefaultConstructor extends TestProcedure { public static class TestProcedureNoDefaultConstructor extends TestProcedure {
public TestProcedureNoDefaultConstructor(int x) {} public TestProcedureNoDefaultConstructor(int x) {}
} }

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure; import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureException; import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -68,7 +69,7 @@ public abstract class RegionRemoteProcedureBase extends Procedure<MasterProcedur
private long seqId; private long seqId;
private int attempt; private RetryCounter retryCounter;
protected RegionRemoteProcedureBase() { protected RegionRemoteProcedureBase() {
} }
@ -268,7 +269,10 @@ public abstract class RegionRemoteProcedureBase extends Procedure<MasterProcedur
throw new IllegalStateException("Unknown state: " + state); throw new IllegalStateException("Unknown state: " + state);
} }
} catch (IOException e) { } catch (IOException e) {
long backoff = ProcedureUtil.getBackoffTimeMs(this.attempt++); if (retryCounter == null) {
retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
}
long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
LOG.warn("Failed updating meta, suspend {}secs {}; {};", backoff / 1000, this, regionNode, e); LOG.warn("Failed updating meta, suspend {}secs {}; {};", backoff / 1000, this, regionNode, e);
setTimeout(Math.toIntExact(backoff)); setTimeout(Math.toIntExact(backoff));
setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil; import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -115,7 +116,7 @@ public class TransitRegionStateProcedure
private boolean forceNewPlan; private boolean forceNewPlan;
private int attempt; private RetryCounter retryCounter;
private RegionRemoteProcedureBase remoteProc; private RegionRemoteProcedureBase remoteProc;
@ -210,7 +211,7 @@ public class TransitRegionStateProcedure
private Flow confirmOpened(MasterProcedureEnv env, RegionStateNode regionNode) private Flow confirmOpened(MasterProcedureEnv env, RegionStateNode regionNode)
throws IOException { throws IOException {
if (regionNode.isInState(State.OPEN)) { if (regionNode.isInState(State.OPEN)) {
attempt = 0; retryCounter = null;
if (lastState == RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_OPENED) { if (lastState == RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_OPENED) {
// we are the last state, finish // we are the last state, finish
regionNode.unsetProcedure(this); regionNode.unsetProcedure(this);
@ -271,7 +272,7 @@ public class TransitRegionStateProcedure
private Flow confirmClosed(MasterProcedureEnv env, RegionStateNode regionNode) private Flow confirmClosed(MasterProcedureEnv env, RegionStateNode regionNode)
throws IOException { throws IOException {
if (regionNode.isInState(State.CLOSED)) { if (regionNode.isInState(State.CLOSED)) {
attempt = 0; retryCounter = null;
if (lastState == RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_CLOSED) { if (lastState == RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_CLOSED) {
// we are the last state, finish // we are the last state, finish
regionNode.unsetProcedure(this); regionNode.unsetProcedure(this);
@ -300,7 +301,7 @@ public class TransitRegionStateProcedure
regionNode.unsetProcedure(this); regionNode.unsetProcedure(this);
return Flow.NO_MORE_STATE; return Flow.NO_MORE_STATE;
} }
attempt = 0; retryCounter = null;
setNextState(RegionStateTransitionState.REGION_STATE_TRANSITION_GET_ASSIGN_CANDIDATE); setNextState(RegionStateTransitionState.REGION_STATE_TRANSITION_GET_ASSIGN_CANDIDATE);
return Flow.HAS_MORE_STATE; return Flow.HAS_MORE_STATE;
} }
@ -347,7 +348,10 @@ public class TransitRegionStateProcedure
throw new UnsupportedOperationException("unhandled state=" + state); throw new UnsupportedOperationException("unhandled state=" + state);
} }
} catch (IOException e) { } catch (IOException e) {
long backoff = ProcedureUtil.getBackoffTimeMs(this.attempt++); if (retryCounter == null) {
retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
}
long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
LOG.warn( LOG.warn(
"Failed transition, suspend {}secs {}; {}; waiting on rectified condition fixed " + "Failed transition, suspend {}secs {}; {}; waiting on rectified condition fixed " +
"by other Procedure or operator intervention", "by other Procedure or operator intervention",

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil; import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -51,7 +52,7 @@ public class InitMetaProcedure extends AbstractStateMachineTableProcedure<InitMe
private CountDownLatch latch = new CountDownLatch(1); private CountDownLatch latch = new CountDownLatch(1);
private int attempts; private RetryCounter retryCounter;
@Override @Override
public TableName getTableName() { public TableName getTableName() {
@ -85,7 +86,10 @@ public class InitMetaProcedure extends AbstractStateMachineTableProcedure<InitMe
insertNamespaceToMeta(env.getMasterServices().getConnection(), DEFAULT_NAMESPACE); insertNamespaceToMeta(env.getMasterServices().getConnection(), DEFAULT_NAMESPACE);
insertNamespaceToMeta(env.getMasterServices().getConnection(), SYSTEM_NAMESPACE); insertNamespaceToMeta(env.getMasterServices().getConnection(), SYSTEM_NAMESPACE);
} catch (IOException e) { } catch (IOException e) {
long backoff = ProcedureUtil.getBackoffTimeMs(this.attempts++); if (retryCounter == null) {
retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
}
long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
LOG.warn("Failed to init default and system namespaces, suspend {}secs", backoff, e); LOG.warn("Failed to init default and system namespaces, suspend {}secs", backoff, e);
setTimeout(Math.toIntExact(backoff)); setTimeout(Math.toIntExact(backoff));
setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil; import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -51,7 +52,7 @@ public class ReopenTableRegionsProcedure
private List<HRegionLocation> regions = Collections.emptyList(); private List<HRegionLocation> regions = Collections.emptyList();
private int attempt; private RetryCounter retryCounter;
public ReopenTableRegionsProcedure() { public ReopenTableRegionsProcedure() {
} }
@ -125,13 +126,16 @@ public class ReopenTableRegionsProcedure
return Flow.NO_MORE_STATE; return Flow.NO_MORE_STATE;
} }
if (regions.stream().anyMatch(loc -> canSchedule(env, loc))) { if (regions.stream().anyMatch(loc -> canSchedule(env, loc))) {
attempt = 0; retryCounter = null;
setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_REOPEN_REGIONS); setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_REOPEN_REGIONS);
return Flow.HAS_MORE_STATE; return Flow.HAS_MORE_STATE;
} }
// We can not schedule TRSP for all the regions need to reopen, wait for a while and retry // We can not schedule TRSP for all the regions need to reopen, wait for a while and retry
// again. // again.
long backoff = ProcedureUtil.getBackoffTimeMs(this.attempt++); if (retryCounter == null) {
retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
}
long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
LOG.info( LOG.info(
"There are still {} region(s) which need to be reopened for table {} are in " + "There are still {} region(s) which need to be reopened for table {} are in " +
"OPENING state, suspend {}secs and try again later", "OPENING state, suspend {}secs and try again later",

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil; import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -53,7 +54,7 @@ public class SplitWALProcedure
private String walPath; private String walPath;
private ServerName worker; private ServerName worker;
private ServerName crashedServer; private ServerName crashedServer;
private int attempts = 0; private RetryCounter retryCounter;
public SplitWALProcedure() { public SplitWALProcedure() {
} }
@ -82,11 +83,16 @@ public class SplitWALProcedure
try { try {
finished = splitWALManager.isSplitWALFinished(walPath); finished = splitWALManager.isSplitWALFinished(walPath);
} catch (IOException ioe) { } catch (IOException ioe) {
long backoff = ProcedureUtil.getBackoffTimeMs(attempts++); if (retryCounter == null) {
LOG.warn( retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
"Failed to check whether splitting wal {} success, wait {} seconds to retry", }
long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
LOG.warn("Failed to check whether splitting wal {} success, wait {} seconds to retry",
walPath, backoff / 1000, ioe); walPath, backoff / 1000, ioe);
throw suspend(backoff); setTimeout(Math.toIntExact(backoff));
setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
skipPersistence();
throw new ProcedureSuspendedException();
} }
splitWALManager.releaseSplitWALWorker(worker, env.getProcedureScheduler()); splitWALManager.releaseSplitWALWorker(worker, env.getProcedureScheduler());
if (!finished) { if (!finished) {
@ -157,15 +163,6 @@ public class SplitWALProcedure
return false; return false;
} }
/**
 * Suspends this procedure: bumps the attempt count, arms a timeout of {@code backoff}, moves the
 * procedure to {@code WAITING_TIMEOUT}, skips persistence and throws.
 * <p/>
 * This method never returns normally. The declared return type only exists so that callers can
 * write {@code throw suspend(backoff);}.
 * @param backoff the timeout to arm; it must fit in an {@code int}, otherwise
 *          {@link Math#toIntExact(long)} throws {@link ArithmeticException}
 * @throws ProcedureSuspendedException always
 */
protected final ProcedureSuspendedException suspend(long backoff)
    throws ProcedureSuspendedException {
  // NOTE(review): the visible caller computes the backoff with getBackoffTimeMs(attempts++), so
  // together with this increment the attempt count advances twice per retry -- confirm intent.
  attempts += 1;
  final int timeoutMs = Math.toIntExact(backoff);
  setTimeout(timeoutMs);
  setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
  skipPersistence();
  throw new ProcedureSuspendedException();
}
public String getWAL() { public String getWAL() {
return walPath; return walPath;
} }

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.quotas.RpcThrottleStorage; import org.apache.hadoop.hbase.quotas.RpcThrottleStorage;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -43,11 +44,11 @@ public class SwitchRpcThrottleProcedure
private static Logger LOG = LoggerFactory.getLogger(SwitchRpcThrottleProcedure.class); private static Logger LOG = LoggerFactory.getLogger(SwitchRpcThrottleProcedure.class);
RpcThrottleStorage rpcThrottleStorage; private RpcThrottleStorage rpcThrottleStorage;
boolean rpcThrottleEnabled; private boolean rpcThrottleEnabled;
ProcedurePrepareLatch syncLatch; private ProcedurePrepareLatch syncLatch;
ServerName serverName; private ServerName serverName;
int attempts; private RetryCounter retryCounter;
public SwitchRpcThrottleProcedure() { public SwitchRpcThrottleProcedure() {
} }
@ -68,7 +69,10 @@ public class SwitchRpcThrottleProcedure
try { try {
switchThrottleState(env, rpcThrottleEnabled); switchThrottleState(env, rpcThrottleEnabled);
} catch (IOException e) { } catch (IOException e) {
long backoff = ProcedureUtil.getBackoffTimeMs(this.attempts++); if (retryCounter == null) {
retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
}
long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
LOG.warn("Failed to store rpc throttle value {}, sleep {} secs and retry", LOG.warn("Failed to store rpc throttle value {}, sleep {} secs and retry",
rpcThrottleEnabled, backoff / 1000, e); rpcThrottleEnabled, backoff / 1000, e);
setTimeout(Math.toIntExact(backoff)); setTimeout(Math.toIntExact(backoff));

View File

@ -18,11 +18,15 @@
package org.apache.hadoop.hbase.master.replication; package org.apache.hadoop.hbase.master.replication;
import java.io.IOException; import java.io.IOException;
import java.util.function.LongConsumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface; import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerProcedureStateData; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerProcedureStateData;
@ -38,7 +42,7 @@ public abstract class AbstractPeerNoLockProcedure<TState>
protected String peerId; protected String peerId;
protected int attempts; private RetryCounter retryCounter;
protected AbstractPeerNoLockProcedure() { protected AbstractPeerNoLockProcedure() {
} }
@ -87,12 +91,20 @@ public abstract class AbstractPeerNoLockProcedure<TState>
return false; return false;
} }
protected final ProcedureSuspendedException suspend(long backoff) protected final ProcedureSuspendedException suspend(Configuration conf,
throws ProcedureSuspendedException { LongConsumer backoffConsumer) throws ProcedureSuspendedException {
attempts++; if (retryCounter == null) {
retryCounter = ProcedureUtil.createRetryCounter(conf);
}
long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
backoffConsumer.accept(backoff);
setTimeout(Math.toIntExact(backoff)); setTimeout(Math.toIntExact(backoff));
setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
skipPersistence(); skipPersistence();
throw new ProcedureSuspendedException(); throw new ProcedureSuspendedException();
} }
/**
 * Discards the current retry counter. {@code suspend} only creates a counter when this field is
 * {@code null}, so the next suspension after this call obtains a new counter from
 * {@code ProcedureUtil.createRetryCounter}.
 */
protected final void resetRetry() {
  this.retryCounter = null;
}
} }

View File

@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure; import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.ReplicationUtils;
@ -172,24 +171,22 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
releaseLatch(env); releaseLatch(env);
return Flow.NO_MORE_STATE; return Flow.NO_MORE_STATE;
} catch (ReplicationException e) { } catch (ReplicationException e) {
long backoff = ProcedureUtil.getBackoffTimeMs(attempts); throw suspend(env.getMasterConfiguration(),
LOG.warn("{} failed to call prePeerModification for peer {}, sleep {} secs", backoff -> LOG.warn("{} failed to call prePeerModification for peer {}, sleep {} secs",
getClass().getName(), peerId, backoff / 1000, e); getClass().getName(), peerId, backoff / 1000, e));
throw suspend(backoff);
} }
attempts = 0; resetRetry();
setNextState(PeerModificationState.UPDATE_PEER_STORAGE); setNextState(PeerModificationState.UPDATE_PEER_STORAGE);
return Flow.HAS_MORE_STATE; return Flow.HAS_MORE_STATE;
case UPDATE_PEER_STORAGE: case UPDATE_PEER_STORAGE:
try { try {
updatePeerStorage(env); updatePeerStorage(env);
} catch (ReplicationException e) { } catch (ReplicationException e) {
long backoff = ProcedureUtil.getBackoffTimeMs(attempts); throw suspend(env.getMasterConfiguration(),
LOG.warn("{} update peer storage for peer {} failed, sleep {} secs", getClass().getName(), backoff -> LOG.warn("{} update peer storage for peer {} failed, sleep {} secs",
peerId, backoff / 1000, e); getClass().getName(), peerId, backoff / 1000, e));
throw suspend(backoff);
} }
attempts = 0; resetRetry();
setNextState(PeerModificationState.REFRESH_PEER_ON_RS); setNextState(PeerModificationState.REFRESH_PEER_ON_RS);
return Flow.HAS_MORE_STATE; return Flow.HAS_MORE_STATE;
case REFRESH_PEER_ON_RS: case REFRESH_PEER_ON_RS:
@ -200,24 +197,22 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
try { try {
reopenRegions(env); reopenRegions(env);
} catch (Exception e) { } catch (Exception e) {
long backoff = ProcedureUtil.getBackoffTimeMs(attempts); throw suspend(env.getMasterConfiguration(),
LOG.warn("{} reopen regions for peer {} failed, sleep {} secs", getClass().getName(), backoff -> LOG.warn("{} reopen regions for peer {} failed, sleep {} secs",
peerId, backoff / 1000, e); getClass().getName(), peerId, backoff / 1000, e));
throw suspend(backoff);
} }
attempts = 0; resetRetry();
setNextState(PeerModificationState.SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID); setNextState(PeerModificationState.SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID);
return Flow.HAS_MORE_STATE; return Flow.HAS_MORE_STATE;
case SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID: case SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID:
try { try {
updateLastPushedSequenceIdForSerialPeer(env); updateLastPushedSequenceIdForSerialPeer(env);
} catch (Exception e) { } catch (Exception e) {
long backoff = ProcedureUtil.getBackoffTimeMs(attempts); throw suspend(env.getMasterConfiguration(),
LOG.warn("{} set last sequence id for peer {} failed, sleep {} secs", backoff -> LOG.warn("{} set last sequence id for peer {} failed, sleep {} secs",
getClass().getName(), peerId, backoff / 1000, e); getClass().getName(), peerId, backoff / 1000, e));
throw suspend(backoff);
} }
attempts = 0; resetRetry();
setNextState(enablePeerBeforeFinish() ? PeerModificationState.SERIAL_PEER_SET_PEER_ENABLED setNextState(enablePeerBeforeFinish() ? PeerModificationState.SERIAL_PEER_SET_PEER_ENABLED
: PeerModificationState.POST_PEER_MODIFICATION); : PeerModificationState.POST_PEER_MODIFICATION);
return Flow.HAS_MORE_STATE; return Flow.HAS_MORE_STATE;
@ -225,12 +220,11 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
try { try {
enablePeer(env); enablePeer(env);
} catch (ReplicationException e) { } catch (ReplicationException e) {
long backoff = ProcedureUtil.getBackoffTimeMs(attempts); throw suspend(env.getMasterConfiguration(),
LOG.warn("{} enable peer before finish for peer {} failed, sleep {} secs", backoff -> LOG.warn("{} enable peer before finish for peer {} failed, sleep {} secs",
getClass().getName(), peerId, backoff / 1000, e); getClass().getName(), peerId, backoff / 1000, e));
throw suspend(backoff);
} }
attempts = 0; resetRetry();
setNextState(PeerModificationState.SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS); setNextState(PeerModificationState.SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS);
return Flow.HAS_MORE_STATE; return Flow.HAS_MORE_STATE;
case SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS: case SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS:
@ -241,10 +235,10 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
try { try {
postPeerModification(env); postPeerModification(env);
} catch (ReplicationException e) { } catch (ReplicationException e) {
long backoff = ProcedureUtil.getBackoffTimeMs(attempts); throw suspend(env.getMasterConfiguration(),
LOG.warn("{} failed to call postPeerModification for peer {}, sleep {} secs", backoff -> LOG.warn(
getClass().getName(), peerId, backoff / 1000, e); "{} failed to call postPeerModification for peer {}, sleep {} secs",
throw suspend(backoff); getClass().getName(), peerId, backoff / 1000, e));
} catch (IOException e) { } catch (IOException e) {
LOG.warn("{} failed to call post CP hook for peer {}, " + LOG.warn("{} failed to call post CP hook for peer {}, " +
"ignore since the procedure has already done", getClass().getName(), peerId, e); "ignore since the procedure has already done", getClass().getName(), peerId, e);

View File

@ -23,7 +23,6 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -75,10 +74,9 @@ public class SyncReplicationReplayWALProcedure
try { try {
finished = syncReplicationReplayWALManager.isReplayWALFinished(wals.get(0)); finished = syncReplicationReplayWALManager.isReplayWALFinished(wals.get(0));
} catch (IOException e) { } catch (IOException e) {
long backoff = ProcedureUtil.getBackoffTimeMs(attempts); throw suspend(env.getMasterConfiguration(),
LOG.warn("Failed to check whether replay wals {} finished for peer id={}" + backoff -> LOG.warn("Failed to check whether replay wals {} finished for peer id={}" +
", sleep {} secs and retry", wals, peerId, backoff / 1000, e); ", sleep {} secs and retry", wals, peerId, backoff / 1000, e));
throw suspend(backoff);
} }
syncReplicationReplayWALManager.releasePeerWorker(peerId, worker, syncReplicationReplayWALManager.releasePeerWorker(peerId, worker,
env.getProcedureScheduler()); env.getProcedureScheduler());

View File

@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure; import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.ReplicationUtils;
@ -255,14 +254,13 @@ public class TransitPeerSyncReplicationStateProcedure
try { try {
setPeerNewSyncReplicationState(env); setPeerNewSyncReplicationState(env);
} catch (ReplicationException e) { } catch (ReplicationException e) {
long backoff = ProcedureUtil.getBackoffTimeMs(attempts); throw suspend(env.getMasterConfiguration(),
LOG.warn( backoff -> LOG.warn(
"Failed to update peer storage for peer {} when starting transiting sync " + "Failed to update peer storage for peer {} when starting transiting sync " +
"replication peer state from {} to {}, sleep {} secs and retry", "replication peer state from {} to {}, sleep {} secs and retry",
peerId, fromState, toState, backoff / 1000, e); peerId, fromState, toState, backoff / 1000, e));
throw suspend(backoff);
} }
attempts = 0; resetRetry();
setNextState( setNextState(
PeerSyncReplicationStateTransitionState.REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN); PeerSyncReplicationStateTransitionState.REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN);
return Flow.HAS_MORE_STATE; return Flow.HAS_MORE_STATE;
@ -287,13 +285,13 @@ public class TransitPeerSyncReplicationStateProcedure
try { try {
setLastPushedSequenceId(env, env.getReplicationPeerManager().getPeerConfig(peerId).get()); setLastPushedSequenceId(env, env.getReplicationPeerManager().getPeerConfig(peerId).get());
} catch (Exception e) { } catch (Exception e) {
long backoff = ProcedureUtil.getBackoffTimeMs(attempts); throw suspend(env.getMasterConfiguration(),
LOG.warn( backoff -> LOG.warn(
"Failed to update last pushed sequence id for peer {} when transiting sync " + "Failed to update last pushed sequence id for peer {} when transiting sync " +
"replication peer state from {} to {}, sleep {} secs and retry", "replication peer state from {} to {}, sleep {} secs and retry",
peerId, fromState, toState, backoff / 1000, e); peerId, fromState, toState, backoff / 1000, e));
throw suspend(backoff);
} }
resetRetry();
setNextState(PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER); setNextState(PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER);
return Flow.HAS_MORE_STATE; return Flow.HAS_MORE_STATE;
case REPLAY_REMOTE_WAL_IN_PEER: case REPLAY_REMOTE_WAL_IN_PEER:
@ -305,14 +303,13 @@ public class TransitPeerSyncReplicationStateProcedure
try { try {
removeAllReplicationQueues(env); removeAllReplicationQueues(env);
} catch (ReplicationException e) { } catch (ReplicationException e) {
long backoff = ProcedureUtil.getBackoffTimeMs(attempts); throw suspend(env.getMasterConfiguration(),
LOG.warn( backoff -> LOG.warn(
"Failed to remove all replication queues peer {} when starting transiting" + "Failed to remove all replication queues peer {} when starting transiting" +
" sync replication peer state from {} to {}, sleep {} secs and retry", " sync replication peer state from {} to {}, sleep {} secs and retry",
peerId, fromState, toState, backoff / 1000, e); peerId, fromState, toState, backoff / 1000, e));
throw suspend(backoff);
} }
attempts = 0; resetRetry();
setNextState(fromState.equals(SyncReplicationState.ACTIVE) setNextState(fromState.equals(SyncReplicationState.ACTIVE)
? PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER ? PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER
: PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE); : PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
@ -321,14 +318,13 @@ public class TransitPeerSyncReplicationStateProcedure
try { try {
transitPeerSyncReplicationState(env); transitPeerSyncReplicationState(env);
} catch (ReplicationException e) { } catch (ReplicationException e) {
long backoff = ProcedureUtil.getBackoffTimeMs(attempts); throw suspend(env.getMasterConfiguration(),
LOG.warn( backoff -> LOG.warn(
"Failed to update peer storage for peer {} when ending transiting sync " + "Failed to update peer storage for peer {} when ending transiting sync " +
"replication peer state from {} to {}, sleep {} secs and retry", "replication peer state from {} to {}, sleep {} secs and retry",
peerId, fromState, toState, backoff / 1000, e); peerId, fromState, toState, backoff / 1000, e));
throw suspend(backoff);
} }
attempts = 0; resetRetry();
setNextState( setNextState(
PeerSyncReplicationStateTransitionState.REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END); PeerSyncReplicationStateTransitionState.REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END);
return Flow.HAS_MORE_STATE; return Flow.HAS_MORE_STATE;
@ -342,14 +338,13 @@ public class TransitPeerSyncReplicationStateProcedure
try { try {
enablePeer(env); enablePeer(env);
} catch (ReplicationException e) { } catch (ReplicationException e) {
long backoff = ProcedureUtil.getBackoffTimeMs(attempts); throw suspend(env.getMasterConfiguration(),
LOG.warn( backoff -> LOG.warn(
"Failed to set peer enabled for peer {} when transiting sync replication peer " + "Failed to set peer enabled for peer {} when transiting sync replication peer " +
"state from {} to {}, sleep {} secs and retry", "state from {} to {}, sleep {} secs and retry",
peerId, fromState, toState, backoff / 1000, e); peerId, fromState, toState, backoff / 1000, e));
throw suspend(backoff);
} }
attempts = 0; resetRetry();
setNextState( setNextState(
PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS); PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS);
return Flow.HAS_MORE_STATE; return Flow.HAS_MORE_STATE;
@ -361,14 +356,13 @@ public class TransitPeerSyncReplicationStateProcedure
try { try {
createDirForRemoteWAL(env); createDirForRemoteWAL(env);
} catch (IOException e) { } catch (IOException e) {
long backoff = ProcedureUtil.getBackoffTimeMs(attempts); throw suspend(env.getMasterConfiguration(),
LOG.warn( backoff -> LOG.warn(
"Failed to create remote wal dir for peer {} when transiting sync replication " + "Failed to create remote wal dir for peer {} when transiting sync replication " +
"peer state from {} to {}, sleep {} secs and retry", "peer state from {} to {}, sleep {} secs and retry",
peerId, fromState, toState, backoff / 1000, e); peerId, fromState, toState, backoff / 1000, e));
throw suspend(backoff);
} }
attempts = 0; resetRetry();
setNextState( setNextState(
PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION); PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION);
return Flow.HAS_MORE_STATE; return Flow.HAS_MORE_STATE;

View File

@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher; import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher;
import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureMetrics; import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException; import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
@ -144,6 +145,9 @@ public abstract class TestAssignmentManagerBase {
conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, PROC_NTHREADS); conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, PROC_NTHREADS);
conf.setInt(RSProcedureDispatcher.RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, 1000); conf.setInt(RSProcedureDispatcher.RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, 1000);
conf.setInt(AssignmentManager.ASSIGN_MAX_ATTEMPTS, getAssignMaxAttempts()); conf.setInt(AssignmentManager.ASSIGN_MAX_ATTEMPTS, getAssignMaxAttempts());
// make retry for TRSP more frequent
conf.setLong(ProcedureUtil.PROCEDURE_RETRY_SLEEP_INTERVAL_MS, 10);
conf.setLong(ProcedureUtil.PROCEDURE_RETRY_MAX_SLEEP_TIME_MS, 100);
} }
@Before @Before