HBASE-21486 The current replication implementation for peer in STANDBY state breaks serial replication
parent dfeab9f5c9
commit 766aa1bfcc
@@ -398,15 +398,16 @@ enum PeerSyncReplicationStateTransitionState {
PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION = 1;
SET_PEER_NEW_SYNC_REPLICATION_STATE = 2;
REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN = 3;
REPLAY_REMOTE_WAL_IN_PEER = 4;
REMOVE_ALL_REPLICATION_QUEUES_IN_PEER = 5;
REOPEN_ALL_REGIONS_IN_PEER = 6;
TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE = 7;
REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END = 8;
SYNC_REPLICATION_SET_PEER_ENABLED = 9;
SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS = 10;
CREATE_DIR_FOR_REMOTE_WAL = 11;
POST_PEER_SYNC_REPLICATION_STATE_TRANSITION = 12;
REOPEN_ALL_REGIONS_IN_PEER = 4;
SYNC_REPLICATION_UPDATE_LAST_PUSHED_SEQ_ID_FOR_SERIAL_PEER = 5;
REPLAY_REMOTE_WAL_IN_PEER = 6;
REMOVE_ALL_REPLICATION_QUEUES_IN_PEER = 7;
TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE = 8;
REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END = 9;
SYNC_REPLICATION_SET_PEER_ENABLED = 10;
SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS = 11;
CREATE_DIR_FOR_REMOTE_WAL = 12;
POST_PEER_SYNC_REPLICATION_STATE_TRANSITION = 13;
}

message PeerModificationStateData {

@@ -17,11 +17,27 @@
*/
package org.apache.hadoop.hbase.master.replication;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.master.TableStateManager.TableStateNotFoundException;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

@@ -29,8 +45,15 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
* The base class for all replication peer related procedure.
*/
@InterfaceAudience.Private
public abstract class AbstractPeerProcedure<TState>
extends AbstractPeerNoLockProcedure<TState> implements PeerProcedureInterface {
public abstract class AbstractPeerProcedure<TState> extends AbstractPeerNoLockProcedure<TState>
implements PeerProcedureInterface {

private static final Logger LOG = LoggerFactory.getLogger(AbstractPeerProcedure.class);

protected static final int UPDATE_LAST_SEQ_ID_BATCH_SIZE = 1000;

// The sleep interval when waiting table to be enabled or disabled.
protected static final int SLEEP_INTERVAL_MS = 1000;

// used to keep compatible with old client where we can only returns after updateStorage.
protected ProcedurePrepareLatch latch;

@@ -75,4 +98,74 @@ public abstract class AbstractPeerProcedure<TState>
protected void enablePeer(MasterProcedureEnv env) throws ReplicationException {
env.getReplicationPeerManager().enablePeer(peerId);
}

private void addToMap(Map<String, Long> lastSeqIds, String encodedRegionName, long barrier,
ReplicationQueueStorage queueStorage) throws ReplicationException {
if (barrier >= 0) {
lastSeqIds.put(encodedRegionName, barrier);
if (lastSeqIds.size() >= UPDATE_LAST_SEQ_ID_BATCH_SIZE) {
queueStorage.setLastSequenceIds(peerId, lastSeqIds);
lastSeqIds.clear();
}
}
}

protected final void setLastPushedSequenceId(MasterProcedureEnv env,
ReplicationPeerConfig peerConfig) throws IOException, ReplicationException {
Map<String, Long> lastSeqIds = new HashMap<String, Long>();
for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) {
if (!td.hasGlobalReplicationScope()) {
continue;
}
TableName tn = td.getTableName();
if (!ReplicationUtils.contains(peerConfig, tn)) {
continue;
}
setLastPushedSequenceIdForTable(env, tn, lastSeqIds);
}
if (!lastSeqIds.isEmpty()) {
env.getReplicationPeerManager().getQueueStorage().setLastSequenceIds(peerId, lastSeqIds);
}
}

// If the table is currently disabling, then we need to wait until it is disabled.We will write
// replication barrier for a disabled table. And return whether we need to update the last pushed
// sequence id, if the table has been deleted already, i.e, we hit TableStateNotFoundException,
// then we do not need to update last pushed sequence id for this table.
private boolean needSetLastPushedSequenceId(TableStateManager tsm, TableName tn)
throws IOException {
for (;;) {
try {
if (!tsm.getTableState(tn).isDisabling()) {
return true;
}
Thread.sleep(SLEEP_INTERVAL_MS);
} catch (TableStateNotFoundException e) {
return false;
} catch (InterruptedException e) {
throw (IOException) new InterruptedIOException(e.getMessage()).initCause(e);
}
}
}

// Will put the encodedRegionName->lastPushedSeqId pair into the map passed in, if the map is
// large enough we will call queueStorage.setLastSequenceIds and clear the map. So the caller
// should not forget to check whether the map is empty at last, if not you should call
// queueStorage.setLastSequenceIds to write out the remaining entries in the map.
protected final void setLastPushedSequenceIdForTable(MasterProcedureEnv env, TableName tableName,
Map<String, Long> lastSeqIds) throws IOException, ReplicationException {
TableStateManager tsm = env.getMasterServices().getTableStateManager();
ReplicationQueueStorage queueStorage = env.getReplicationPeerManager().getQueueStorage();
Connection conn = env.getMasterServices().getConnection();
if (!needSetLastPushedSequenceId(tsm, tableName)) {
LOG.debug("Skip settting last pushed sequence id for {}", tableName);
return;
}
for (Pair<String, Long> name2Barrier : MetaTableAccessor
.getTableEncodedRegionNameAndLastBarrier(conn, tableName)) {
LOG.trace("Update last pushed sequence id for {}, {}", tableName, name2Barrier);
addToMap(lastSeqIds, name2Barrier.getFirst(), name2Barrier.getSecond().longValue() - 1,
queueStorage);
}
}
}

@@ -19,11 +19,7 @@ package org.apache.hadoop.hbase.master.replication;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.master.TableStateManager;

@@ -35,9 +31,7 @@ import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -55,11 +49,6 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModificationState>

private static final Logger LOG = LoggerFactory.getLogger(ModifyPeerProcedure.class);

protected static final int UPDATE_LAST_SEQ_ID_BATCH_SIZE = 1000;

// The sleep interval when waiting table to be enabled or disabled.
protected static final int SLEEP_INTERVAL_MS = 1000;

protected ModifyPeerProcedure() {
}

@@ -169,76 +158,6 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModificationState>
}
}

