diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java index e133a6579f9..d3a4eb8638b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java @@ -18,14 +18,20 @@ package org.apache.hadoop.hbase.master.replication; import java.io.IOException; + import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface; import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; +import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; + import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerProcedureStateData; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; /** * The base class for all replication peer related procedure. @@ -39,6 +45,8 @@ public abstract class AbstractPeerProcedure // used to keep compatible with old client where we can only returns after updateStorage. protected ProcedurePrepareLatch latch; + protected int attempts; + protected AbstractPeerProcedure() { } @@ -106,4 +114,26 @@ public abstract class AbstractPeerProcedure addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream() .map(sn -> new RefreshPeerProcedure(peerId, type, sn)).toArray(RefreshPeerProcedure[]::new)); } + + @Override + protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) { + setState(ProcedureProtos.ProcedureState.RUNNABLE); + env.getProcedureScheduler().addFront(this); + return false; + } + + protected final ProcedureSuspendedException suspend(long backoff) + throws ProcedureSuspendedException { + attempts++; + setTimeout(Math.toIntExact(backoff)); + setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); + skipPersistence(); + throw new ProcedureSuspendedException(); + } + + // will be override in test to simulate error + @VisibleForTesting + protected void enablePeer(MasterProcedureEnv env) throws ReplicationException { + env.getReplicationPeerManager().enablePeer(peerId); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java index 7690c962550..1aa86ede5e4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java @@ -45,7 +45,6 @@ import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; /** * The base class for all replication peer related procedure except sync replication state @@ -61,8 +60,6 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure lastSeqIds, String encodedRegionName, long barrier, ReplicationQueueStorage queueStorage) throws ReplicationException { if (barrier >= 0) { @@ -248,21 +239,6 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure - implements PeerProcedureInterface { + extends AbstractPeerProcedure { private static final Logger LOG = LoggerFactory.getLogger(SyncReplicationReplayWALProcedure.class); - private String peerId; - private ServerName worker = null; private List wals; @@ -58,9 +53,9 @@ public class SyncReplicationReplayWALProcedure this.wals = wals; } - @Override protected Flow executeFromState(MasterProcedureEnv env, - SyncReplicationReplayWALState state) - throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { + @Override + protected Flow executeFromState(MasterProcedureEnv env, SyncReplicationReplayWALState state) + throws ProcedureSuspendedException { SyncReplicationReplayWALManager syncReplicationReplayWALManager = env.getMasterServices().getSyncReplicationReplayWALManager(); switch (state) { @@ -68,15 +63,19 @@ public class SyncReplicationReplayWALProcedure try { worker = syncReplicationReplayWALManager.getPeerWorker(peerId); } catch (ReplicationException e) { - LOG.info("Failed to get worker to replay wals {} for peer id={}, retry", wals, peerId); - throw new ProcedureYieldException(); + long backoff = ProcedureUtil.getBackoffTimeMs(attempts); + LOG.warn("Failed to get worker to replay wals {} for peer id={}, sleep {} secs and retry", + wals, peerId, backoff / 1000, e); + throw suspend(backoff); } if (worker == null) { - LOG.info("No worker to replay wals {} for peer id={}, retry", wals, peerId); - setNextState(SyncReplicationReplayWALState.ASSIGN_WORKER); - } else { - setNextState(SyncReplicationReplayWALState.DISPATCH_WALS_TO_WORKER); + long backoff = ProcedureUtil.getBackoffTimeMs(attempts); + LOG.info("No worker to replay wals {} for peer id={}, sleep {} secs and retry", wals, + peerId, backoff / 1000); + throw suspend(backoff); } + attempts = 0; + setNextState(SyncReplicationReplayWALState.DISPATCH_WALS_TO_WORKER); return Flow.HAS_MORE_STATE; case DISPATCH_WALS_TO_WORKER: addChildProcedure(new SyncReplicationReplayWALRemoteProcedure(peerId, wals, worker)); @@ -87,17 +86,23 @@ public class SyncReplicationReplayWALProcedure try { finished = syncReplicationReplayWALManager.isReplayWALFinished(wals.get(0)); } catch (IOException e) { - LOG.info("Failed to check whether replay wals {} finished for peer id={}", wals, peerId); - throw new ProcedureYieldException(); + long backoff = ProcedureUtil.getBackoffTimeMs(attempts); + LOG.warn("Failed to check whether replay wals {} finished for peer id={}" + + ", sleep {} secs and retry", + wals, peerId, backoff / 1000, e); + throw suspend(backoff); } try { syncReplicationReplayWALManager.removePeerWorker(peerId, worker); } catch (ReplicationException e) { - LOG.info("Failed to remove worker for peer id={}, retry", peerId); - throw new ProcedureYieldException(); + long backoff = ProcedureUtil.getBackoffTimeMs(attempts); + LOG.warn("Failed to remove worker {} for peer id={}, sleep {} secs and retry", worker, + peerId, backoff / 1000, e); + throw suspend(backoff); } + attempts = 0; if (!finished) { - LOG.info("Failed to replay wals {} for peer id={}, retry", wals, peerId); + LOG.warn("Failed to replay wals {} for peer id={}, retry", wals, peerId); setNextState(SyncReplicationReplayWALState.ASSIGN_WORKER); return Flow.HAS_MORE_STATE; } @@ -152,11 +157,6 @@ public class SyncReplicationReplayWALProcedure data.getWalList().forEach(wals::add); } - @Override - public String getPeerId() { - return peerId; - } - @Override public PeerOperationType getPeerOperationType() { return PeerOperationType.SYNC_REPLICATION_REPLAY_WAL; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java index c650974df8f..8c6232f48c9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java @@ -28,7 +28,7 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; -import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.procedure2.ProcedureUtil; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ReplicationUtils; @@ -37,6 +37,8 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; + import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerSyncReplicationStateTransitionState; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.TransitPeerSyncReplicationStateStateData; @@ -50,7 +52,7 @@ public class TransitPeerSyncReplicationStateProcedure private static final Logger LOG = LoggerFactory.getLogger(TransitPeerSyncReplicationStateProcedure.class); - private SyncReplicationState fromState; + protected SyncReplicationState fromState; private SyncReplicationState toState; @@ -107,7 +109,8 @@ public class TransitPeerSyncReplicationStateProcedure return PeerSyncReplicationStateTransitionState.PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION; } - private void preTransit(MasterProcedureEnv env) throws IOException { + @VisibleForTesting + protected void preTransit(MasterProcedureEnv env) throws IOException { MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); if (cpHost != null) { cpHost.preTransitReplicationPeerSyncReplicationState(peerId, toState); @@ -139,14 +142,15 @@ public class TransitPeerSyncReplicationStateProcedure } } - private void reopenRegions(MasterProcedureEnv env) { + @VisibleForTesting + protected void reopenRegions(MasterProcedureEnv env) { addChildProcedure( env.getReplicationPeerManager().getPeerConfig(peerId).get().getTableCFsMap().keySet().stream() .map(ReopenTableRegionsProcedure::new).toArray(ReopenTableRegionsProcedure[]::new)); } - private void createDirForRemoteWAL(MasterProcedureEnv env) - throws ProcedureYieldException, IOException { + @VisibleForTesting + protected void createDirForRemoteWAL(MasterProcedureEnv env) throws IOException { MasterFileSystem mfs = env.getMasterFileSystem(); Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME); Path remoteWALDirForPeer = ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId); @@ -155,8 +159,7 @@ public class TransitPeerSyncReplicationStateProcedure LOG.warn("Wal dir {} already exists, usually this should not happen, continue anyway", remoteWALDirForPeer); } else if (!walFs.mkdirs(remoteWALDirForPeer)) { - LOG.warn("Can not create remote wal dir {}", remoteWALDirForPeer); - throw new ProcedureYieldException(); + throw new IOException("Failed to create remote wal dir " + remoteWALDirForPeer); } } @@ -190,10 +193,33 @@ public class TransitPeerSyncReplicationStateProcedure addChildProcedure(new RecoverStandbyProcedure(peerId, serial)); } + @VisibleForTesting + protected void setPeerNewSyncReplicationState(MasterProcedureEnv env) + throws ReplicationException { + env.getReplicationPeerManager().setPeerNewSyncReplicationState(peerId, toState); + if (toState.equals(SyncReplicationState.STANDBY) && enabled) { + // disable the peer if we are going to transit to STANDBY state, as we need to remove + // all the pending replication files. If we do not disable the peer and delete the wal + // queues on zk directly, RS will get NoNode exception when updating the wal position + // and crash. + env.getReplicationPeerManager().disablePeer(peerId); + } + } + + @VisibleForTesting + protected void removeAllReplicationQueues(MasterProcedureEnv env) throws ReplicationException { + env.getReplicationPeerManager().removeAllQueues(peerId); + } + + @VisibleForTesting + protected void transitPeerSyncReplicationState(MasterProcedureEnv env) + throws ReplicationException { + env.getReplicationPeerManager().transitPeerSyncReplicationState(peerId, toState); + } + @Override protected Flow executeFromState(MasterProcedureEnv env, - PeerSyncReplicationStateTransitionState state) - throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { + PeerSyncReplicationStateTransitionState state) throws ProcedureSuspendedException { switch (state) { case PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION: try { @@ -209,19 +235,16 @@ public class TransitPeerSyncReplicationStateProcedure return Flow.HAS_MORE_STATE; case SET_PEER_NEW_SYNC_REPLICATION_STATE: try { - env.getReplicationPeerManager().setPeerNewSyncReplicationState(peerId, toState); - if (toState.equals(SyncReplicationState.STANDBY) && enabled) { - // disable the peer if we are going to transit to STANDBY state, as we need to remove - // all the pending replication files. If we do not disable the peer and delete the wal - // queues on zk directly, RS will get NoNode exception when updating the wal position - // and crash. - env.getReplicationPeerManager().disablePeer(peerId); - } + setPeerNewSyncReplicationState(env); } catch (ReplicationException e) { - LOG.warn("Failed to update peer storage for peer {} when starting transiting sync " + - "replication peer state from {} to {}, retry", peerId, fromState, toState, e); - throw new ProcedureYieldException(); + long backoff = ProcedureUtil.getBackoffTimeMs(attempts); + LOG.warn( + "Failed to update peer storage for peer {} when starting transiting sync " + + "replication peer state from {} to {}, sleep {} secs and retry", + peerId, fromState, toState, backoff / 1000, e); + throw suspend(backoff); } + attempts = 0; setNextState( PeerSyncReplicationStateTransitionState.REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN); return Flow.HAS_MORE_STATE; @@ -238,12 +261,16 @@ public class TransitPeerSyncReplicationStateProcedure return Flow.HAS_MORE_STATE; case REMOVE_ALL_REPLICATION_QUEUES_IN_PEER: try { - env.getReplicationPeerManager().removeAllQueues(peerId); + removeAllReplicationQueues(env); } catch (ReplicationException e) { - LOG.warn("Failed to remove all replication queues peer {} when starting transiting" + - " sync replication peer state from {} to {}, retry", peerId, fromState, toState, e); - throw new ProcedureYieldException(); + long backoff = ProcedureUtil.getBackoffTimeMs(attempts); + LOG.warn( + "Failed to remove all replication queues peer {} when starting transiting" + + " sync replication peer state from {} to {}, sleep {} secs and retry", + peerId, fromState, toState, backoff / 1000, e); + throw suspend(backoff); } + attempts = 0; setNextState(fromState.equals(SyncReplicationState.ACTIVE) ? PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER : PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE); @@ -255,12 +282,16 @@ public class TransitPeerSyncReplicationStateProcedure return Flow.HAS_MORE_STATE; case TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE: try { - env.getReplicationPeerManager().transitPeerSyncReplicationState(peerId, toState); + transitPeerSyncReplicationState(env); } catch (ReplicationException e) { - LOG.warn("Failed to update peer storage for peer {} when ending transiting sync " + - "replication peer state from {} to {}, retry", peerId, fromState, toState, e); - throw new ProcedureYieldException(); + long backoff = ProcedureUtil.getBackoffTimeMs(attempts); + LOG.warn( + "Failed to update peer storage for peer {} when ending transiting sync " + + "replication peer state from {} to {}, sleep {} secs and retry", + peerId, fromState, toState, backoff / 1000, e); + throw suspend(backoff); } + attempts = 0; setNextState( PeerSyncReplicationStateTransitionState.REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END); return Flow.HAS_MORE_STATE; @@ -272,12 +303,16 @@ public class TransitPeerSyncReplicationStateProcedure return Flow.HAS_MORE_STATE; case SYNC_REPLICATION_SET_PEER_ENABLED: try { - env.getReplicationPeerManager().enablePeer(peerId); + enablePeer(env); } catch (ReplicationException e) { - LOG.warn("Failed to set peer enabled for peer {} when transiting sync replication peer " + - "state from {} to {}, retry", peerId, fromState, toState, e); - throw new ProcedureYieldException(); + long backoff = ProcedureUtil.getBackoffTimeMs(attempts); + LOG.warn( + "Failed to set peer enabled for peer {} when transiting sync replication peer " + + "state from {} to {}, sleep {} secs and retry", + peerId, fromState, toState, backoff / 1000, e); + throw suspend(backoff); } + attempts = 0; setNextState( PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS); return Flow.HAS_MORE_STATE; @@ -289,10 +324,14 @@ public class TransitPeerSyncReplicationStateProcedure try { createDirForRemoteWAL(env); } catch (IOException e) { - LOG.warn("Failed to create remote wal dir for peer {} when transiting sync replication " + - "peer state from {} to {}, retry", peerId, fromState, toState, e); - throw new ProcedureYieldException(); + long backoff = ProcedureUtil.getBackoffTimeMs(attempts); + LOG.warn( + "Failed to create remote wal dir for peer {} when transiting sync replication " + + "peer state from {} to {}, sleep {} secs and retry", + peerId, fromState, toState, backoff / 1000, e); + throw suspend(backoff); } + attempts = 0; setNextState( PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION); return Flow.HAS_MORE_STATE; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestTransitPeerSyncReplicationStateProcedureBackoff.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestTransitPeerSyncReplicationStateProcedureBackoff.java new file mode 100644 index 00000000000..0526a8e8da3 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestTransitPeerSyncReplicationStateProcedureBackoff.java @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.replication; + +import java.io.IOException; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.ProcedureTestUtil; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.SyncReplicationState; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MasterTests.class, LargeTests.class }) +public class TestTransitPeerSyncReplicationStateProcedureBackoff { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestTransitPeerSyncReplicationStateProcedureBackoff.class); + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static boolean FAIL = true; + + public static class TestTransitPeerSyncReplicationStateProcedure + extends TransitPeerSyncReplicationStateProcedure { + + public TestTransitPeerSyncReplicationStateProcedure() { + } + + public TestTransitPeerSyncReplicationStateProcedure(String peerId, SyncReplicationState state) { + super(peerId, state); + } + + private void tryFail() throws ReplicationException { + synchronized (TestTransitPeerSyncReplicationStateProcedureBackoff.class) { + if (FAIL) { + throw new ReplicationException("Inject error"); + } + FAIL = true; + } + } + + @Override + protected > void addChildProcedure( + @SuppressWarnings("unchecked") T... subProcedure) { + // Make it a no-op + } + + @Override + protected void preTransit(MasterProcedureEnv env) throws IOException { + fromState = SyncReplicationState.DOWNGRADE_ACTIVE; + } + + @Override + protected void setPeerNewSyncReplicationState(MasterProcedureEnv env) + throws ReplicationException { + tryFail(); + } + + @Override + protected void removeAllReplicationQueues(MasterProcedureEnv env) throws ReplicationException { + tryFail(); + } + + @Override + protected void reopenRegions(MasterProcedureEnv env) { + // do nothing; + } + + @Override + protected void transitPeerSyncReplicationState(MasterProcedureEnv env) + throws ReplicationException { + tryFail(); + } + + @Override + protected void createDirForRemoteWAL(MasterProcedureEnv env) throws IOException { + try { + tryFail(); + } catch (ReplicationException e) { + throw new IOException(e); + } + } + } + + @BeforeClass + public static void setUp() throws Exception { + UTIL.startMiniCluster(1); + } + + @AfterClass + public static void tearDown() throws Exception { + UTIL.shutdownMiniCluster(); + } + + private void assertBackoffIncrease() throws IOException, InterruptedException { + ProcedureTestUtil.waitUntilProcedureWaitingTimeout(UTIL, + TestTransitPeerSyncReplicationStateProcedure.class, 30000); + ProcedureTestUtil.waitUntilProcedureTimeoutIncrease(UTIL, + TestTransitPeerSyncReplicationStateProcedure.class, 2); + synchronized (TestTransitPeerSyncReplicationStateProcedure.class) { + FAIL = false; + } + UTIL.waitFor(30000, () -> FAIL); + } + + @Test + public void testDowngradeActiveToActive() throws IOException, InterruptedException { + ProcedureExecutor procExec = + UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor(); + // Test procedure: DOWNGRADE_ACTIVE ==> ACTIVE + long procId = procExec.submitProcedure( + new TestTransitPeerSyncReplicationStateProcedure("1", SyncReplicationState.ACTIVE)); + // No retry for PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION + // SET_PEER_NEW_SYNC_REPLICATION_STATE + assertBackoffIncrease(); + // No retry for REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN + // No retry for REOPEN_ALL_REGIONS_IN_PEER + // TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE + assertBackoffIncrease(); + // No retry for REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END + // No retry for POST_PEER_SYNC_REPLICATION_STATE_TRANSITION + UTIL.waitFor(30000, () -> procExec.isFinished(procId)); + } + + @Test + public void testDowngradeActiveToStandby() throws IOException, InterruptedException { + ProcedureExecutor procExec = + UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor(); + // Test procedure: DOWNGRADE_ACTIVE ==> ACTIVE + long procId = procExec.submitProcedure( + new TestTransitPeerSyncReplicationStateProcedure("2", SyncReplicationState.STANDBY)); + // No retry for PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION + // SET_PEER_NEW_SYNC_REPLICATION_STATE + assertBackoffIncrease(); + // No retry for REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN + // REMOVE_ALL_REPLICATION_QUEUES_IN_PEER + assertBackoffIncrease(); + // TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE + assertBackoffIncrease(); + // No retry for REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END + // CREATE_DIR_FOR_REMOTE_WAL + assertBackoffIncrease(); + // No retry for POST_PEER_SYNC_REPLICATION_STATE_TRANSITION + UTIL.waitFor(30000, () -> procExec.isFinished(procId)); + } +} \ No newline at end of file