diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 948c24dc4f8..78edffaf471 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -549,7 +549,12 @@ public class ReplicationSource implements ReplicationSourceInterface { terminate(reason, cause, true); } - public void terminate(String reason, Exception cause, boolean join) { + @Override + public void terminate(String reason, Exception cause, boolean clearMetrics) { + terminate(reason, cause, clearMetrics, true); + } + + public void terminate(String reason, Exception cause, boolean clearMetrics, boolean join) { if (cause == null) { LOG.info("{} Closing source {} because: {}", logPeerId(), this.queueId, reason); } else { @@ -595,7 +600,9 @@ public class ReplicationSource implements ReplicationSourceInterface { if (this.replicationEndpoint != null) { this.replicationEndpoint.stop(); } - metrics.clear(); + if (clearMetrics) { + metrics.clear(); + } if (join) { for (ReplicationSourceShipper worker : workers) { Threads.shutdown(worker, this.sleepForRetries); @@ -611,7 +618,9 @@ public class ReplicationSource implements ReplicationSourceInterface { } } } - this.metrics.clear(); + if (clearMetrics) { + this.metrics.clear(); + } } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java index df7a8cc7b2f..0bd90cf1ee8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java @@ -91,6 +91,14 @@ public interface ReplicationSourceInterface { */ void terminate(String reason, Exception cause); + /** + * End the replication + * @param reason why it's terminating + * @param cause the error that's causing it + * @param clearMetrics removes all metrics about this Source + */ + void terminate(String reason, Exception cause, boolean clearMetrics); + /** * Get the current log that's replicated * @return the current log diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index de37cc862e7..43afa798f08 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -482,7 +482,8 @@ public class ReplicationSourceManager implements ReplicationListener { ReplicationSourceInterface toRemove = this.sources.put(peerId, src); if (toRemove != null) { LOG.info("Terminate replication source for " + toRemove.getPeerId()); - toRemove.terminate(terminateMessage); + // Do not clear metrics + toRemove.terminate(terminateMessage, null, false); } for (NavigableSet walsByGroup : walsById.get(peerId).values()) { walsByGroup.forEach(wal -> src.enqueueLog(new Path(this.logDir, wal))); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java index 67f793d628c..a361c447060 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java @@ -86,7 +86,14 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { @Override public void terminate(String reason, Exception e) { - this.metrics.clear(); + terminate(reason, e, true); + } + + @Override + public void terminate(String reason, Exception e, boolean clearMetrics) { + if (clearMetrics) { + this.metrics.clear(); + } } @Override