private void addToMap(Map<String, Long> lastSeqIds, String encodedRegionName, long barrier,
ReplicationQueueStorage queueStorage) throws ReplicationException {
if (barrier >= 0) {
lastSeqIds.put(encodedRegionName, barrier);
if (lastSeqIds.size() >= UPDATE_LAST_SEQ_ID_BATCH_SIZE) {
queueStorage.setLastSequenceIds(peerId, lastSeqIds);
lastSeqIds.clear();
}
}
}

protected final void setLastPushedSequenceId(MasterProcedureEnv env,
ReplicationPeerConfig peerConfig) throws IOException, ReplicationException {
Map<String, Long> lastSeqIds = new HashMap<String, Long>();
for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) {
if (!td.hasGlobalReplicationScope()) {
continue;
}
TableName tn = td.getTableName();
if (!ReplicationUtils.contains(peerConfig, tn)) {
continue;
}
setLastPushedSequenceIdForTable(env, tn, lastSeqIds);
}
if (!lastSeqIds.isEmpty()) {
env.getReplicationPeerManager().getQueueStorage().setLastSequenceIds(peerId, lastSeqIds);
}
}

// If the table is currently disabling, then we need to wait until it is disabled.We will write
// replication barrier for a disabled table. And return whether we need to update the last pushed
// sequence id, if the table has been deleted already, i.e, we hit TableStateNotFoundException,
// then we do not need to update last pushed sequence id for this table.
private boolean needSetLastPushedSequenceId(TableStateManager tsm, TableName tn)
throws IOException {
for (;;) {
try {
if (!tsm.getTableState(tn).isDisabling()) {
return true;
}
Thread.sleep(SLEEP_INTERVAL_MS);
} catch (TableStateNotFoundException e) {
return false;
} catch (InterruptedException e) {
throw (IOException) new InterruptedIOException(e.getMessage()).initCause(e);
}
}
}

