HBASE-24781 Clean up peer metrics when disabling peer (#4997)
Co-authored-by: Yuta Imazu <yuta.imazu@linecorp.com>
Signed-off-by: Duo Zhang <zhangduo@apache.or
(cherry picked from commit ef6a1130d0
)
This commit is contained in:
parent
5b6d9de54c
commit
840c62e532
|
@ -272,10 +272,14 @@ public class MetricsSource implements BaseSource {
|
|||
|
||||
/** Removes all metrics about this Source. */
|
||||
public void clear() {
|
||||
terminate();
|
||||
singleSourceSource.clear();
|
||||
}
|
||||
|
||||
public void terminate() {
|
||||
int lastQueueSize = singleSourceSource.getSizeOfLogQueue();
|
||||
globalSourceSource.decrSizeOfLogQueue(lastQueueSize);
|
||||
singleSourceSource.decrSizeOfLogQueue(lastQueueSize);
|
||||
singleSourceSource.clear();
|
||||
globalSourceSource.decrSizeOfHFileRefsQueue(lastHFileRefsQueueSize);
|
||||
lastShippedTimeStamps.clear();
|
||||
lastHFileRefsQueueSize = 0;
|
||||
|
|
|
@ -719,10 +719,13 @@ public class ReplicationSource implements ReplicationSourceInterface {
|
|||
}
|
||||
}
|
||||
}
|
||||
if (clearMetrics) {
|
||||
// Can be null in test context.
|
||||
if (this.metrics != null) {
|
||||
|
||||
// Can be null in test context.
|
||||
if (this.metrics != null) {
|
||||
if (clearMetrics) {
|
||||
this.metrics.clear();
|
||||
} else {
|
||||
this.metrics.terminate();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -370,7 +370,8 @@ public class ReplicationSourceManager {
|
|||
ReplicationSourceInterface toRemove = this.sources.remove(peerId);
|
||||
if (toRemove != null) {
|
||||
LOG.info("Terminate replication source for " + toRemove.getPeerId());
|
||||
toRemove.terminate(terminateMessage, null, true);
|
||||
// Do not clear metrics
|
||||
toRemove.terminate(terminateMessage, null, false);
|
||||
}
|
||||
src = createSource(peerId, peer);
|
||||
this.sources.put(peerId, src);
|
||||
|
|
|
@ -92,6 +92,8 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
|
|||
public void terminate(String reason, Exception e, boolean clearMetrics) {
|
||||
if (clearMetrics) {
|
||||
this.metrics.clear();
|
||||
} else {
|
||||
this.metrics.terminate();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -554,6 +554,41 @@ public abstract class TestReplicationSourceManager {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDisablePeerMetricsCleanup() throws Exception {
|
||||
final String peerId = "DummyPeer";
|
||||
final ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
|
||||
.setClusterKey(utility.getZkCluster().getAddress().toString() + ":/hbase").build();
|
||||
try {
|
||||
MetricsReplicationSourceSource globalSource = getGlobalSource();
|
||||
final int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
|
||||
final long sizeOfLatestPath = getSizeOfLatestPath();
|
||||
addPeerAndWait(peerId, peerConfig, true);
|
||||
assertEquals(sizeOfLatestPath + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
|
||||
ReplicationSourceInterface source = manager.getSource(peerId);
|
||||
// Sanity check
|
||||
assertNotNull(source);
|
||||
final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue();
|
||||
// Enqueue log and check if metrics updated
|
||||
source.enqueueLog(new Path("abc"));
|
||||
assertEquals(1 + sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
|
||||
assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
|
||||
globalSource.getSizeOfLogQueue());
|
||||
|
||||
// Refreshing the peer should decrement the global and single source metrics
|
||||
manager.refreshSources(peerId);
|
||||
assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
|
||||
|
||||
source = manager.getSource(peerId);
|
||||
assertNotNull(source);
|
||||
assertEquals(sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
|
||||
assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
|
||||
globalSource.getSizeOfLogQueue());
|
||||
} finally {
|
||||
removePeerAndWait(peerId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a peer and wait for it to initialize
|
||||
* @param waitForSource Whether to wait for replication source to initialize
|
||||
|
|
Loading…
Reference in New Issue