From cf5527be97d0b170a592eb0b9240f1d617870eb6 Mon Sep 17 00:00:00 2001 From: Ruanhui <32773751+frostruan@users.noreply.github.com> Date: Sun, 27 Nov 2022 23:07:40 +0800 Subject: [PATCH] HBASE-27463 Reset sizeOfLogQueue when refresh replication source (#4863) Co-authored-by: huiruan Signed-off-by: Duo Zhang Reviewed-by: Rushabh Shah (cherry picked from commit bb9f43c6f9079b196e7a9d5a5eb35e721b092052) --- .../ReplicationSourceManager.java | 28 +++++++------------ 1 file changed, 10 insertions(+), 18 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index bab9b1474fe..ba48207a088 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -97,15 +97,11 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto * case need synchronized is {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and * {@link #preLogRoll(Path)}. *
  • No need synchronized on {@link #walsByIdRecoveredQueues}. There are three methods which - * modify it, {@link #removePeer(String)} , <<<<<<< HEAD + * modify it, {@link #removePeer(String)}, * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and - * {@link ReplicationSourceManager.NodeFailoverWorker#run()}. - * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} is called by ======= - * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and * {@link ReplicationSourceManager#claimQueue(ServerName, String)}. - * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} is called by >>>>>>> - * 51893b9ba3... HBASE-26029 It is not reliable to use nodeDeleted event to track region server's - * death (#3430) {@link ReplicationSourceInterface}. {@link #removePeer(String)} will terminate the + * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} is called by + * {@link ReplicationSourceInterface}. {@link #removePeer(String)} will terminate the * {@link ReplicationSourceInterface} firstly, then remove the wals from * {@link #walsByIdRecoveredQueues}. And * {@link ReplicationSourceManager#claimQueue(ServerName, String)} will add the wals to @@ -368,22 +364,18 @@ public class ReplicationSourceManager { String terminateMessage = "Peer " + peerId + " state or config changed. Will close the previous replication source and open a new one"; ReplicationPeer peer = replicationPeers.getPeer(peerId); - ReplicationSourceInterface src = createSource(peerId, peer); + ReplicationSourceInterface src; // synchronized on latestPaths to avoid missing the new log synchronized (this.latestPaths) { - ReplicationSourceInterface toRemove = this.sources.put(peerId, src); + ReplicationSourceInterface toRemove = this.sources.remove(peerId); if (toRemove != null) { LOG.info("Terminate replication source for " + toRemove.getPeerId()); - // Do not clear metrics - toRemove.terminate(terminateMessage, null, false); + toRemove.terminate(terminateMessage, null, true); } - for (SortedSet walsByGroup : walsById.get(peerId).values()) { - walsByGroup.forEach(wal -> { - Path walPath = new Path(this.logDir, wal); - src.enqueueLog(walPath); - LOG.trace("Enqueued {} to source {} during source creation.", walPath, src.getQueueId()); - }); - + src = createSource(peerId, peer); + this.sources.put(peerId, src); + for (NavigableSet walsByGroup : walsById.get(peerId).values()) { + walsByGroup.forEach(wal -> src.enqueueLog(new Path(this.logDir, wal))); } } LOG.info("Startup replication source for " + src.getPeerId());