// Will put the encodedRegionName->lastPushedSeqId pair into the map passed in, if the map is
// large enough we will call queueStorage.setLastSequenceIds and clear the map. So the caller
// should not forget to check whether the map is empty at last, if not you should call
// queueStorage.setLastSequenceIds to write out the remaining entries in the map.
protected final void setLastPushedSequenceIdForTable(MasterProcedureEnv env, TableName tableName,
Map<String, Long> lastSeqIds) throws IOException, ReplicationException {
TableStateManager tsm = env.getMasterServices().getTableStateManager();
ReplicationQueueStorage queueStorage = env.getReplicationPeerManager().getQueueStorage();
Connection conn = env.getMasterServices().getConnection();
if (!needSetLastPushedSequenceId(tsm, tableName)) {
LOG.debug("Skip settting last pushed sequence id for {}", tableName);
return;
}
for (Pair<String, Long> name2Barrier : MetaTableAccessor
.getTableEncodedRegionNameAndLastBarrier(conn, tableName)) {
LOG.trace("Update last pushed sequence id for {}, {}", tableName, name2Barrier);
addToMap(lastSeqIds, name2Barrier.getFirst(), name2Barrier.getSecond().longValue() - 1,
queueStorage);
}
}

@Override
protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state)
throws ProcedureSuspendedException, InterruptedException {

@@ -50,7 +50,7 @@ public class TransitPeerSyncReplicationStateProcedure
extends AbstractPeerProcedure<PeerSyncReplicationStateTransitionState> {

private static final Logger LOG =
LoggerFactory.getLogger(TransitPeerSyncReplicationStateProcedure.class);
LoggerFactory.getLogger(TransitPeerSyncReplicationStateProcedure.class);

protected SyncReplicationState fromState;

@@ -58,6 +58,8 @@ public class TransitPeerSyncReplicationStateProcedure

private boolean enabled;

private boolean serial;

public TransitPeerSyncReplicationStateProcedure() {
}

@@ -75,8 +77,8 @@ public class TransitPeerSyncReplicationStateProcedure
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
super.serializeStateData(serializer);
TransitPeerSyncReplicationStateStateData.Builder builder =
TransitPeerSyncReplicationStateStateData.newBuilder()
.setToState(ReplicationPeerConfigUtil.toSyncReplicationState(toState));
TransitPeerSyncReplicationStateStateData.newBuilder()
.setToState(ReplicationPeerConfigUtil.toSyncReplicationState(toState));
if (fromState != null) {
builder.setFromState(ReplicationPeerConfigUtil.toSyncReplicationState(fromState));
}

@@ -87,7 +89,7 @@ public class TransitPeerSyncReplicationStateProcedure
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
super.deserializeStateData(serializer);
TransitPeerSyncReplicationStateStateData data =
serializer.deserialize(TransitPeerSyncReplicationStateStateData.class);
serializer.deserialize(TransitPeerSyncReplicationStateStateData.class);
toState = ReplicationPeerConfigUtil.toSyncReplicationState(data.getToState());
if (data.hasFromState()) {
fromState = ReplicationPeerConfigUtil.toSyncReplicationState(data.getFromState());

@@ -129,6 +131,7 @@ public class TransitPeerSyncReplicationStateProcedure
}
fromState = desc.getSyncReplicationState();
enabled = desc.isEnabled();
serial = desc.getPeerConfig().isSerial();
}

private void postTransit(MasterProcedureEnv env) throws IOException {

@@ -174,7 +177,11 @@ public class TransitPeerSyncReplicationStateProcedure
: PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER);
} else {
assert toState.equals(SyncReplicationState.DOWNGRADE_ACTIVE);
setNextState(PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER);
// for serial peer, we need to reopen all the regions and then update the last pushed sequence
// id, before replaying any remote wals, so that the serial replication will not be stuck, and
// also guarantee the order when replicating the remote wal back.
setNextState(serial ? PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER
: PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER);
}
}

