HBASE-24781 Clean up peer metrics when disabling peer (#4997)

Co-authored-by: Yuta Imazu <yuta.imazu@linecorp.com>
Signed-off-by: Duo Zhang <zhangduo@apache.or
This commit is contained in:
Yuta Imazu 2023-02-28 14:13:49 +09:00 committed by GitHub
parent d1fede72c3
commit ef6a1130d0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 50 additions and 5 deletions

View File

@ -272,10 +272,14 @@ public class MetricsSource implements BaseSource {
/** Removes all metrics about this Source. */ /** Removes all metrics about this Source. */
public void clear() { public void clear() {
terminate();
singleSourceSource.clear();
}
public void terminate() {
int lastQueueSize = singleSourceSource.getSizeOfLogQueue(); int lastQueueSize = singleSourceSource.getSizeOfLogQueue();
globalSourceSource.decrSizeOfLogQueue(lastQueueSize); globalSourceSource.decrSizeOfLogQueue(lastQueueSize);
singleSourceSource.decrSizeOfLogQueue(lastQueueSize); singleSourceSource.decrSizeOfLogQueue(lastQueueSize);
singleSourceSource.clear();
globalSourceSource.decrSizeOfHFileRefsQueue(lastHFileRefsQueueSize); globalSourceSource.decrSizeOfHFileRefsQueue(lastHFileRefsQueueSize);
lastShippedTimeStamps.clear(); lastShippedTimeStamps.clear();
lastHFileRefsQueueSize = 0; lastHFileRefsQueueSize = 0;

View File

@ -706,10 +706,13 @@ public class ReplicationSource implements ReplicationSourceInterface {
} }
} }
} }
if (clearMetrics) {
// Can be null in test context. // Can be null in test context.
if (this.metrics != null) { if (this.metrics != null) {
if (clearMetrics) {
this.metrics.clear(); this.metrics.clear();
} else {
this.metrics.terminate();
} }
} }
} }

View File

@ -467,7 +467,8 @@ public class ReplicationSourceManager {
ReplicationSourceInterface toRemove = this.sources.remove(peerId); ReplicationSourceInterface toRemove = this.sources.remove(peerId);
if (toRemove != null) { if (toRemove != null) {
LOG.info("Terminate replication source for " + toRemove.getPeerId()); LOG.info("Terminate replication source for " + toRemove.getPeerId());
toRemove.terminate(terminateMessage, null, true); // Do not clear metrics
toRemove.terminate(terminateMessage, null, false);
} }
src = createSource(peerId, peer); src = createSource(peerId, peer);
this.sources.put(peerId, src); this.sources.put(peerId, src);

View File

@ -94,6 +94,8 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
public void terminate(String reason, Exception e, boolean clearMetrics) { public void terminate(String reason, Exception e, boolean clearMetrics) {
if (clearMetrics) { if (clearMetrics) {
this.metrics.clear(); this.metrics.clear();
} else {
this.metrics.terminate();
} }
} }

View File

@ -572,6 +572,41 @@ public abstract class TestReplicationSourceManager {
} }
} }
@Test
public void testDisablePeerMetricsCleanup() throws Exception {
final String peerId = "DummyPeer";
final ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
.setClusterKey(utility.getZkCluster().getAddress().toString() + ":/hbase").build();
try {
MetricsReplicationSourceSource globalSource = getGlobalSource();
final int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
final long sizeOfLatestPath = getSizeOfLatestPath();
addPeerAndWait(peerId, peerConfig, true);
assertEquals(sizeOfLatestPath + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
ReplicationSourceInterface source = manager.getSource(peerId);
// Sanity check
assertNotNull(source);
final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue();
// Enqueue log and check if metrics updated
source.enqueueLog(new Path("abc"));
assertEquals(1 + sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
globalSource.getSizeOfLogQueue());
// Refreshing the peer should decrement the global and single source metrics
manager.refreshSources(peerId);
assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
source = manager.getSource(peerId);
assertNotNull(source);
assertEquals(sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
globalSource.getSizeOfLogQueue());
} finally {
removePeerAndWait(peerId);
}
}
private ReplicationSourceInterface mockReplicationSource(String peerId) { private ReplicationSourceInterface mockReplicationSource(String peerId) {
ReplicationSourceInterface source = mock(ReplicationSourceInterface.class); ReplicationSourceInterface source = mock(ReplicationSourceInterface.class);
when(source.getPeerId()).thenReturn(peerId); when(source.getPeerId()).thenReturn(peerId);