HBASE-23231 ReplicationSource do not update metrics after refresh (#778)

Signed-off-by: stack <stack@apache.org>
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
binlijin 2019-10-31 09:23:59 +08:00 committed by Duo Zhang
parent 0108e57309
commit 3152d9981b
4 changed files with 30 additions and 5 deletions

View File

@ -549,7 +549,12 @@ public class ReplicationSource implements ReplicationSourceInterface {
terminate(reason, cause, true); terminate(reason, cause, true);
} }
public void terminate(String reason, Exception cause, boolean join) { @Override
public void terminate(String reason, Exception cause, boolean clearMetrics) {
terminate(reason, cause, clearMetrics, true);
}
public void terminate(String reason, Exception cause, boolean clearMetrics, boolean join) {
if (cause == null) { if (cause == null) {
LOG.info("{} Closing source {} because: {}", logPeerId(), this.queueId, reason); LOG.info("{} Closing source {} because: {}", logPeerId(), this.queueId, reason);
} else { } else {
@ -595,7 +600,9 @@ public class ReplicationSource implements ReplicationSourceInterface {
if (this.replicationEndpoint != null) { if (this.replicationEndpoint != null) {
this.replicationEndpoint.stop(); this.replicationEndpoint.stop();
} }
if (clearMetrics) {
metrics.clear(); metrics.clear();
}
if (join) { if (join) {
for (ReplicationSourceShipper worker : workers) { for (ReplicationSourceShipper worker : workers) {
Threads.shutdown(worker, this.sleepForRetries); Threads.shutdown(worker, this.sleepForRetries);
@ -611,8 +618,10 @@ public class ReplicationSource implements ReplicationSourceInterface {
} }
} }
} }
if (clearMetrics) {
this.metrics.clear(); this.metrics.clear();
} }
}
@Override @Override
public String getQueueId() { public String getQueueId() {

View File

@ -91,6 +91,14 @@ public interface ReplicationSourceInterface {
*/ */
void terminate(String reason, Exception cause); void terminate(String reason, Exception cause);
/**
* End the replication
* @param reason why it's terminating
* @param cause the error that's causing it
* @param clearMetrics removes all metrics about this Source
*/
void terminate(String reason, Exception cause, boolean clearMetrics);
/** /**
* Get the current log that's replicated * Get the current log that's replicated
* @return the current log * @return the current log

View File

@ -482,7 +482,8 @@ public class ReplicationSourceManager implements ReplicationListener {
ReplicationSourceInterface toRemove = this.sources.put(peerId, src); ReplicationSourceInterface toRemove = this.sources.put(peerId, src);
if (toRemove != null) { if (toRemove != null) {
LOG.info("Terminate replication source for " + toRemove.getPeerId()); LOG.info("Terminate replication source for " + toRemove.getPeerId());
toRemove.terminate(terminateMessage); // Do not clear metrics
toRemove.terminate(terminateMessage, null, false);
} }
for (NavigableSet<String> walsByGroup : walsById.get(peerId).values()) { for (NavigableSet<String> walsByGroup : walsById.get(peerId).values()) {
walsByGroup.forEach(wal -> src.enqueueLog(new Path(this.logDir, wal))); walsByGroup.forEach(wal -> src.enqueueLog(new Path(this.logDir, wal)));

View File

@ -86,8 +86,15 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
@Override @Override
public void terminate(String reason, Exception e) { public void terminate(String reason, Exception e) {
terminate(reason, e, true);
}
@Override
public void terminate(String reason, Exception e, boolean clearMetrics) {
if (clearMetrics) {
this.metrics.clear(); this.metrics.clear();
} }
}
@Override @Override
public String getQueueId() { public String getQueueId() {