@@ -183,6 +190,11 @@ public class TransitPeerSyncReplicationStateProcedure
setNextState(
enabled ? PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_SET_PEER_ENABLED
: PeerSyncReplicationStateTransitionState.CREATE_DIR_FOR_REMOTE_WAL);
} else if (fromState == SyncReplicationState.STANDBY) {
assert toState.equals(SyncReplicationState.DOWNGRADE_ACTIVE);
setNextState(serial && enabled
? PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_SET_PEER_ENABLED
: PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION);
} else {
setNextState(
PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION);

@@ -196,14 +208,20 @@ public class TransitPeerSyncReplicationStateProcedure
@VisibleForTesting
protected void setPeerNewSyncReplicationState(MasterProcedureEnv env)
throws ReplicationException {
env.getReplicationPeerManager().setPeerNewSyncReplicationState(peerId, toState);
if (toState.equals(SyncReplicationState.STANDBY) && enabled) {
// disable the peer if we are going to transit to STANDBY state, as we need to remove
if (toState.equals(SyncReplicationState.STANDBY) ||
(fromState.equals(SyncReplicationState.STANDBY) && serial) && enabled) {
// Disable the peer if we are going to transit to STANDBY state, as we need to remove
// all the pending replication files. If we do not disable the peer and delete the wal
// queues on zk directly, RS will get NoNode exception when updating the wal position
// and crash.
// Disable the peer if we are going to transit from STANDBY to DOWNGRADE_ACTIVE, and the
// replication is serial, as we need to update the lastPushedSequence id after we reopen all
// the regions, and for performance reason here we will update in batch, without using CAS, if
// we are still replicating at RS side, we may accidentally update the last pushed sequence id
// to a less value and cause the replication to be stuck.
env.getReplicationPeerManager().disablePeer(peerId);
}
env.getReplicationPeerManager().setPeerNewSyncReplicationState(peerId, toState);
}

@VisibleForTesting

@@ -240,7 +258,7 @@ public class TransitPeerSyncReplicationStateProcedure
long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
LOG.warn(
"Failed to update peer storage for peer {} when starting transiting sync " +
"replication peer state from {} to {}, sleep {} secs and retry",
"replication peer state from {} to {}, sleep {} secs and retry",
peerId, fromState, toState, backoff / 1000, e);
throw suspend(backoff);
}

@@ -254,6 +272,30 @@ public class TransitPeerSyncReplicationStateProcedure
.toArray(RefreshPeerProcedure[]::new));
setNextStateAfterRefreshBegin();
return Flow.HAS_MORE_STATE;
case REOPEN_ALL_REGIONS_IN_PEER:
reopenRegions(env);
if (fromState.equals(SyncReplicationState.STANDBY)) {
assert serial;
setNextState(
PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_UPDATE_LAST_PUSHED_SEQ_ID_FOR_SERIAL_PEER);
} else {
setNextState(
PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
}
return Flow.HAS_MORE_STATE;
case SYNC_REPLICATION_UPDATE_LAST_PUSHED_SEQ_ID_FOR_SERIAL_PEER:
try {
setLastPushedSequenceId(env, env.getReplicationPeerManager().getPeerConfig(peerId).get());
} catch (Exception e) {
long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
LOG.warn(
"Failed to update last pushed sequence id for peer {} when transiting sync " +
"replication peer state from {} to {}, sleep {} secs and retry",
peerId, fromState, toState, backoff / 1000, e);
throw suspend(backoff);
}
setNextState(PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER);
return Flow.HAS_MORE_STATE;
case REPLAY_REMOTE_WAL_IN_PEER:
replayRemoteWAL(env.getReplicationPeerManager().getPeerConfig(peerId).get().isSerial());
setNextState(

@@ -266,7 +308,7 @@ public class TransitPeerSyncReplicationStateProcedure
long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
LOG.warn(
"Failed to remove all replication queues peer {} when starting transiting" +
" sync replication peer state from {} to {}, sleep {} secs and retry",
" sync replication peer state from {} to {}, sleep {} secs and retry",
peerId, fromState, toState, backoff / 1000, e);
throw suspend(backoff);
}

@@ -275,11 +317,6 @@ public class TransitPeerSyncReplicationStateProcedure
? PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER
: PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
return Flow.HAS_MORE_STATE;
case REOPEN_ALL_REGIONS_IN_PEER:
reopenRegions(env);
setNextState(
PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
return Flow.HAS_MORE_STATE;
case TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE:
try {
transitPeerSyncReplicationState(env);

@@ -287,7 +324,7 @@ public class TransitPeerSyncReplicationStateProcedure
long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
LOG.warn(
"Failed to update peer storage for peer {} when ending transiting sync " +
"replication peer state from {} to {}, sleep {} secs and retry",
"replication peer state from {} to {}, sleep {} secs and retry",
peerId, fromState, toState, backoff / 1000, e);
throw suspend(backoff);
}

@@ -308,7 +345,7 @@ public class TransitPeerSyncReplicationStateProcedure
long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
LOG.warn(
"Failed to set peer enabled for peer {} when transiting sync replication peer " +
"state from {} to {}, sleep {} secs and retry",
"state from {} to {}, sleep {} secs and retry",
peerId, fromState, toState, backoff / 1000, e);
throw suspend(backoff);
}

@@ -327,7 +364,7 @@ public class TransitPeerSyncReplicationStateProcedure
long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
LOG.warn(
"Failed to create remote wal dir for peer {} when transiting sync replication " +
"peer state from {} to {}, sleep {} secs and retry",
"peer state from {} to {}, sleep {} secs and retry",
peerId, fromState, toState, backoff / 1000, e);
throw suspend(backoff);
}

