HBASE-20475 Fix the flaky TestReplicationDroppedTables unit test - addendum

This commit is contained in:
huzheng 2018-04-27 16:40:53 +08:00
parent 39cf42be9a
commit e9a278adc6
2 changed files with 27 additions and 6 deletions

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.replication; package org.apache.hadoop.hbase.replication;
import java.util.Collections; import java.util.Collections;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
@ -103,6 +104,10 @@ public class ReplicationPeers {
return Collections.unmodifiableSet(peerCache.keySet()); return Collections.unmodifiableSet(peerCache.keySet());
} }
public Map<String, ReplicationPeerImpl> getPeerCache() {
return Collections.unmodifiableMap(peerCache);
}
public PeerState refreshPeerState(String peerId) throws ReplicationException { public PeerState refreshPeerState(String peerId) throws ReplicationException {
ReplicationPeerImpl peer = peerCache.get(peerId); ReplicationPeerImpl peer = peerCache.get(peerId);
if (peer == null) { if (peer == null) {

View File

@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationListener; import org.apache.hadoop.hbase.replication.ReplicationListener;
import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
@ -627,11 +628,25 @@ public class ReplicationSourceManager implements ReplicationListener {
class NodeFailoverWorker extends Thread { class NodeFailoverWorker extends Thread {
private final ServerName deadRS; private final ServerName deadRS;
// After claim the queues from dead region server, the NodeFailoverWorker will skip to start
// the RecoveredReplicationSource if the peer has been removed. but there's possible that
// remove a peer with peerId = 2 and add a peer with peerId = 2 again during the
// NodeFailoverWorker. So we need a deep copied <peerId, peer> map to decide whether we
// should start the RecoveredReplicationSource. If the latest peer is not the old peer when
// NodeFailoverWorker begin, we should skip to start the RecoveredReplicationSource, Otherwise
// the rs will abort (See HBASE-20475).
private final Map<String, ReplicationPeerImpl> peersSnapshot;
@VisibleForTesting @VisibleForTesting
public NodeFailoverWorker(ServerName deadRS) { public NodeFailoverWorker(ServerName deadRS) {
super("Failover-for-" + deadRS); super("Failover-for-" + deadRS);
this.deadRS = deadRS; this.deadRS = deadRS;
peersSnapshot = new HashMap<>(replicationPeers.getPeerCache());
}
private boolean isOldPeer(String peerId, ReplicationPeerImpl newPeerRef) {
ReplicationPeerImpl oldPeerRef = peersSnapshot.get(peerId);
return oldPeerRef != null && oldPeerRef == newPeerRef;
} }
@Override @Override
@ -691,16 +706,16 @@ public class ReplicationSourceManager implements ReplicationListener {
ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queueId); ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queueId);
String actualPeerId = replicationQueueInfo.getPeerId(); String actualPeerId = replicationQueueInfo.getPeerId();
ReplicationPeer peer = replicationPeers.getPeer(actualPeerId); ReplicationPeerImpl peer = replicationPeers.getPeer(actualPeerId);
if (peer == null) { if (peer == null || !isOldPeer(actualPeerId, peer)) {
LOG.warn("Skipping failover for peer:" + actualPeerId + " of node " + deadRS + LOG.warn("Skipping failover for peer {} of node {}, peer is null", actualPeerId,
", peer is null"); deadRS);
abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), queueId)); abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), queueId));
continue; continue;
} }
if (server instanceof ReplicationSyncUp.DummyServer if (server instanceof ReplicationSyncUp.DummyServer
&& peer.getPeerState().equals(PeerState.DISABLED)) { && peer.getPeerState().equals(PeerState.DISABLED)) {
LOG.warn("Peer {} is disbaled. ReplicationSyncUp tool will skip " LOG.warn("Peer {} is disabled. ReplicationSyncUp tool will skip "
+ "replicating data to this peer.", + "replicating data to this peer.",
actualPeerId); actualPeerId);
continue; continue;
@ -721,7 +736,8 @@ public class ReplicationSourceManager implements ReplicationListener {
ReplicationSourceInterface src = createSource(queueId, peer); ReplicationSourceInterface src = createSource(queueId, peer);
// synchronized on oldsources to avoid adding recovered source for the to-be-removed peer // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
synchronized (oldsources) { synchronized (oldsources) {
if (!replicationPeers.getAllPeerIds().contains(src.getPeerId())) { peer = replicationPeers.getPeer(src.getPeerId());
if (peer == null || !isOldPeer(src.getPeerId(), peer)) {
src.terminate("Recovered queue doesn't belong to any current peer"); src.terminate("Recovered queue doesn't belong to any current peer");
removeRecoveredSource(src); removeRecoveredSource(src);
continue; continue;