From e884a25f8dd19f542ab5ea1ca9809095bd15dd0d Mon Sep 17 00:00:00 2001 From: zhangduo Date: Fri, 3 May 2019 22:23:59 +0800 Subject: [PATCH] HBASE-22343 Make procedure retry interval configurable in test --- .../hbase/procedure2/ProcedureUtil.java | 47 +++++++---- .../hbase/procedure2/TestProcedureUtil.java | 20 ----- .../assignment/RegionRemoteProcedureBase.java | 8 +- .../TransitRegionStateProcedure.java | 14 ++-- .../master/procedure/InitMetaProcedure.java | 8 +- .../ReopenTableRegionsProcedure.java | 10 ++- .../master/procedure/SplitWALProcedure.java | 25 +++--- .../procedure/SwitchRpcThrottleProcedure.java | 16 ++-- .../AbstractPeerNoLockProcedure.java | 20 ++++- .../replication/ModifyPeerProcedure.java | 54 ++++++------- .../SyncReplicationReplayWALProcedure.java | 8 +- ...nsitPeerSyncReplicationStateProcedure.java | 78 +++++++++---------- .../assignment/TestAssignmentManagerBase.java | 4 + 13 files changed, 165 insertions(+), 147 deletions(-) diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java index f25f2681f72..30201ca5b30 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java @@ -21,9 +21,13 @@ import java.io.IOException; import java.io.InputStream; import java.lang.reflect.Constructor; import java.lang.reflect.Modifier; -import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.util.NonceKey; +import org.apache.hadoop.hbase.util.RetryCounter; +import org.apache.hadoop.hbase.util.RetryCounter.ExponentialBackoffPolicyWithLimit; +import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; @@ -335,20 +339,35 @@ public final class ProcedureUtil { return builder.build(); } + public static final String PROCEDURE_RETRY_SLEEP_INTERVAL_MS = + "hbase.procedure.retry.sleep.interval.ms"; + + // default to 1 second + public static final long DEFAULT_PROCEDURE_RETRY_SLEEP_INTERVAL_MS = 1000; + + public static final String PROCEDURE_RETRY_MAX_SLEEP_TIME_MS = + "hbase.procedure.retry.max.sleep.time.ms"; + + // default to 10 minutes + public static final long DEFAULT_PROCEDURE_RETRY_MAX_SLEEP_TIME_MS = + TimeUnit.MINUTES.toMillis(10); + /** - * Get an exponential backoff time, in milliseconds. The base unit is 1 second, and the max - * backoff time is 10 minutes. This is the general backoff policy for most procedure - * implementation. + * Get a retry counter for getting the backoff time. We will use the + * {@link ExponentialBackoffPolicyWithLimit} policy, and the base unit is 1 second, max sleep time + * is 10 minutes by default. + *