@@ -103,8 +103,8 @@ public class SyncReplicationTestBase {
ZK_UTIL.startMiniZKCluster();
initTestingUtility(UTIL1, "/cluster1");
initTestingUtility(UTIL2, "/cluster2");
StartMiniClusterOption option = StartMiniClusterOption.builder()
.numMasters(2).numRegionServers(3).numDataNodes(3).build();
StartMiniClusterOption option =
StartMiniClusterOption.builder().numMasters(2).numRegionServers(3).numDataNodes(3).build();
UTIL1.startMiniCluster(option);
UTIL2.startMiniCluster(option);
TableDescriptor td =

@@ -217,16 +217,16 @@ public class SyncReplicationTestBase {
return getRemoteWALDir(remoteWALDir, peerId);
}

protected Path getRemoteWALDir(Path remoteWALDir, String peerId) {
protected final Path getRemoteWALDir(Path remoteWALDir, String peerId) {
return new Path(remoteWALDir, peerId);
}

protected Path getReplayRemoteWALs(Path remoteWALDir, String peerId) {
protected final Path getReplayRemoteWALs(Path remoteWALDir, String peerId) {
return new Path(remoteWALDir, peerId + "-replay");
}

protected void verifyRemovedPeer(String peerId, Path remoteWALDir, HBaseTestingUtility utility)
throws Exception {
protected final void verifyRemovedPeer(String peerId, Path remoteWALDir,
HBaseTestingUtility utility) throws Exception {
ReplicationPeerStorage rps = ReplicationStorageFactory
.getReplicationPeerStorage(utility.getZooKeeperWatcher(), utility.getConfiguration());
try {

@@ -247,7 +247,7 @@ public class SyncReplicationTestBase {
}
}

protected void verifyReplicationRequestRejection(HBaseTestingUtility utility,
protected final void verifyReplicationRequestRejection(HBaseTestingUtility utility,
boolean expectedRejection) throws Exception {
HRegionServer regionServer = utility.getRSForFirstRegionInTable(TABLE_NAME);
ClusterConnection connection = regionServer.getClusterConnection();

@@ -270,4 +270,20 @@ public class SyncReplicationTestBase {
}
}
}

protected final void waitUntilDeleted(HBaseTestingUtility util, Path remoteWAL) throws Exception {
MasterFileSystem mfs = util.getMiniHBaseCluster().getMaster().getMasterFileSystem();
util.waitFor(30000, new ExplainingPredicate<Exception>() {

@Override
public boolean evaluate() throws Exception {
return !mfs.getWALFileSystem().exists(remoteWAL);
}

@Override
public String explainFailure() throws Exception {
return remoteWAL + " has not been deleted yet";
}
});
}
}

@@ -0,0 +1,106 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;

import static org.hamcrest.CoreMatchers.endsWith;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.LogRoller;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;

/**
* Testcase to confirm that serial replication will not be stuck when using along with synchronous
* replication. See HBASE-21486 for more details.
*/
@Category({ ReplicationTests.class, LargeTests.class })
public class TestSerialSyncReplication extends SyncReplicationTestBase {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestSerialSyncReplication.class);

@Test
public void test() throws Exception {
// change to serial
UTIL1.getAdmin().updateReplicationPeerConfig(PEER_ID, ReplicationPeerConfig
.newBuilder(UTIL1.getAdmin().getReplicationPeerConfig(PEER_ID)).setSerial(true).build());
UTIL2.getAdmin().updateReplicationPeerConfig(PEER_ID, ReplicationPeerConfig
.newBuilder(UTIL2.getAdmin().getReplicationPeerConfig(PEER_ID)).setSerial(true).build());

UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.STANDBY);
UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.ACTIVE);

UTIL2.getAdmin().disableReplicationPeer(PEER_ID);

writeAndVerifyReplication(UTIL1, UTIL2, 0, 100);

MasterFileSystem mfs = UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem();
Path remoteWALDir = ReplicationUtils.getPeerRemoteWALDir(
new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME), PEER_ID);
FileStatus[] remoteWALStatus = mfs.getWALFileSystem().listStatus(remoteWALDir);
assertEquals(1, remoteWALStatus.length);
Path remoteWAL = remoteWALStatus[0].getPath();
assertThat(remoteWAL.getName(), endsWith(ReplicationUtils.SYNC_WAL_SUFFIX));
// roll the wal writer, so that we will delete the remore wal. This is used to make sure that we
// will not replay this wal when transiting to DA.
for (RegionServerThread t : UTIL1.getMiniHBaseCluster().getRegionServerThreads()) {
LogRoller roller = t.getRegionServer().getWalRoller();
roller.requestRollAll();
roller.waitUntilWalRollFinished();
}
waitUntilDeleted(UTIL2, remoteWAL);

UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.DOWNGRADE_ACTIVE);
UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.STANDBY);
// let's reopen the region
RegionInfo region = Iterables.getOnlyElement(UTIL2.getAdmin().getRegions(TABLE_NAME));
HRegionServer target = UTIL2.getOtherRegionServer(UTIL2.getRSForFirstRegionInTable(TABLE_NAME));
UTIL2.getAdmin().move(region.getEncodedNameAsBytes(),
Bytes.toBytes(target.getServerName().getServerName()));
// here we will remove all the pending wals. This is not a normal operation sequence but anyway,
// user could do this.
UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.STANDBY);
// transit back to DA
UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.DOWNGRADE_ACTIVE);

