diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java index 604e0bbff8e..d6564665721 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java @@ -54,11 +54,11 @@ public class ReplicationPeerImpl implements ReplicationPeer { this.peerConfigListeners = new ArrayList<>(); } - void setPeerState(boolean enabled) { + public void setPeerState(boolean enabled) { this.peerState = enabled ? PeerState.ENABLED : PeerState.DISABLED; } - void setPeerConfig(ReplicationPeerConfig peerConfig) { + public void setPeerConfig(ReplicationPeerConfig peerConfig) { this.peerConfig = peerConfig; peerConfigListeners.forEach(listener -> listener.peerConfigUpdated(peerConfig)); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java index ce8fdaec3f0..a02d18163f1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java @@ -62,18 +62,26 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler { private void refreshPeerState(String peerId) throws ReplicationException, IOException { PeerState newState; Lock peerLock = peersLock.acquireLock(peerId); + ReplicationPeerImpl peer = null; + PeerState oldState = null; + boolean success = false; try { - ReplicationPeerImpl peer = replicationSourceManager.getReplicationPeers().getPeer(peerId); + peer = replicationSourceManager.getReplicationPeers().getPeer(peerId); if (peer == null) { throw new ReplicationException("Peer with id=" + peerId + " is not cached."); } - PeerState oldState = peer.getPeerState(); + oldState = peer.getPeerState(); newState = replicationSourceManager.getReplicationPeers().refreshPeerState(peerId); // RS need to start work with the new replication state change if (oldState.equals(PeerState.ENABLED) && newState.equals(PeerState.DISABLED)) { replicationSourceManager.refreshSources(peerId); } + success = true; } finally { + if (!success && peer != null) { + // Reset peer state if refresh source failed + peer.setPeerState(oldState.equals(PeerState.ENABLED)); + } peerLock.unlock(); } } @@ -91,19 +99,27 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler { @Override public void updatePeerConfig(String peerId) throws ReplicationException, IOException { Lock peerLock = peersLock.acquireLock(peerId); + ReplicationPeerImpl peer = null; + ReplicationPeerConfig oldConfig = null; + boolean success = false; try { - ReplicationPeerImpl peer = replicationSourceManager.getReplicationPeers().getPeer(peerId); + peer = replicationSourceManager.getReplicationPeers().getPeer(peerId); if (peer == null) { throw new ReplicationException("Peer with id=" + peerId + " is not cached."); } - ReplicationPeerConfig oldConfig = peer.getPeerConfig(); + oldConfig = peer.getPeerConfig(); ReplicationPeerConfig newConfig = replicationSourceManager.getReplicationPeers().refreshPeerConfig(peerId); // RS need to start work with the new replication config change if (!ReplicationUtils.isKeyConfigEqual(oldConfig, newConfig)) { replicationSourceManager.refreshSources(peerId); } + success = true; } finally { + if (!success && peer != null) { + // Reset peer config if refresh source failed + peer.setPeerConfig(oldConfig); + } peerLock.unlock(); } }