HBASE-10213 Add read log size per second metrics for replication source

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1554347 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2013-12-31 01:34:37 +00:00
parent 8e251aad70
commit f2f316db1f
2 changed files with 11 additions and 0 deletions

View File

@ -37,6 +37,7 @@ public class MetricsSource {
public static final String SOURCE_LOG_EDITS_FILTERED = "source.logEditsFiltered";
public static final String SOURCE_SHIPPED_BATCHES = "source.shippedBatches";
public static final String SOURCE_SHIPPED_OPS = "source.shippedOps";
public static final String SOURCE_LOG_READ_IN_BYTES = "source.logReadInBytes";
public static final Log LOG = LogFactory.getLog(MetricsSource.class);
private String id;
@ -50,6 +51,7 @@ public class MetricsSource {
private String logEditsFilteredKey;
private final String shippedBatchesKey;
private final String shippedOpsKey;
private final String logReadInBytesKey;
private MetricsReplicationSource rms;
@ -67,6 +69,7 @@ public class MetricsSource {
logEditsFilteredKey = "source." + id + ".logEditsFiltered";
shippedBatchesKey = "source." + this.id + ".shippedBatches";
shippedOpsKey = "source." + this.id + ".shippedOps";
logReadInBytesKey = "source." + this.id + ".logReadInBytes";
rms = CompatibilitySingletonFactory.getInstance(MetricsReplicationSource.class);
}
@ -144,6 +147,12 @@ public class MetricsSource {
rms.incCounters(shippedOpsKey, batchSize);
rms.incCounters(SOURCE_SHIPPED_OPS, batchSize);
}
/** increase the byte number read by source from log file */
public void incrLogReadInByes(long readInBytes) {
rms.incCounters(logReadInBytesKey, readInBytes);
rms.incCounters(SOURCE_LOG_READ_IN_BYTES, readInBytes);
}
/** Removes all metrics about this Source. */
public void clear() {

View File

@ -377,6 +377,7 @@ public class ReplicationSource extends Thread
+ this.repLogReader.getPosition());
}
this.repLogReader.seek();
long positionBeforeRead = this.repLogReader.getPosition();
HLog.Entry entry =
this.repLogReader.readNextAndSetPosition();
while (entry != null) {
@ -413,6 +414,7 @@ public class ReplicationSource extends Thread
break;
}
}
metrics.incrLogReadInByes(this.repLogReader.getPosition() - positionBeforeRead);
if (currentWALisBeingWrittenTo) {
return false;
}