UTIL2.getAdmin().enableReplicationPeer(PEER_ID);
// make sure that the async replication still works
writeAndVerifyReplication(UTIL2, UTIL1, 100, 200);
}
}

@@ -25,7 +25,6 @@ import static org.junit.Assert.assertTrue;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.LargeTests;

@@ -41,22 +40,6 @@ public class TestSyncReplicationRemoveRemoteWAL extends SyncReplicationTestBase
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestSyncReplicationRemoveRemoteWAL.class);

private void waitUntilDeleted(Path remoteWAL) throws Exception {
MasterFileSystem mfs = UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem();
UTIL1.waitFor(30000, new ExplainingPredicate<Exception>() {

@Override
public boolean evaluate() throws Exception {
return !mfs.getWALFileSystem().exists(remoteWAL);
}

@Override
public String explainFailure() throws Exception {
return remoteWAL + " has not been deleted yet";
}
});
}

@Test
public void testRemoveRemoteWAL() throws Exception {
UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,

@@ -76,7 +59,7 @@ public class TestSyncReplicationRemoveRemoteWAL extends SyncReplicationTestBase
HRegionServer rs = UTIL1.getRSForFirstRegionInTable(TABLE_NAME);
rs.getWalRoller().requestRollAll();
// The replicated wal file should be deleted finally
waitUntilDeleted(remoteWAL);
waitUntilDeleted(UTIL2, remoteWAL);
remoteWALStatus = mfs.getWALFileSystem().listStatus(remoteWALDir);
assertEquals(1, remoteWALStatus.length);
remoteWAL = remoteWALStatus[0].getPath();

@@ -95,6 +78,6 @@ public class TestSyncReplicationRemoveRemoteWAL extends SyncReplicationTestBase
verifyThroughRegion(UTIL2, 100, 200);

// Confirm that we will also remove the remote wal files in DA state
waitUntilDeleted(remoteWAL);
waitUntilDeleted(UTIL2, remoteWAL);
}
}