HBASE-19923 Reset peer state and config when refresh replication source failed
This commit is contained in:
parent
05279d2f5f
commit
2b63af376e
|
@ -54,11 +54,11 @@ public class ReplicationPeerImpl implements ReplicationPeer {
|
|||
this.peerConfigListeners = new ArrayList<>();
|
||||
}
|
||||
|
||||
void setPeerState(boolean enabled) {
|
||||
public void setPeerState(boolean enabled) {
|
||||
this.peerState = enabled ? PeerState.ENABLED : PeerState.DISABLED;
|
||||
}
|
||||
|
||||
void setPeerConfig(ReplicationPeerConfig peerConfig) {
|
||||
public void setPeerConfig(ReplicationPeerConfig peerConfig) {
|
||||
this.peerConfig = peerConfig;
|
||||
peerConfigListeners.forEach(listener -> listener.peerConfigUpdated(peerConfig));
|
||||
}
|
||||
|
|
|
@ -62,18 +62,26 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
|
|||
private void refreshPeerState(String peerId) throws ReplicationException, IOException {
|
||||
PeerState newState;
|
||||
Lock peerLock = peersLock.acquireLock(peerId);
|
||||
ReplicationPeerImpl peer = null;
|
||||
PeerState oldState = null;
|
||||
boolean success = false;
|
||||
try {
|
||||
ReplicationPeerImpl peer = replicationSourceManager.getReplicationPeers().getPeer(peerId);
|
||||
peer = replicationSourceManager.getReplicationPeers().getPeer(peerId);
|
||||
if (peer == null) {
|
||||
throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
|
||||
}
|
||||
PeerState oldState = peer.getPeerState();
|
||||
oldState = peer.getPeerState();
|
||||
newState = replicationSourceManager.getReplicationPeers().refreshPeerState(peerId);
|
||||
// RS need to start work with the new replication state change
|
||||
if (oldState.equals(PeerState.ENABLED) && newState.equals(PeerState.DISABLED)) {
|
||||
replicationSourceManager.refreshSources(peerId);
|
||||
}
|
||||
success = true;
|
||||
} finally {
|
||||
if (!success && peer != null) {
|
||||
// Reset peer state if refresh source failed
|
||||
peer.setPeerState(oldState.equals(PeerState.ENABLED));
|
||||
}
|
||||
peerLock.unlock();
|
||||
}
|
||||
}
|
||||
|
@ -91,19 +99,27 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
|
|||
@Override
|
||||
public void updatePeerConfig(String peerId) throws ReplicationException, IOException {
|
||||
Lock peerLock = peersLock.acquireLock(peerId);
|
||||
ReplicationPeerImpl peer = null;
|
||||
ReplicationPeerConfig oldConfig = null;
|
||||
boolean success = false;
|
||||
try {
|
||||
ReplicationPeerImpl peer = replicationSourceManager.getReplicationPeers().getPeer(peerId);
|
||||
peer = replicationSourceManager.getReplicationPeers().getPeer(peerId);
|
||||
if (peer == null) {
|
||||
throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
|
||||
}
|
||||
ReplicationPeerConfig oldConfig = peer.getPeerConfig();
|
||||
oldConfig = peer.getPeerConfig();
|
||||
ReplicationPeerConfig newConfig =
|
||||
replicationSourceManager.getReplicationPeers().refreshPeerConfig(peerId);
|
||||
// RS need to start work with the new replication config change
|
||||
if (!ReplicationUtils.isKeyConfigEqual(oldConfig, newConfig)) {
|
||||
replicationSourceManager.refreshSources(peerId);
|
||||
}
|
||||
success = true;
|
||||
} finally {
|
||||
if (!success && peer != null) {
|
||||
// Reset peer config if refresh source failed
|
||||
peer.setPeerConfig(oldConfig);
|
||||
}
|
||||
peerLock.unlock();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue