HBASE-20426 Give up replicating anything in S state

zhangduo 2018-05-03 15:51:35 +08:00
parent 5b6c0d2777
commit ae6c90b4ec
10 changed files with 380 additions and 63 deletions

View File

@@ -396,11 +396,14 @@ enum PeerSyncReplicationStateTransitionState {
SET_PEER_NEW_SYNC_REPLICATION_STATE = 2;
REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN = 3;
REPLAY_REMOTE_WAL_IN_PEER = 4;
REOPEN_ALL_REGIONS_IN_PEER = 5;
TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE = 6;
REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END = 7;
CREATE_DIR_FOR_REMOTE_WAL = 8;
POST_PEER_SYNC_REPLICATION_STATE_TRANSITION = 9;
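// remove all the replication queues for the peer when it is transiting to STANDBY, so that
// no stale wal entries are replicated once we go back to DOWNGRADE_ACTIVE (HBASE-20426)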
REMOVE_ALL_REPLICATION_QUEUES_IN_PEER = 5;
REOPEN_ALL_REGIONS_IN_PEER = 6;
TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE = 7;
REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END = 8;
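// if the peer was enabled before the transition to STANDBY, enable it again and let every
// region server refresh the re-enabled peer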
SYNC_REPLICATION_SET_PEER_ENABLED = 9;
SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS = 10;
CREATE_DIR_FOR_REMOTE_WAL = 11;
POST_PEER_SYNC_REPLICATION_STATE_TRANSITION = 12;
}
message PeerModificationStateData {

View File

@@ -106,4 +106,8 @@ public abstract class AbstractPeerProcedure<TState>
throw new UnsupportedOperationException();
}
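// Submit a RefreshPeerProcedure to every online region server so each of them reloads the
// peer for the given operation type.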
protected final void refreshPeer(MasterProcedureEnv env, PeerOperationType type) {
addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
.map(sn -> new RefreshPeerProcedure(peerId, type, sn)).toArray(RefreshPeerProcedure[]::new));
}
}

View File

@@ -109,12 +109,6 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
throw new UnsupportedOperationException();
}
private void refreshPeer(MasterProcedureEnv env, PeerOperationType type) {
addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
.map(sn -> new RefreshPeerProcedure(peerId, type, sn))
.toArray(RefreshPeerProcedure[]::new));
}
protected ReplicationPeerConfig getOldPeerConfig() {
return null;
}

View File

@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
@@ -192,9 +193,9 @@ public class ReplicationPeerManager {
}
/**
* @return the old state.
* @return the old state, and whether the peer is enabled.
*/
public SyncReplicationState preTransitPeerSyncReplicationState(String peerId,
Pair<SyncReplicationState, Boolean> preTransitPeerSyncReplicationState(String peerId,
SyncReplicationState state) throws DoNotRetryIOException {
ReplicationPeerDescription desc = checkPeerExists(peerId);
SyncReplicationState fromState = desc.getSyncReplicationState();
@@ -203,7 +204,7 @@ public class ReplicationPeerManager {
throw new DoNotRetryIOException("Can not transit current cluster state from " + fromState +
" to " + state + " for peer id=" + peerId);
}
return fromState;
return Pair.newPair(fromState, desc.isEnabled());
}
public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
@@ -303,7 +304,7 @@ public class ReplicationPeerManager {
}
}
public void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException {
public void removeAllQueues(String peerId) throws ReplicationException {
// Here we need two passes to address the problem of claimQueue. Maybe a claimQueue is still
// ongoing when the refresh peer config procedure is done. If a RS which has already been
// scanned claims the queue of a RS which has not been scanned yet, we will miss that queue in
@@ -317,6 +318,10 @@ public class ReplicationPeerManager {
// unless it has already been removed by others.
ReplicationUtils.removeAllQueues(queueStorage, peerId);
ReplicationUtils.removeAllQueues(queueStorage, peerId);
}
public void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException {
removeAllQueues(peerId);
queueStorage.removePeerFromHFileRefs(peerId);
}

View File

@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,6 +55,8 @@ public class TransitPeerSyncReplicationStateProcedure
private SyncReplicationState toState;
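// whether the peer is enabled before the transition; if we have to disable it while
// transiting to STANDBY, it will be re-enabled at the end of the procedure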
private boolean enabled;
public TransitPeerSyncReplicationStateProcedure() {
}
@@ -110,7 +113,10 @@ public class TransitPeerSyncReplicationStateProcedure
if (cpHost != null) {
cpHost.preTransitReplicationPeerSyncReplicationState(peerId, toState);
}
fromState = env.getReplicationPeerManager().preTransitPeerSyncReplicationState(peerId, toState);
Pair<SyncReplicationState, Boolean> pair =
env.getReplicationPeerManager().preTransitPeerSyncReplicationState(peerId, toState);
fromState = pair.getFirst();
enabled = pair.getSecond();
}
private void postTransit(MasterProcedureEnv env) throws IOException {
@@ -131,6 +137,21 @@ public class TransitPeerSyncReplicationStateProcedure
.collect(Collectors.toList());
}
private void createDirForRemoteWAL(MasterProcedureEnv env)
throws ProcedureYieldException, IOException {
MasterFileSystem mfs = env.getMasterFileSystem();
Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME);
Path remoteWALDirForPeer = ReplicationUtils.getRemoteWALDirForPeer(remoteWALDir, peerId);
FileSystem walFs = mfs.getWALFileSystem();
if (walFs.exists(remoteWALDirForPeer)) {
LOG.warn("Wal dir {} already exists, usually this should not happen, continue anyway",
remoteWALDirForPeer);
} else if (!walFs.mkdirs(remoteWALDirForPeer)) {
LOG.warn("Can not create remote wal dir {}", remoteWALDirForPeer);
throw new ProcedureYieldException();
}
}
@Override
protected Flow executeFromState(MasterProcedureEnv env,
PeerSyncReplicationStateTransitionState state)
@@ -151,6 +172,13 @@ public class TransitPeerSyncReplicationStateProcedure
case SET_PEER_NEW_SYNC_REPLICATION_STATE:
try {
env.getReplicationPeerManager().setPeerNewSyncReplicationState(peerId, toState);
if (toState.equals(SyncReplicationState.STANDBY) && enabled) {
// disable the peer if we are going to transit to STANDBY state, as we need to remove
// all the pending replication files. If we do not disable the peer and delete the wal
// queues on zk directly, RS will get NoNode exception when updating the wal position
// and crash.
env.getReplicationPeerManager().disablePeer(peerId);
}
} catch (ReplicationException e) {
LOG.warn("Failed to update peer storage for peer {} when starting transiting sync " +
"replication peer state from {} to {}, retry", peerId, fromState, toState, e);
@@ -163,16 +191,35 @@ public class TransitPeerSyncReplicationStateProcedure
addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
.map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn, 0))
.toArray(RefreshPeerProcedure[]::new));
if (fromState == SyncReplicationState.STANDBY &&
toState == SyncReplicationState.DOWNGRADE_ACTIVE) {
setNextState(PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER);
if (fromState.equals(SyncReplicationState.ACTIVE)) {
setNextState(toState.equals(SyncReplicationState.STANDBY)
? PeerSyncReplicationStateTransitionState.REMOVE_ALL_REPLICATION_QUEUES_IN_PEER
: PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER);
} else if (fromState.equals(SyncReplicationState.DOWNGRADE_ACTIVE)) {
setNextState(toState.equals(SyncReplicationState.STANDBY)
? PeerSyncReplicationStateTransitionState.REMOVE_ALL_REPLICATION_QUEUES_IN_PEER
: PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER);
} else {
setNextState(PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER);
assert toState.equals(SyncReplicationState.DOWNGRADE_ACTIVE);
setNextState(PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER);
}
return Flow.HAS_MORE_STATE;
case REPLAY_REMOTE_WAL_IN_PEER:
addChildProcedure(new RecoverStandbyProcedure(peerId));
setNextState(PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER);
setNextState(
PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
return Flow.HAS_MORE_STATE;
case REMOVE_ALL_REPLICATION_QUEUES_IN_PEER:
try {
env.getReplicationPeerManager().removeAllQueues(peerId);
} catch (ReplicationException e) {
LOG.warn("Failed to remove all replication queues peer {} when starting transiting" +
" sync replication peer state from {} to {}, retry", peerId, fromState, toState, e);
throw new ProcedureYieldException();
}
setNextState(fromState.equals(SyncReplicationState.ACTIVE)
? PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER
: PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
return Flow.HAS_MORE_STATE;
case REOPEN_ALL_REGIONS_IN_PEER:
try {
@@ -202,27 +249,35 @@ public class TransitPeerSyncReplicationStateProcedure
.map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn, 1))
.toArray(RefreshPeerProcedure[]::new));
if (toState == SyncReplicationState.STANDBY) {
setNextState(PeerSyncReplicationStateTransitionState.CREATE_DIR_FOR_REMOTE_WAL);
setNextState(
enabled ? PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_SET_PEER_ENABLED
: PeerSyncReplicationStateTransitionState.CREATE_DIR_FOR_REMOTE_WAL);
} else {
setNextState(
PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION);
}
return Flow.HAS_MORE_STATE;
case CREATE_DIR_FOR_REMOTE_WAL:
MasterFileSystem mfs = env.getMasterFileSystem();
Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME);
Path remoteWALDirForPeer = ReplicationUtils.getRemoteWALDirForPeer(remoteWALDir, peerId);
FileSystem walFs = mfs.getWALFileSystem();
case SYNC_REPLICATION_SET_PEER_ENABLED:
try {
if (walFs.exists(remoteWALDirForPeer)) {
LOG.warn("Wal dir {} already exists, usually this should not happen, continue anyway",
remoteWALDirForPeer);
} else if (!walFs.mkdirs(remoteWALDirForPeer)) {
LOG.warn("Can not create remote wal dir {}", remoteWALDirForPeer);
throw new ProcedureYieldException();
}
env.getReplicationPeerManager().enablePeer(peerId);
} catch (ReplicationException e) {
LOG.warn("Failed to set peer enabled for peer {} when transiting sync replication peer " +
"state from {} to {}, retry", peerId, fromState, toState, e);
throw new ProcedureYieldException();
}
setNextState(
PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS);
return Flow.HAS_MORE_STATE;
case SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS:
refreshPeer(env, PeerOperationType.ENABLE);
setNextState(PeerSyncReplicationStateTransitionState.CREATE_DIR_FOR_REMOTE_WAL);
return Flow.HAS_MORE_STATE;
case CREATE_DIR_FOR_REMOTE_WAL:
try {
createDirForRemoteWAL(env);
} catch (IOException e) {
LOG.warn("Failed to create remote wal dir {}", remoteWALDirForPeer, e);
LOG.warn("Failed to create remote wal dir for peer {} when transiting sync replication " +
"peer state from {} to {}, retry", peerId, fromState, toState, e);
throw new ProcedureYieldException();
}
setNextState(
@@ -242,5 +297,4 @@ public class TransitPeerSyncReplicationStateProcedure
throw new UnsupportedOperationException("unhandled state=" + state);
}
}
}

View File

@@ -244,10 +244,8 @@ public class LogRoller extends HasThread implements Closeable {
}
/**
* For testing only
* @return true if all WAL rolls have finished
*/
@VisibleForTesting
public boolean walRollFinished() {
for (boolean needRoll : walNeedsRoll.values()) {
if (needRoll) {
@@ -257,6 +255,15 @@ public class LogRoller extends HasThread implements Closeable {
return true;
}
/**
* Wait until all wals have been rolled after calling {@link #requestRollAll()}.
*/
public void waitUntilWalRollFinished() throws InterruptedException {
while (!walRollFinished()) {
Thread.sleep(100);
}
}
@Override
public void close() {
running = false;

View File

@@ -18,8 +18,10 @@
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.locks.Lock;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.LogRoller;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
@@ -154,24 +156,65 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
if (!peer.getPeerConfig().isSyncReplication()) {
throw new ReplicationException("Peer with id=" + peerId + " is not synchronous.");
}
SyncReplicationState newState = peer.getNewSyncReplicationState();
SyncReplicationState newSyncReplicationState = peer.getNewSyncReplicationState();
if (stage == 0) {
if (newState != SyncReplicationState.NONE) {
if (newSyncReplicationState != SyncReplicationState.NONE) {
LOG.warn("The new sync replication state for peer {} has already been set to {}, " +
"this should be a retry, give up", peerId, newState);
"this should be a retry, give up", peerId, newSyncReplicationState);
return;
}
newState = replicationPeers.refreshPeerNewSyncReplicationState(peerId);
SyncReplicationState oldState = peer.getSyncReplicationState();
peerActionListener.peerSyncReplicationStateChange(peerId, oldState, newState, stage);
// refresh the peer state first, as when we transit to STANDBY, we may need to disable the
// peer before processing the sync replication state.
PeerState oldState = peer.getPeerState();
boolean success = false;
try {
PeerState newState = replicationPeers.refreshPeerState(peerId);
if (oldState.equals(PeerState.ENABLED) && newState.equals(PeerState.DISABLED)) {
replicationSourceManager.refreshSources(peerId);
}
success = true;
} finally {
if (!success) {
peer.setPeerState(oldState.equals(PeerState.ENABLED));
}
}
newSyncReplicationState = replicationPeers.refreshPeerNewSyncReplicationState(peerId);
SyncReplicationState oldSyncReplicationState = peer.getSyncReplicationState();
peerActionListener.peerSyncReplicationStateChange(peerId, oldSyncReplicationState,
newSyncReplicationState, stage);
} else {
if (newState == SyncReplicationState.NONE) {
LOG.warn("The new sync replication state for peer {} has already been clear, and the " +
"current state is {}, this should be a retry, give up", peerId, newState);
if (newSyncReplicationState == SyncReplicationState.NONE) {
LOG.warn(
"The new sync replication state for peer {} has already been clear, and the " +
"current state is {}, this should be a retry, give up",
peerId, newSyncReplicationState);
return;
}
if (newSyncReplicationState == SyncReplicationState.STANDBY) {
replicationSourceManager.drainSources(peerId);
// Need to roll the wals and make the ReplicationSource for this peer track the new file.
// If we do not do this, there will be two problems that can not be addressed at the same
// time. First, if we just throw away the current wal file, and later when we transit the
// peer to DA, and the wal has not been rolled yet, then the new data written to the wal
// file will not be replicated and cause data inconsistency. But if we just track the
// current wal file without rolling, it may contain some data before we transit the peer
// to S, later if we transit the peer to DA, the data will also be replicated and cause
// data inconsistency. So here we need to roll the wal, and let the ReplicationSource
// track the new wal file, and throw the old wal files away.
LogRoller roller = rs.getWalRoller();
roller.requestRollAll();
try {
roller.waitUntilWalRollFinished();
} catch (InterruptedException e) {
// reset the interrupted flag
Thread.currentThread().interrupt();
throw (IOException) new InterruptedIOException(
"Interrupted while waiting for wal roll finish").initCause(e);
}
}
SyncReplicationState oldState = peer.getSyncReplicationState();
peerActionListener.peerSyncReplicationStateChange(peerId, oldState, newState, stage);
peerActionListener.peerSyncReplicationStateChange(peerId, oldState, newSyncReplicationState,
stage);
peer.transitSyncReplicationState();
}
} finally {

View File

@@ -520,6 +520,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
if (this.replicationEndpoint != null) {
this.replicationEndpoint.stop();
}
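// clear the metrics registered for this source now that it has been terminated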
metrics.clear();
if (join) {
for (ReplicationSourceShipper worker : workers) {
Threads.shutdown(worker, this.sleepForRetries);

View File

@@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
@@ -391,12 +392,84 @@ public class ReplicationSourceManager implements ReplicationListener {
return src;
}
/**
* <p>
* This is used when we transit a sync replication peer to {@link SyncReplicationState#STANDBY}.
* </p>
* <p>
* When transiting to {@link SyncReplicationState#STANDBY}, we can remove all the pending wal
* files for a replication peer as we do not need to replicate them any more. And this is
* necessary, otherwise when we transit back to {@link SyncReplicationState#DOWNGRADE_ACTIVE}
* later, the stale data will be replicated again and cause inconsistency.
* </p>
* <p>
* See HBASE-20426 for more details.
* </p>
* @param peerId the id of the sync replication peer
*/
public void drainSources(String peerId) throws IOException, ReplicationException {
String terminateMessage = "Sync replication peer " + peerId +
" is transiting to STANDBY. Will close the previous replication source and open a new one";
ReplicationPeer peer = replicationPeers.getPeer(peerId);
assert peer.getPeerConfig().isSyncReplication();
ReplicationSourceInterface src = createSource(peerId, peer);
// synchronized here to avoid race with preLogRoll where we add new log to source and also
// walsById.
ReplicationSourceInterface toRemove;
Map<String, NavigableSet<String>> wals = new HashMap<>();
synchronized (latestPaths) {
toRemove = sources.put(peerId, src);
if (toRemove != null) {
LOG.info("Terminate replication source for " + toRemove.getPeerId());
toRemove.terminate(terminateMessage);
toRemove.getSourceMetrics().clear();
}
// Here we make a copy of all the remaining wal files and then delete them from the
// replication queue storage after releasing the lock. It is not safe to just remove the old
// map from walsById since later we may fail to delete them from the replication queue
// storage, and when we retry next time, we can not know the wal files that need to be deleted
// from the replication queue storage.
walsById.get(peerId).forEach((k, v) -> wals.put(k, new TreeSet<>(v)));
}
LOG.info("Startup replication source for " + src.getPeerId());
src.startup();
for (NavigableSet<String> walsByGroup : wals.values()) {
for (String wal : walsByGroup) {
queueStorage.removeWAL(server.getServerName(), peerId, wal);
}
}
synchronized (walsById) {
Map<String, NavigableSet<String>> oldWals = walsById.get(peerId);
wals.forEach((k, v) -> {
NavigableSet<String> walsByGroup = oldWals.get(k);
if (walsByGroup != null) {
walsByGroup.removeAll(v);
}
});
}
// synchronized on oldsources to avoid race with NodeFailoverWorker. Since NodeFailoverWorker is
// a background task, we will delete the file from replication queue storage under the lock to
// simplify the logic.
synchronized (this.oldsources) {
for (Iterator<ReplicationSourceInterface> iter = oldsources.iterator(); iter.hasNext();) {
ReplicationSourceInterface oldSource = iter.next();
if (oldSource.getPeerId().equals(peerId)) {
String queueId = oldSource.getQueueId();
oldSource.terminate(terminateMessage);
oldSource.getSourceMetrics().clear();
queueStorage.removeQueue(server.getServerName(), queueId);
walsByIdRecoveredQueues.remove(queueId);
iter.remove();
}
}
}
}
/**
* Close the previous replication sources of this peer id and open new sources to trigger the new
* replication state changes or new replication config changes. Here we do not need to change the
* replication queue storage; we only need to enqueue all logs to the new replication source.
* @param peerId the id of the replication peer
* @throws IOException
*/
public void refreshSources(String peerId) throws IOException {
String terminateMessage = "Peer " + peerId +
@@ -410,7 +483,7 @@ public class ReplicationSourceManager implements ReplicationListener {
LOG.info("Terminate replication source for " + toRemove.getPeerId());
toRemove.terminate(terminateMessage);
}
for (SortedSet<String> walsByGroup : walsById.get(peerId).values()) {
for (NavigableSet<String> walsByGroup : walsById.get(peerId).values()) {
walsByGroup.forEach(wal -> src.enqueueLog(new Path(this.logDir, wal)));
}
}
@@ -850,18 +923,6 @@ public class ReplicationSourceManager implements ReplicationListener {
actualPeerId);
continue;
}
// track sources in walsByIdRecoveredQueues
Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
walsByIdRecoveredQueues.put(queueId, walsByGroup);
for (String wal : walsSet) {
String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
NavigableSet<String> wals = walsByGroup.get(walPrefix);
if (wals == null) {
wals = new TreeSet<>();
walsByGroup.put(walPrefix, wals);
}
wals.add(wal);
}
ReplicationSourceInterface src = createSource(queueId, peer);
// synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
@@ -869,9 +930,36 @@ public class ReplicationSourceManager implements ReplicationListener {
peer = replicationPeers.getPeer(src.getPeerId());
if (peer == null || !isOldPeer(src.getPeerId(), peer)) {
src.terminate("Recovered queue doesn't belong to any current peer");
removeRecoveredSource(src);
deleteQueue(queueId);
continue;
}
// Do not set up a recovered queue if a sync replication peer is in STANDBY state, or is
// transiting to STANDBY state. The only exception is when we are in STANDBY state and
// transiting to DA, in which case we will replay the remote WALs and they need to be
// replicated back.
if (peer.getPeerConfig().isSyncReplication()) {
Pair<SyncReplicationState, SyncReplicationState> stateAndNewState =
peer.getSyncReplicationStateAndNewState();
if ((stateAndNewState.getFirst().equals(SyncReplicationState.STANDBY) &&
stateAndNewState.getSecond().equals(SyncReplicationState.NONE)) ||
stateAndNewState.getSecond().equals(SyncReplicationState.STANDBY)) {
src.terminate("Sync replication peer is in STANDBY state");
deleteQueue(queueId);
continue;
}
}
// track sources in walsByIdRecoveredQueues
Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
walsByIdRecoveredQueues.put(queueId, walsByGroup);
for (String wal : walsSet) {
String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
NavigableSet<String> wals = walsByGroup.get(walPrefix);
if (wals == null) {
wals = new TreeSet<>();
walsByGroup.put(walPrefix, wals);
}
wals.add(wal);
}
oldsources.add(src);
for (String wal : walsSet) {
src.enqueueLog(new Path(oldLogDir, wal));

View File

@@ -0,0 +1,118 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import static org.junit.Assert.assertTrue;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.SyncReplicationTestBase;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ ReplicationTests.class, MediumTests.class })
public class TestDrainReplicationQueuesForStandBy extends SyncReplicationTestBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestDrainReplicationQueuesForStandBy.class);
@Test
public void test() throws Exception {
UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.STANDBY);
UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.ACTIVE);
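// disable the peer so the data written below stays in cluster1's replication queue; the test
// verifies that the queue is dropped once cluster1 transits to STANDBY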
UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
write(UTIL1, 0, 100);
HRegionServer rs = UTIL1.getRSForFirstRegionInTable(TABLE_NAME);
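// remember the wal group of the current wal file, later we check that no replication worker
// is started for it after the peer is enabled again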
String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(
((AbstractFSWAL<?>) rs.getWAL(RegionInfoBuilder.newBuilder(TABLE_NAME).build()))
.getCurrentFileName().getName());
UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.DOWNGRADE_ACTIVE);
// transit cluster2 to DA and cluster 1 to S
verify(UTIL2, 0, 100);
UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.STANDBY);
// delete the original value, and then major compact
try (Table table = UTIL2.getConnection().getTable(TABLE_NAME)) {
for (int i = 0; i < 100; i++) {
table.delete(new Delete(Bytes.toBytes(i)));
}
}
UTIL2.flush(TABLE_NAME);
UTIL2.compact(TABLE_NAME, true);
// wait until the new values are replicated back to cluster1
HRegion region = rs.getRegions(TABLE_NAME).get(0);
UTIL1.waitFor(30000, new ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return region.get(new Get(Bytes.toBytes(99))).isEmpty();
}
@Override
public String explainFailure() throws Exception {
return "Replication has not been catched up yet";
}
});
// transit cluster1 to DA and cluster2 to S, then we will start replicating from cluster1 to
// cluster2
UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.DOWNGRADE_ACTIVE);
UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.STANDBY);
UTIL1.getAdmin().enableReplicationPeer(PEER_ID);
// confirm that we will not replicate the old data which causes inconsistency
ReplicationSource source = (ReplicationSource) ((Replication) rs.getReplicationSourceService())
.getReplicationManager().getSource(PEER_ID);
UTIL1.waitFor(30000, new ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return !source.workerThreads.containsKey(walGroupId);
}
@Override
public String explainFailure() throws Exception {
return "Replication has not been catched up yet";
}
});
HRegion region2 = UTIL2.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
for (int i = 0; i < 100; i++) {
assertTrue(region2.get(new Get(Bytes.toBytes(i))).isEmpty());
}
}
}