diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 4c488e1abcf..6d385a4edd8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -400,22 +400,18 @@ public class ReplicationSourceManager implements ReplicationListener { String terminateMessage = "Peer " + peerId + " state or config changed. Will close the previous replication source and open a new one"; ReplicationPeer peer = replicationPeers.getPeer(peerId); - ReplicationSourceInterface src = createSource(peerId, peer); + ReplicationSourceInterface src; // synchronized on latestPaths to avoid missing the new log synchronized (this.latestPaths) { - ReplicationSourceInterface toRemove = this.sources.put(peerId, src); + ReplicationSourceInterface toRemove = this.sources.remove(peerId); if (toRemove != null) { LOG.info("Terminate replication source for " + toRemove.getPeerId()); - // Do not clear metrics - toRemove.terminate(terminateMessage, null, false); + toRemove.terminate(terminateMessage, null, true); } - for (SortedSet walsByGroup : walsById.get(peerId).values()) { - walsByGroup.forEach(wal -> { - Path walPath = new Path(this.logDir, wal); - src.enqueueLog(walPath); - LOG.trace("Enqueued {} to source {} during source creation.", walPath, src.getQueueId()); - }); - + src = createSource(peerId, peer); + this.sources.put(peerId, src); + for (NavigableSet walsByGroup : walsById.get(peerId).values()) { + walsByGroup.forEach(wal -> src.enqueueLog(new Path(this.logDir, wal))); } } LOG.info("Startup replication source for " + src.getPeerId());