HBASE-21245 Add exponential backoff when retrying for sync replication related procedures

Guanghao Zhang 2018-09-29 09:51:57 +08:00
parent fdbaa4c3f0
commit f9d51b67e4
5 changed files with 314 additions and 97 deletions
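What the patch does, in short: AbstractPeerProcedure gains an attempts counter plus suspend() and setTimeoutFailure() helpers; when a step of a sync-replication procedure fails with a retriable exception, the procedure computes a delay with ProcedureUtil.getBackoffTimeMs(attempts), suspends itself in WAITING_TIMEOUT for that long, and is put back on the scheduler when the timeout fires, instead of the old ProcedureYieldException spin-retry. On success the counter is reset to 0 so the next failure starts again from the smallest delay. The sketch below is not HBase code, only a self-contained illustration of that retry pattern; the class name, base delay, cap, and jitter are assumptions, since the real delay comes from ProcedureUtil.getBackoffTimeMs.

// Illustrative sketch only -- not HBase code. BASE_BACKOFF_MS, MAX_BACKOFF_MS and the jitter
// are assumed values; in the patch the delay comes from ProcedureUtil.getBackoffTimeMs(attempts).
import java.util.concurrent.ThreadLocalRandom;

public class BackoffRetrySketch {

  private static final long BASE_BACKOFF_MS = 1_000L;   // assumed first retry delay
  private static final long MAX_BACKOFF_MS = 600_000L;  // assumed cap on the delay

  private int attempts;

  // Exponential backoff: base * 2^attempts, capped, plus a little jitter so that many
  // suspended procedures do not all wake up at the same instant.
  private long backoffTimeMs() {
    long backoff = Math.min(BASE_BACKOFF_MS << Math.min(attempts, 30), MAX_BACKOFF_MS);
    return backoff + ThreadLocalRandom.current().nextLong(backoff / 10 + 1);
  }

  // Mirrors the executeFromState pattern: a failed step bumps attempts and waits an
  // exponentially growing interval; a successful step resets attempts to 0.
  public void runStepWithRetry(Runnable step) throws InterruptedException {
    while (true) {
      try {
        step.run();
        attempts = 0;
        return;
      } catch (RuntimeException e) {
        long backoff = backoffTimeMs();
        attempts++;
        // In the patch this is setTimeout(backoff) + WAITING_TIMEOUT + setTimeoutFailure
        // re-queueing the procedure; a plain sleep stands in for that here.
        Thread.sleep(backoff);
      }
    }
  }
}

This is also the property the new test at the end of the diff checks: each consecutive failure waits longer, and a success resets the counter.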

AbstractPeerProcedure.java

@@ -18,14 +18,20 @@
 package org.apache.hadoop.hbase.master.replication;

 import java.io.IOException;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
 import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
 import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
+import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.yetus.audience.InterfaceAudience;

+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerProcedureStateData;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;

 /**
  * The base class for all replication peer related procedure.
@@ -39,6 +45,8 @@ public abstract class AbstractPeerProcedure<TState>
   // used to keep compatible with old client where we can only returns after updateStorage.
   protected ProcedurePrepareLatch latch;

+  protected int attempts;
+
   protected AbstractPeerProcedure() {
   }
@@ -106,4 +114,26 @@ public abstract class AbstractPeerProcedure<TState>
     addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
       .map(sn -> new RefreshPeerProcedure(peerId, type, sn)).toArray(RefreshPeerProcedure[]::new));
   }
+
+  @Override
+  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
+    setState(ProcedureProtos.ProcedureState.RUNNABLE);
+    env.getProcedureScheduler().addFront(this);
+    return false;
+  }
+
+  protected final ProcedureSuspendedException suspend(long backoff)
+      throws ProcedureSuspendedException {
+    attempts++;
+    setTimeout(Math.toIntExact(backoff));
+    setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
+    skipPersistence();
+    throw new ProcedureSuspendedException();
+  }
+
+  // will be override in test to simulate error
+  @VisibleForTesting
+  protected void enablePeer(MasterProcedureEnv env) throws ReplicationException {
+    env.getReplicationPeerManager().enablePeer(peerId);
+  }
 }

ModifyPeerProcedure.java

@@ -45,7 +45,6 @@ import org.slf4j.LoggerFactory;
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;

 /**
  * The base class for all replication peer related procedure except sync replication state
@@ -61,8 +60,6 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
   // The sleep interval when waiting table to be enabled or disabled.
   protected static final int SLEEP_INTERVAL_MS = 1000;

-  private int attemps;
-
   protected ModifyPeerProcedure() {
   }
@@ -172,12 +169,6 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
     }
   }

-  // will be override in test to simulate error
-  @VisibleForTesting
-  protected void enablePeer(MasterProcedureEnv env) throws ReplicationException {
-    env.getReplicationPeerManager().enablePeer(peerId);
-  }
-
   private void addToMap(Map<String, Long> lastSeqIds, String encodedRegionName, long barrier,
       ReplicationQueueStorage queueStorage) throws ReplicationException {
     if (barrier >= 0) {
@@ -248,21 +239,6 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
     }
   }

-  @Override
-  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
-    setState(ProcedureProtos.ProcedureState.RUNNABLE);
-    env.getProcedureScheduler().addFront(this);
-    return false;
-  }
-
-  private ProcedureSuspendedException suspend(long backoff) throws ProcedureSuspendedException {
-    attemps++;
-    setTimeout(Math.toIntExact(backoff));
-    setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
-    skipPersistence();
-    throw new ProcedureSuspendedException();
-  }
-
   @Override
   protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state)
       throws ProcedureSuspendedException {
@@ -277,24 +253,24 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
           releaseLatch();
           return Flow.NO_MORE_STATE;
         } catch (ReplicationException e) {
-          long backoff = ProcedureUtil.getBackoffTimeMs(attemps);
+          long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
           LOG.warn("{} failed to call prePeerModification for peer {}, sleep {} secs",
             getClass().getName(), peerId, backoff / 1000, e);
           throw suspend(backoff);
         }
-        attemps = 0;
+        attempts = 0;
         setNextState(PeerModificationState.UPDATE_PEER_STORAGE);
         return Flow.HAS_MORE_STATE;
       case UPDATE_PEER_STORAGE:
         try {
           updatePeerStorage(env);
         } catch (ReplicationException e) {
-          long backoff = ProcedureUtil.getBackoffTimeMs(attemps);
+          long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
           LOG.warn("{} update peer storage for peer {} failed, sleep {} secs", getClass().getName(),
             peerId, backoff / 1000, e);
           throw suspend(backoff);
         }
-        attemps = 0;
+        attempts = 0;
         setNextState(PeerModificationState.REFRESH_PEER_ON_RS);
         return Flow.HAS_MORE_STATE;
       case REFRESH_PEER_ON_RS:
@@ -305,24 +281,24 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
         try {
           reopenRegions(env);
         } catch (Exception e) {
-          long backoff = ProcedureUtil.getBackoffTimeMs(attemps);
+          long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
           LOG.warn("{} reopen regions for peer {} failed, sleep {} secs", getClass().getName(),
             peerId, backoff / 1000, e);
           throw suspend(backoff);
         }
-        attemps = 0;
+        attempts = 0;
         setNextState(PeerModificationState.SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID);
         return Flow.HAS_MORE_STATE;
       case SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID:
         try {
           updateLastPushedSequenceIdForSerialPeer(env);
         } catch (Exception e) {
-          long backoff = ProcedureUtil.getBackoffTimeMs(attemps);
+          long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
           LOG.warn("{} set last sequence id for peer {} failed, sleep {} secs",
             getClass().getName(), peerId, backoff / 1000, e);
           throw suspend(backoff);
         }
-        attemps = 0;
+        attempts = 0;
         setNextState(enablePeerBeforeFinish() ? PeerModificationState.SERIAL_PEER_SET_PEER_ENABLED
           : PeerModificationState.POST_PEER_MODIFICATION);
         return Flow.HAS_MORE_STATE;
@@ -330,12 +306,12 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
         try {
           enablePeer(env);
         } catch (ReplicationException e) {
-          long backoff = ProcedureUtil.getBackoffTimeMs(attemps);
+          long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
           LOG.warn("{} enable peer before finish for peer {} failed, sleep {} secs",
             getClass().getName(), peerId, backoff / 1000, e);
           throw suspend(backoff);
         }
-        attemps = 0;
+        attempts = 0;
         setNextState(PeerModificationState.SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS);
         return Flow.HAS_MORE_STATE;
       case SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS:
@@ -346,7 +322,7 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
         try {
           postPeerModification(env);
         } catch (ReplicationException e) {
-          long backoff = ProcedureUtil.getBackoffTimeMs(attemps);
+          long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
           LOG.warn("{} failed to call postPeerModification for peer {}, sleep {} secs",
             getClass().getName(), peerId, backoff / 1000, e);
           throw suspend(backoff);

SyncReplicationReplayWALProcedure.java

@@ -23,11 +23,9 @@ import java.util.List;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
-import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
 import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
-import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
-import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -38,14 +36,11 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.S
 @InterfaceAudience.Private
 public class SyncReplicationReplayWALProcedure
-    extends StateMachineProcedure<MasterProcedureEnv, SyncReplicationReplayWALState>
-    implements PeerProcedureInterface {
+    extends AbstractPeerProcedure<SyncReplicationReplayWALState> {

   private static final Logger LOG =
     LoggerFactory.getLogger(SyncReplicationReplayWALProcedure.class);

-  private String peerId;
-
   private ServerName worker = null;

   private List<String> wals;
@@ -58,9 +53,9 @@ public class SyncReplicationReplayWALProcedure
     this.wals = wals;
   }

-  @Override protected Flow executeFromState(MasterProcedureEnv env,
-      SyncReplicationReplayWALState state)
-      throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
+  @Override
+  protected Flow executeFromState(MasterProcedureEnv env, SyncReplicationReplayWALState state)
+      throws ProcedureSuspendedException {
     SyncReplicationReplayWALManager syncReplicationReplayWALManager =
       env.getMasterServices().getSyncReplicationReplayWALManager();
     switch (state) {
@@ -68,15 +63,19 @@ public class SyncReplicationReplayWALProcedure
         try {
           worker = syncReplicationReplayWALManager.getPeerWorker(peerId);
         } catch (ReplicationException e) {
-          LOG.info("Failed to get worker to replay wals {} for peer id={}, retry", wals, peerId);
-          throw new ProcedureYieldException();
+          long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
+          LOG.warn("Failed to get worker to replay wals {} for peer id={}, sleep {} secs and retry",
+            wals, peerId, backoff / 1000, e);
+          throw suspend(backoff);
         }
         if (worker == null) {
-          LOG.info("No worker to replay wals {} for peer id={}, retry", wals, peerId);
-          setNextState(SyncReplicationReplayWALState.ASSIGN_WORKER);
-        } else {
-          setNextState(SyncReplicationReplayWALState.DISPATCH_WALS_TO_WORKER);
+          long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
+          LOG.info("No worker to replay wals {} for peer id={}, sleep {} secs and retry", wals,
+            peerId, backoff / 1000);
+          throw suspend(backoff);
         }
+        attempts = 0;
+        setNextState(SyncReplicationReplayWALState.DISPATCH_WALS_TO_WORKER);
         return Flow.HAS_MORE_STATE;
       case DISPATCH_WALS_TO_WORKER:
         addChildProcedure(new SyncReplicationReplayWALRemoteProcedure(peerId, wals, worker));
@@ -87,17 +86,23 @@ public class SyncReplicationReplayWALProcedure
         try {
           finished = syncReplicationReplayWALManager.isReplayWALFinished(wals.get(0));
         } catch (IOException e) {
-          LOG.info("Failed to check whether replay wals {} finished for peer id={}", wals, peerId);
-          throw new ProcedureYieldException();
+          long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
+          LOG.warn("Failed to check whether replay wals {} finished for peer id={}" +
+            ", sleep {} secs and retry",
+            wals, peerId, backoff / 1000, e);
+          throw suspend(backoff);
         }
         try {
           syncReplicationReplayWALManager.removePeerWorker(peerId, worker);
         } catch (ReplicationException e) {
-          LOG.info("Failed to remove worker for peer id={}, retry", peerId);
-          throw new ProcedureYieldException();
+          long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
+          LOG.warn("Failed to remove worker {} for peer id={}, sleep {} secs and retry", worker,
+            peerId, backoff / 1000, e);
+          throw suspend(backoff);
         }
+        attempts = 0;
         if (!finished) {
-          LOG.info("Failed to replay wals {} for peer id={}, retry", wals, peerId);
+          LOG.warn("Failed to replay wals {} for peer id={}, retry", wals, peerId);
           setNextState(SyncReplicationReplayWALState.ASSIGN_WORKER);
           return Flow.HAS_MORE_STATE;
         }
@@ -152,11 +157,6 @@ public class SyncReplicationReplayWALProcedure
     data.getWalList().forEach(wals::add);
   }

-  @Override
-  public String getPeerId() {
-    return peerId;
-  }
-
   @Override
   public PeerOperationType getPeerOperationType() {
     return PeerOperationType.SYNC_REPLICATION_REPLAY_WAL;

TransitPeerSyncReplicationStateProcedure.java

@@ -28,7 +28,7 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
 import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
-import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
@@ -37,6 +37,8 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerSyncReplicationStateTransitionState;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.TransitPeerSyncReplicationStateStateData;
@@ -50,7 +52,7 @@ public class TransitPeerSyncReplicationStateProcedure
   private static final Logger LOG =
     LoggerFactory.getLogger(TransitPeerSyncReplicationStateProcedure.class);

-  private SyncReplicationState fromState;
+  protected SyncReplicationState fromState;

   private SyncReplicationState toState;
@@ -107,7 +109,8 @@ public class TransitPeerSyncReplicationStateProcedure
     return PeerSyncReplicationStateTransitionState.PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION;
   }

-  private void preTransit(MasterProcedureEnv env) throws IOException {
+  @VisibleForTesting
+  protected void preTransit(MasterProcedureEnv env) throws IOException {
     MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
     if (cpHost != null) {
       cpHost.preTransitReplicationPeerSyncReplicationState(peerId, toState);
@@ -139,14 +142,15 @@ public class TransitPeerSyncReplicationStateProcedure
     }
   }

-  private void reopenRegions(MasterProcedureEnv env) {
+  @VisibleForTesting
+  protected void reopenRegions(MasterProcedureEnv env) {
     addChildProcedure(
       env.getReplicationPeerManager().getPeerConfig(peerId).get().getTableCFsMap().keySet().stream()
         .map(ReopenTableRegionsProcedure::new).toArray(ReopenTableRegionsProcedure[]::new));
   }

-  private void createDirForRemoteWAL(MasterProcedureEnv env)
-      throws ProcedureYieldException, IOException {
+  @VisibleForTesting
+  protected void createDirForRemoteWAL(MasterProcedureEnv env) throws IOException {
     MasterFileSystem mfs = env.getMasterFileSystem();
     Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME);
     Path remoteWALDirForPeer = ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId);
@@ -155,8 +159,7 @@ public class TransitPeerSyncReplicationStateProcedure
       LOG.warn("Wal dir {} already exists, usually this should not happen, continue anyway",
         remoteWALDirForPeer);
     } else if (!walFs.mkdirs(remoteWALDirForPeer)) {
-      LOG.warn("Can not create remote wal dir {}", remoteWALDirForPeer);
-      throw new ProcedureYieldException();
+      throw new IOException("Failed to create remote wal dir " + remoteWALDirForPeer);
     }
   }
@@ -190,10 +193,33 @@ public class TransitPeerSyncReplicationStateProcedure
     addChildProcedure(new RecoverStandbyProcedure(peerId, serial));
   }

+  @VisibleForTesting
+  protected void setPeerNewSyncReplicationState(MasterProcedureEnv env)
+      throws ReplicationException {
+    env.getReplicationPeerManager().setPeerNewSyncReplicationState(peerId, toState);
+    if (toState.equals(SyncReplicationState.STANDBY) && enabled) {
+      // disable the peer if we are going to transit to STANDBY state, as we need to remove
+      // all the pending replication files. If we do not disable the peer and delete the wal
+      // queues on zk directly, RS will get NoNode exception when updating the wal position
+      // and crash.
+      env.getReplicationPeerManager().disablePeer(peerId);
+    }
+  }
+
+  @VisibleForTesting
+  protected void removeAllReplicationQueues(MasterProcedureEnv env) throws ReplicationException {
+    env.getReplicationPeerManager().removeAllQueues(peerId);
+  }
+
+  @VisibleForTesting
+  protected void transitPeerSyncReplicationState(MasterProcedureEnv env)
+      throws ReplicationException {
+    env.getReplicationPeerManager().transitPeerSyncReplicationState(peerId, toState);
+  }
+
   @Override
   protected Flow executeFromState(MasterProcedureEnv env,
-      PeerSyncReplicationStateTransitionState state)
-      throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
+      PeerSyncReplicationStateTransitionState state) throws ProcedureSuspendedException {
     switch (state) {
       case PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION:
         try {
@@ -209,19 +235,16 @@ public class TransitPeerSyncReplicationStateProcedure
         return Flow.HAS_MORE_STATE;
       case SET_PEER_NEW_SYNC_REPLICATION_STATE:
         try {
-          env.getReplicationPeerManager().setPeerNewSyncReplicationState(peerId, toState);
-          if (toState.equals(SyncReplicationState.STANDBY) && enabled) {
-            // disable the peer if we are going to transit to STANDBY state, as we need to remove
-            // all the pending replication files. If we do not disable the peer and delete the wal
-            // queues on zk directly, RS will get NoNode exception when updating the wal position
-            // and crash.
-            env.getReplicationPeerManager().disablePeer(peerId);
-          }
+          setPeerNewSyncReplicationState(env);
         } catch (ReplicationException e) {
-          LOG.warn("Failed to update peer storage for peer {} when starting transiting sync " +
-            "replication peer state from {} to {}, retry", peerId, fromState, toState, e);
-          throw new ProcedureYieldException();
+          long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
+          LOG.warn(
+            "Failed to update peer storage for peer {} when starting transiting sync " +
+              "replication peer state from {} to {}, sleep {} secs and retry",
+            peerId, fromState, toState, backoff / 1000, e);
+          throw suspend(backoff);
         }
+        attempts = 0;
         setNextState(
           PeerSyncReplicationStateTransitionState.REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN);
         return Flow.HAS_MORE_STATE;
@@ -238,12 +261,16 @@ public class TransitPeerSyncReplicationStateProcedure
         return Flow.HAS_MORE_STATE;
       case REMOVE_ALL_REPLICATION_QUEUES_IN_PEER:
         try {
-          env.getReplicationPeerManager().removeAllQueues(peerId);
+          removeAllReplicationQueues(env);
         } catch (ReplicationException e) {
-          LOG.warn("Failed to remove all replication queues peer {} when starting transiting" +
-            " sync replication peer state from {} to {}, retry", peerId, fromState, toState, e);
-          throw new ProcedureYieldException();
+          long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
+          LOG.warn(
+            "Failed to remove all replication queues peer {} when starting transiting" +
+              " sync replication peer state from {} to {}, sleep {} secs and retry",
+            peerId, fromState, toState, backoff / 1000, e);
+          throw suspend(backoff);
         }
+        attempts = 0;
         setNextState(fromState.equals(SyncReplicationState.ACTIVE)
           ? PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER
           : PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
@@ -255,12 +282,16 @@ public class TransitPeerSyncReplicationStateProcedure
         return Flow.HAS_MORE_STATE;
       case TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE:
         try {
-          env.getReplicationPeerManager().transitPeerSyncReplicationState(peerId, toState);
+          transitPeerSyncReplicationState(env);
         } catch (ReplicationException e) {
-          LOG.warn("Failed to update peer storage for peer {} when ending transiting sync " +
-            "replication peer state from {} to {}, retry", peerId, fromState, toState, e);
-          throw new ProcedureYieldException();
+          long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
+          LOG.warn(
+            "Failed to update peer storage for peer {} when ending transiting sync " +
+              "replication peer state from {} to {}, sleep {} secs and retry",
+            peerId, fromState, toState, backoff / 1000, e);
+          throw suspend(backoff);
         }
+        attempts = 0;
         setNextState(
           PeerSyncReplicationStateTransitionState.REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END);
         return Flow.HAS_MORE_STATE;
@@ -272,12 +303,16 @@ public class TransitPeerSyncReplicationStateProcedure
         return Flow.HAS_MORE_STATE;
       case SYNC_REPLICATION_SET_PEER_ENABLED:
         try {
-          env.getReplicationPeerManager().enablePeer(peerId);
+          enablePeer(env);
         } catch (ReplicationException e) {
-          LOG.warn("Failed to set peer enabled for peer {} when transiting sync replication peer " +
-            "state from {} to {}, retry", peerId, fromState, toState, e);
-          throw new ProcedureYieldException();
+          long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
+          LOG.warn(
+            "Failed to set peer enabled for peer {} when transiting sync replication peer " +
+              "state from {} to {}, sleep {} secs and retry",
+            peerId, fromState, toState, backoff / 1000, e);
+          throw suspend(backoff);
         }
+        attempts = 0;
         setNextState(
           PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS);
         return Flow.HAS_MORE_STATE;
@@ -289,10 +324,14 @@ public class TransitPeerSyncReplicationStateProcedure
         try {
           createDirForRemoteWAL(env);
         } catch (IOException e) {
-          LOG.warn("Failed to create remote wal dir for peer {} when transiting sync replication " +
-            "peer state from {} to {}, retry", peerId, fromState, toState, e);
-          throw new ProcedureYieldException();
+          long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
+          LOG.warn(
+            "Failed to create remote wal dir for peer {} when transiting sync replication " +
+              "peer state from {} to {}, sleep {} secs and retry",
+            peerId, fromState, toState, backoff / 1000, e);
+          throw suspend(backoff);
         }
+        attempts = 0;
         setNextState(
           PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION);
         return Flow.HAS_MORE_STATE;

TestTransitPeerSyncReplicationStateProcedureBackoff.java (new file)

@@ -0,0 +1,172 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.ProcedureTestUtil;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ MasterTests.class, LargeTests.class })
public class TestTransitPeerSyncReplicationStateProcedureBackoff {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestTransitPeerSyncReplicationStateProcedureBackoff.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static boolean FAIL = true;
public static class TestTransitPeerSyncReplicationStateProcedure
extends TransitPeerSyncReplicationStateProcedure {
public TestTransitPeerSyncReplicationStateProcedure() {
}
public TestTransitPeerSyncReplicationStateProcedure(String peerId, SyncReplicationState state) {
super(peerId, state);
}
private void tryFail() throws ReplicationException {
synchronized (TestTransitPeerSyncReplicationStateProcedureBackoff.class) {
if (FAIL) {
throw new ReplicationException("Inject error");
}
FAIL = true;
}
}
@Override
protected <T extends Procedure<MasterProcedureEnv>> void addChildProcedure(
@SuppressWarnings("unchecked") T... subProcedure) {
// Make it a no-op
}
@Override
protected void preTransit(MasterProcedureEnv env) throws IOException {
fromState = SyncReplicationState.DOWNGRADE_ACTIVE;
}
@Override
protected void setPeerNewSyncReplicationState(MasterProcedureEnv env)
throws ReplicationException {
tryFail();
}
@Override
protected void removeAllReplicationQueues(MasterProcedureEnv env) throws ReplicationException {
tryFail();
}
@Override
protected void reopenRegions(MasterProcedureEnv env) {
// do nothing;
}
@Override
protected void transitPeerSyncReplicationState(MasterProcedureEnv env)
throws ReplicationException {
tryFail();
}
@Override
protected void createDirForRemoteWAL(MasterProcedureEnv env) throws IOException {
try {
tryFail();
} catch (ReplicationException e) {
throw new IOException(e);
}
}
}
@BeforeClass
public static void setUp() throws Exception {
UTIL.startMiniCluster(1);
}
@AfterClass
public static void tearDown() throws Exception {
UTIL.shutdownMiniCluster();
}
private void assertBackoffIncrease() throws IOException, InterruptedException {
ProcedureTestUtil.waitUntilProcedureWaitingTimeout(UTIL,
TestTransitPeerSyncReplicationStateProcedure.class, 30000);
ProcedureTestUtil.waitUntilProcedureTimeoutIncrease(UTIL,
TestTransitPeerSyncReplicationStateProcedure.class, 2);
synchronized (TestTransitPeerSyncReplicationStateProcedureBackoff.class) {
FAIL = false;
}
UTIL.waitFor(30000, () -> FAIL);
}
@Test
public void testDowngradeActiveToActive() throws IOException, InterruptedException {
ProcedureExecutor<MasterProcedureEnv> procExec =
UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
// Test procedure: DOWNGRADE_ACTIVE ==> ACTIVE
long procId = procExec.submitProcedure(
new TestTransitPeerSyncReplicationStateProcedure("1", SyncReplicationState.ACTIVE));
// No retry for PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION
// SET_PEER_NEW_SYNC_REPLICATION_STATE
assertBackoffIncrease();
// No retry for REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN
// No retry for REOPEN_ALL_REGIONS_IN_PEER
// TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE
assertBackoffIncrease();
// No retry for REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END
// No retry for POST_PEER_SYNC_REPLICATION_STATE_TRANSITION
UTIL.waitFor(30000, () -> procExec.isFinished(procId));
}
@Test
public void testDowngradeActiveToStandby() throws IOException, InterruptedException {
ProcedureExecutor<MasterProcedureEnv> procExec =
UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
// Test procedure: DOWNGRADE_ACTIVE ==> STANDBY
long procId = procExec.submitProcedure(
new TestTransitPeerSyncReplicationStateProcedure("2", SyncReplicationState.STANDBY));
// No retry for PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION
// SET_PEER_NEW_SYNC_REPLICATION_STATE
assertBackoffIncrease();
// No retry for REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN
// REMOVE_ALL_REPLICATION_QUEUES_IN_PEER
assertBackoffIncrease();
// TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE
assertBackoffIncrease();
// No retry for REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END
// CREATE_DIR_FOR_REMOTE_WAL
assertBackoffIncrease();
// No retry for POST_PEER_SYNC_REPLICATION_STATE_TRANSITION
UTIL.waitFor(30000, () -> procExec.isFinished(procId));
}
}