+ * For UTs, you can set the {@link #PROCEDURE_RETRY_SLEEP_INTERVAL_MS} and + * {@link #PROCEDURE_RETRY_MAX_SLEEP_TIME_MS} to make more frequent retry so your UT will not + * timeout. */ - public static long getBackoffTimeMs(int attempts) { - long maxBackoffTime = 10L * 60 * 1000; // Ten minutes, hard coded for now. - // avoid overflow - if (attempts >= 30) { - return maxBackoffTime; - } - long backoffTimeMs = Math.min((long) (1000 * Math.pow(2, attempts)), maxBackoffTime); - // 1% possible jitter - long jitter = (long) (backoffTimeMs * ThreadLocalRandom.current().nextFloat() * 0.01f); - return backoffTimeMs + jitter; + public static RetryCounter createRetryCounter(Configuration conf) { + long sleepIntervalMs = + conf.getLong(PROCEDURE_RETRY_SLEEP_INTERVAL_MS, DEFAULT_PROCEDURE_RETRY_SLEEP_INTERVAL_MS); + long maxSleepTimeMs = + conf.getLong(PROCEDURE_RETRY_MAX_SLEEP_TIME_MS, DEFAULT_PROCEDURE_RETRY_MAX_SLEEP_TIME_MS); + RetryConfig retryConfig = new RetryConfig().setSleepInterval(sleepIntervalMs) + .setMaxSleepTime(maxSleepTimeMs).setBackoffPolicy(new ExponentialBackoffPolicyWithLimit()); + return new RetryCounter(retryConfig); } } diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureUtil.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureUtil.java index 3629fb74bc6..4d57c37ac61 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureUtil.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureUtil.java @@ -18,9 +18,7 @@ package org.apache.hadoop.hbase.procedure2; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure; import org.apache.hadoop.hbase.testclassification.MasterTests; @@ -59,24 +57,6 @@ public class TestProcedureUtil { assertEquals("Procedure protobuf does not match", proto1, proto2); } - @Test - public void testGetBackoffTimeMs() { - for (int i = 30; i < 1000; i++) { - assertEquals(TimeUnit.MINUTES.toMillis(10), ProcedureUtil.getBackoffTimeMs(30)); - } - long backoffTimeMs = ProcedureUtil.getBackoffTimeMs(0); - assertTrue(backoffTimeMs >= 1000); - assertTrue(backoffTimeMs <= 1000 * 1.01f); - - backoffTimeMs = ProcedureUtil.getBackoffTimeMs(1); - assertTrue(backoffTimeMs >= 2000); - assertTrue(backoffTimeMs <= 2000 * 1.01f); - - backoffTimeMs = ProcedureUtil.getBackoffTimeMs(5); - assertTrue(backoffTimeMs >= 32000); - assertTrue(backoffTimeMs <= 32000 * 1.01f); - } - public static class TestProcedureNoDefaultConstructor extends TestProcedure { public TestProcedureNoDefaultConstructor(int x) {} } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionRemoteProcedureBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionRemoteProcedureBase.java index 4a6f375d31c..9377d895d6b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionRemoteProcedureBase.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionRemoteProcedureBase.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.procedure2.ProcedureUtil; import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure; import org.apache.hadoop.hbase.procedure2.RemoteProcedureException; +import org.apache.hadoop.hbase.util.RetryCounter; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,7 +69,7 @@ public abstract class RegionRemoteProcedureBase extends Procedure regions = Collections.emptyList(); - private int attempt; + private RetryCounter retryCounter; public ReopenTableRegionsProcedure() { } @@ -125,13 +126,16 @@ public class ReopenTableRegionsProcedure return Flow.NO_MORE_STATE; } if (regions.stream().anyMatch(loc -> canSchedule(env, loc))) { - attempt = 0; + retryCounter = null; setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_REOPEN_REGIONS); return Flow.HAS_MORE_STATE; } // We can not schedule TRSP for all the regions need to reopen, wait for a while and retry // again. - long backoff = ProcedureUtil.getBackoffTimeMs(this.attempt++); + if (retryCounter == null) { + retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration()); + } + long backoff = retryCounter.getBackoffTimeAndIncrementAttempts(); LOG.info( "There are still {} region(s) which need to be reopened for table {} are in " + "OPENING state, suspend {}secs and try again later", diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java index 7e70dedba06..fd741b5473c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.ProcedureUtil; import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; +import org.apache.hadoop.hbase.util.RetryCounter; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -53,7 +54,7 @@ public class SplitWALProcedure private String walPath; private ServerName worker; private ServerName crashedServer; - private int attempts = 0; + private RetryCounter retryCounter; public SplitWALProcedure() { } @@ -82,11 +83,16 @@ public class SplitWALProcedure try { finished = splitWALManager.isSplitWALFinished(walPath); } catch (IOException ioe) { - long backoff = ProcedureUtil.getBackoffTimeMs(attempts++); - LOG.warn( - "Failed to check whether splitting wal {} success, wait {} seconds to retry", + if (retryCounter == null) { + retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration()); + } + long backoff = retryCounter.getBackoffTimeAndIncrementAttempts(); + LOG.warn("Failed to check whether splitting wal {} success, wait {} seconds to retry", walPath, backoff / 1000, ioe); - throw suspend(backoff); + setTimeout(Math.toIntExact(backoff)); + setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); + skipPersistence(); + throw new ProcedureSuspendedException(); } splitWALManager.releaseSplitWALWorker(worker, env.getProcedureScheduler()); if (!finished) { @@ -157,15 +163,6 @@ public class SplitWALProcedure return false; } - protected final ProcedureSuspendedException suspend(long backoff) - throws ProcedureSuspendedException { - attempts++; - setTimeout(Math.toIntExact(backoff)); - setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); - skipPersistence(); - throw new ProcedureSuspendedException(); - } - public String getWAL() { return walPath; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleProcedure.java index 1b080b06731..4dd84ca4c1b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleProcedure.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.procedure2.ProcedureUtil; import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; import org.apache.hadoop.hbase.quotas.RpcThrottleStorage; +import org.apache.hadoop.hbase.util.RetryCounter; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,11 +44,11 @@ public class SwitchRpcThrottleProcedure private static Logger LOG = LoggerFactory.getLogger(SwitchRpcThrottleProcedure.class); - RpcThrottleStorage rpcThrottleStorage; - boolean rpcThrottleEnabled; - ProcedurePrepareLatch syncLatch; - ServerName serverName; - int attempts; + private RpcThrottleStorage rpcThrottleStorage; + private boolean rpcThrottleEnabled; + private ProcedurePrepareLatch syncLatch; + private ServerName serverName; + private RetryCounter retryCounter; public SwitchRpcThrottleProcedure() { } @@ -68,7 +69,10 @@ public class SwitchRpcThrottleProcedure try { switchThrottleState(env, rpcThrottleEnabled); } catch (IOException e) { - long backoff = ProcedureUtil.getBackoffTimeMs(this.attempts++); + if (retryCounter == null) { + retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration()); + } + long backoff = retryCounter.getBackoffTimeAndIncrementAttempts(); LOG.warn("Failed to store rpc throttle value {}, sleep {} secs and retry", rpcThrottleEnabled, backoff / 1000, e); setTimeout(Math.toIntExact(backoff)); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerNoLockProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerNoLockProcedure.java index 8f8e1e1921e..41b26bd02cc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerNoLockProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerNoLockProcedure.java @@ -18,11 +18,15 @@ package org.apache.hadoop.hbase.master.replication; import java.io.IOException; +import java.util.function.LongConsumer; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureUtil; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; +import org.apache.hadoop.hbase.util.RetryCounter; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerProcedureStateData; @@ -38,7 +42,7 @@ public abstract class AbstractPeerNoLockProcedure protected String peerId; - protected int attempts; + private RetryCounter retryCounter; protected AbstractPeerNoLockProcedure() { } @@ -87,12 +91,20 @@ public abstract class AbstractPeerNoLockProcedure return false; } - protected final ProcedureSuspendedException suspend(long backoff) - throws ProcedureSuspendedException { - attempts++; + protected final ProcedureSuspendedException suspend(Configuration conf, + LongConsumer backoffConsumer) throws ProcedureSuspendedException { + if (retryCounter == null) { + retryCounter = ProcedureUtil.createRetryCounter(conf); + } + long backoff = retryCounter.getBackoffTimeAndIncrementAttempts(); + backoffConsumer.accept(backoff); setTimeout(Math.toIntExact(backoff)); setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); skipPersistence(); throw new ProcedureSuspendedException(); } + + protected final void resetRetry() { + retryCounter = null; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java index d5d27796942..c4df6131f37 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java @@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; -import org.apache.hadoop.hbase.procedure2.ProcedureUtil; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationUtils; @@ -172,24 +171,22 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure LOG.warn("{} failed to call prePeerModification for peer {}, sleep {} secs", + getClass().getName(), peerId, backoff / 1000, e)); } - attempts = 0; + resetRetry(); setNextState(PeerModificationState.UPDATE_PEER_STORAGE); return Flow.HAS_MORE_STATE; case UPDATE_PEER_STORAGE: try { updatePeerStorage(env); } catch (ReplicationException e) { - long backoff = ProcedureUtil.getBackoffTimeMs(attempts); - LOG.warn("{} update peer storage for peer {} failed, sleep {} secs", getClass().getName(), - peerId, backoff / 1000, e); - throw suspend(backoff); + throw suspend(env.getMasterConfiguration(), + backoff -> LOG.warn("{} update peer storage for peer {} failed, sleep {} secs", + getClass().getName(), peerId, backoff / 1000, e)); } - attempts = 0; + resetRetry(); setNextState(PeerModificationState.REFRESH_PEER_ON_RS); return Flow.HAS_MORE_STATE; case REFRESH_PEER_ON_RS: @@ -200,24 +197,22 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure LOG.warn("{} reopen regions for peer {} failed, sleep {} secs", + getClass().getName(), peerId, backoff / 1000, e)); } - attempts = 0; + resetRetry(); setNextState(PeerModificationState.SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID); return Flow.HAS_MORE_STATE; case SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID: try { updateLastPushedSequenceIdForSerialPeer(env); } catch (Exception e) { - long backoff = ProcedureUtil.getBackoffTimeMs(attempts); - LOG.warn("{} set last sequence id for peer {} failed, sleep {} secs", - getClass().getName(), peerId, backoff / 1000, e); - throw suspend(backoff); + throw suspend(env.getMasterConfiguration(), + backoff -> LOG.warn("{} set last sequence id for peer {} failed, sleep {} secs", + getClass().getName(), peerId, backoff / 1000, e)); } - attempts = 0; + resetRetry(); setNextState(enablePeerBeforeFinish() ? PeerModificationState.SERIAL_PEER_SET_PEER_ENABLED : PeerModificationState.POST_PEER_MODIFICATION); return Flow.HAS_MORE_STATE; @@ -225,12 +220,11 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure LOG.warn("{} enable peer before finish for peer {} failed, sleep {} secs", + getClass().getName(), peerId, backoff / 1000, e)); } - attempts = 0; + resetRetry(); setNextState(PeerModificationState.SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS); return Flow.HAS_MORE_STATE; case SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS: @@ -241,10 +235,10 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure LOG.warn( + "{} failed to call postPeerModification for peer {}, sleep {} secs", + getClass().getName(), peerId, backoff / 1000, e)); } catch (IOException e) { LOG.warn("{} failed to call post CP hook for peer {}, " + "ignore since the procedure has already done", getClass().getName(), peerId, e); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALProcedure.java index 19f7aea58c5..4858bd442c1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALProcedure.java @@ -23,7 +23,6 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; -import org.apache.hadoop.hbase.procedure2.ProcedureUtil; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,10 +74,9 @@ public class SyncReplicationReplayWALProcedure try { finished = syncReplicationReplayWALManager.isReplayWALFinished(wals.get(0)); } catch (IOException e) { - long backoff = ProcedureUtil.getBackoffTimeMs(attempts); - LOG.warn("Failed to check whether replay wals {} finished for peer id={}" + - ", sleep {} secs and retry", wals, peerId, backoff / 1000, e); - throw suspend(backoff); + throw suspend(env.getMasterConfiguration(), + backoff -> LOG.warn("Failed to check whether replay wals {} finished for peer id={}" + + ", sleep {} secs and retry", wals, peerId, backoff / 1000, e)); } syncReplicationReplayWALManager.releasePeerWorker(peerId, worker, env.getProcedureScheduler()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java index fcf41bee72f..358fd5e3492 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java @@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; -import org.apache.hadoop.hbase.procedure2.ProcedureUtil; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ReplicationUtils; @@ -255,14 +254,13 @@ public class TransitPeerSyncReplicationStateProcedure try { setPeerNewSyncReplicationState(env); } catch (ReplicationException e) { - long backoff = ProcedureUtil.getBackoffTimeMs(attempts); - LOG.warn( - "Failed to update peer storage for peer {} when starting transiting sync " + - "replication peer state from {} to {}, sleep {} secs and retry", - peerId, fromState, toState, backoff / 1000, e); - throw suspend(backoff); + throw suspend(env.getMasterConfiguration(), + backoff -> LOG.warn( + "Failed to update peer storage for peer {} when starting transiting sync " + + "replication peer state from {} to {}, sleep {} secs and retry", + peerId, fromState, toState, backoff / 1000, e)); } - attempts = 0; + resetRetry(); setNextState( PeerSyncReplicationStateTransitionState.REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN); return Flow.HAS_MORE_STATE; @@ -287,13 +285,13 @@ public class TransitPeerSyncReplicationStateProcedure try { setLastPushedSequenceId(env, env.getReplicationPeerManager().getPeerConfig(peerId).get()); } catch (Exception e) { - long backoff = ProcedureUtil.getBackoffTimeMs(attempts); - LOG.warn( - "Failed to update last pushed sequence id for peer {} when transiting sync " + - "replication peer state from {} to {}, sleep {} secs and retry", - peerId, fromState, toState, backoff / 1000, e); - throw suspend(backoff); + throw suspend(env.getMasterConfiguration(), + backoff -> LOG.warn( + "Failed to update last pushed sequence id for peer {} when transiting sync " + + "replication peer state from {} to {}, sleep {} secs and retry", + peerId, fromState, toState, backoff / 1000, e)); } + resetRetry(); setNextState(PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER); return Flow.HAS_MORE_STATE; case REPLAY_REMOTE_WAL_IN_PEER: @@ -305,14 +303,13 @@ public class TransitPeerSyncReplicationStateProcedure try { removeAllReplicationQueues(env); } catch (ReplicationException e) { - long backoff = ProcedureUtil.getBackoffTimeMs(attempts); - LOG.warn( - "Failed to remove all replication queues peer {} when starting transiting" + - " sync replication peer state from {} to {}, sleep {} secs and retry", - peerId, fromState, toState, backoff / 1000, e); - throw suspend(backoff); + throw suspend(env.getMasterConfiguration(), + backoff -> LOG.warn( + "Failed to remove all replication queues peer {} when starting transiting" + + " sync replication peer state from {} to {}, sleep {} secs and retry", + peerId, fromState, toState, backoff / 1000, e)); } - attempts = 0; + resetRetry(); setNextState(fromState.equals(SyncReplicationState.ACTIVE) ? PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER : PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE); @@ -321,14 +318,13 @@ public class TransitPeerSyncReplicationStateProcedure try { transitPeerSyncReplicationState(env); } catch (ReplicationException e) { - long backoff = ProcedureUtil.getBackoffTimeMs(attempts); - LOG.warn( - "Failed to update peer storage for peer {} when ending transiting sync " + - "replication peer state from {} to {}, sleep {} secs and retry", - peerId, fromState, toState, backoff / 1000, e); - throw suspend(backoff); + throw suspend(env.getMasterConfiguration(), + backoff -> LOG.warn( + "Failed to update peer storage for peer {} when ending transiting sync " + + "replication peer state from {} to {}, sleep {} secs and retry", + peerId, fromState, toState, backoff / 1000, e)); } - attempts = 0; + resetRetry(); setNextState( PeerSyncReplicationStateTransitionState.REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END); return Flow.HAS_MORE_STATE; @@ -342,14 +338,13 @@ public class TransitPeerSyncReplicationStateProcedure try { enablePeer(env); } catch (ReplicationException e) { - long backoff = ProcedureUtil.getBackoffTimeMs(attempts); - LOG.warn( - "Failed to set peer enabled for peer {} when transiting sync replication peer " + - "state from {} to {}, sleep {} secs and retry", - peerId, fromState, toState, backoff / 1000, e); - throw suspend(backoff); + throw suspend(env.getMasterConfiguration(), + backoff -> LOG.warn( + "Failed to set peer enabled for peer {} when transiting sync replication peer " + + "state from {} to {}, sleep {} secs and retry", + peerId, fromState, toState, backoff / 1000, e)); } - attempts = 0; + resetRetry(); setNextState( PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS); return Flow.HAS_MORE_STATE; @@ -361,14 +356,13 @@ public class TransitPeerSyncReplicationStateProcedure try { createDirForRemoteWAL(env); } catch (IOException e) { - long backoff = ProcedureUtil.getBackoffTimeMs(attempts); - LOG.warn( - "Failed to create remote wal dir for peer {} when transiting sync replication " + - "peer state from {} to {}, sleep {} secs and retry", - peerId, fromState, toState, backoff / 1000, e); - throw suspend(backoff); + throw suspend(env.getMasterConfiguration(), + backoff -> LOG.warn( + "Failed to create remote wal dir for peer {} when transiting sync replication " + + "peer state from {} to {}, sleep {} secs and retry", + peerId, fromState, toState, backoff / 1000, e)); } - attempts = 0; + resetRetry(); setNextState( PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION); return Flow.HAS_MORE_STATE; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java index 9f3acebd383..73b940aae67 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java @@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait; import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureMetrics; +import org.apache.hadoop.hbase.procedure2.ProcedureUtil; import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; @@ -144,6 +145,9 @@ public abstract class TestAssignmentManagerBase { conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, PROC_NTHREADS); conf.setInt(RSProcedureDispatcher.RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, 1000); conf.setInt(AssignmentManager.ASSIGN_MAX_ATTEMPTS, getAssignMaxAttempts()); + // make retry for TRSP more frequent + conf.setLong(ProcedureUtil.PROCEDURE_RETRY_SLEEP_INTERVAL_MS, 10); + conf.setLong(ProcedureUtil.PROCEDURE_RETRY_MAX_SLEEP_TIME_MS, 100); } @Before