HBASE-15740 Replication source.shippedKBs metric is undercounting because it is in KB
This commit is contained in:
parent
541d1da5fe
commit
b75b226804
|
@ -24,7 +24,10 @@ public interface MetricsReplicationSourceSource {
|
|||
public static final String SOURCE_AGE_OF_LAST_SHIPPED_OP = "source.ageOfLastShippedOp";
|
||||
public static final String SOURCE_SHIPPED_BATCHES = "source.shippedBatches";
|
||||
|
||||
@Deprecated
|
||||
/** @deprecated Use SOURCE_SHIPPED_BYTES instead */
|
||||
public static final String SOURCE_SHIPPED_KBS = "source.shippedKBs";
|
||||
public static final String SOURCE_SHIPPED_BYTES = "source.shippedBytes";
|
||||
public static final String SOURCE_SHIPPED_OPS = "source.shippedOps";
|
||||
|
||||
public static final String SOURCE_LOG_READ_IN_BYTES = "source.logReadInBytes";
|
||||
|
@ -41,7 +44,7 @@ public interface MetricsReplicationSourceSource {
|
|||
void incrLogEditsFiltered(long size);
|
||||
void incrBatchesShipped(int batches);
|
||||
void incrOpsShipped(long ops);
|
||||
void incrShippedKBs(long size);
|
||||
void incrShippedBytes(long size);
|
||||
void incrLogReadInBytes(long size);
|
||||
void incrLogReadInEdits(long size);
|
||||
void clear();
|
||||
|
|
|
@ -30,6 +30,8 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
|
|||
private final MutableFastCounter logEditsFilteredCounter;
|
||||
private final MutableFastCounter shippedBatchesCounter;
|
||||
private final MutableFastCounter shippedOpsCounter;
|
||||
private final MutableFastCounter shippedBytesCounter;
|
||||
@Deprecated
|
||||
private final MutableFastCounter shippedKBsCounter;
|
||||
private final MutableFastCounter logReadInBytesCounter;
|
||||
private final MutableFastCounter shippedHFilesCounter;
|
||||
|
@ -48,6 +50,8 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
|
|||
|
||||
shippedKBsCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_KBS, 0L);
|
||||
|
||||
shippedBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BYTES, 0L);
|
||||
|
||||
logReadInBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_BYTES, 0L);
|
||||
|
||||
logReadInEditsCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_EDITS, 0L);
|
||||
|
@ -88,8 +92,25 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
|
|||
shippedOpsCounter.incr(ops);
|
||||
}
|
||||
|
||||
@Override public void incrShippedKBs(long size) {
|
||||
shippedKBsCounter.incr(size);
|
||||
@Override public void incrShippedBytes(long size) {
|
||||
shippedBytesCounter.incr(size);
|
||||
// obtained value maybe smaller than 1024. We should make sure that KB count
|
||||
// eventually picks up even from multiple smaller updates.
|
||||
incrementKBsCounter(shippedBytesCounter, shippedKBsCounter);
|
||||
}
|
||||
|
||||
static void incrementKBsCounter(MutableFastCounter bytesCounter, MutableFastCounter kbsCounter) {
|
||||
// Following code should be thread-safe.
|
||||
long delta = 0;
|
||||
while(true) {
|
||||
long bytes = bytesCounter.value();
|
||||
delta = (bytes / 1024) - kbsCounter.value();
|
||||
if (delta > 0) {
|
||||
kbsCounter.incr(delta);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override public void incrLogReadInBytes(long size) {
|
||||
|
|
|
@ -30,7 +30,9 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
|
|||
private final String logEditsFilteredKey;
|
||||
private final String shippedBatchesKey;
|
||||
private final String shippedOpsKey;
|
||||
@Deprecated
|
||||
private final String shippedKBsKey;
|
||||
private final String shippedBytesKey;
|
||||
private final String logReadInBytesKey;
|
||||
private final String shippedHFilesKey;
|
||||
private final String sizeOfHFileRefsQueueKey;
|
||||
|
@ -42,6 +44,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
|
|||
private final MutableFastCounter shippedBatchesCounter;
|
||||
private final MutableFastCounter shippedOpsCounter;
|
||||
private final MutableFastCounter shippedKBsCounter;
|
||||
private final MutableFastCounter shippedBytesCounter;
|
||||
private final MutableFastCounter logReadInBytesCounter;
|
||||
private final MutableFastCounter shippedHFilesCounter;
|
||||
private final MutableGaugeLong sizeOfHFileRefsQueueGauge;
|
||||
|
@ -65,6 +68,9 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
|
|||
shippedKBsKey = "source." + this.id + ".shippedKBs";
|
||||
shippedKBsCounter = rms.getMetricsRegistry().getCounter(shippedKBsKey, 0L);
|
||||
|
||||
shippedBytesKey = "source." + this.id + ".shippedBytes";
|
||||
shippedBytesCounter = rms.getMetricsRegistry().getCounter(shippedBytesKey, 0L);
|
||||
|
||||
logReadInBytesKey = "source." + this.id + ".logReadInBytes";
|
||||
logReadInBytesCounter = rms.getMetricsRegistry().getCounter(logReadInBytesKey, 0L);
|
||||
|
||||
|
@ -109,8 +115,10 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
|
|||
shippedOpsCounter.incr(ops);
|
||||
}
|
||||
|
||||
@Override public void incrShippedKBs(long size) {
|
||||
shippedKBsCounter.incr(size);
|
||||
@Override public void incrShippedBytes(long size) {
|
||||
shippedBytesCounter.incr(size);
|
||||
MetricsReplicationGlobalSourceSource
|
||||
.incrementKBsCounter(shippedBytesCounter, shippedKBsCounter);
|
||||
}
|
||||
|
||||
@Override public void incrLogReadInBytes(long size) {
|
||||
|
@ -125,6 +133,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
|
|||
rms.removeMetric(shippedBatchesKey);
|
||||
rms.removeMetric(shippedOpsKey);
|
||||
rms.removeMetric(shippedKBsKey);
|
||||
rms.removeMetric(shippedBytesKey);
|
||||
|
||||
rms.removeMetric(logReadInBytesKey);
|
||||
rms.removeMetric(logReadInEditsKey);
|
||||
|
|
|
@ -136,15 +136,15 @@ public class MetricsSource {
|
|||
*
|
||||
* @param batchSize the size of the batch that was shipped to sinks.
|
||||
*/
|
||||
public void shipBatch(long batchSize, int sizeInKB) {
|
||||
public void shipBatch(long batchSize, int sizeInBytes) {
|
||||
singleSourceSource.incrBatchesShipped(1);
|
||||
globalSourceSource.incrBatchesShipped(1);
|
||||
|
||||
singleSourceSource.incrOpsShipped(batchSize);
|
||||
globalSourceSource.incrOpsShipped(batchSize);
|
||||
|
||||
singleSourceSource.incrShippedKBs(sizeInKB);
|
||||
globalSourceSource.incrShippedKBs(sizeInKB);
|
||||
singleSourceSource.incrShippedBytes(sizeInBytes);
|
||||
globalSourceSource.incrShippedBytes(sizeInBytes);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -153,8 +153,8 @@ public class MetricsSource {
|
|||
* @param batchSize the size of the batch that was shipped to sinks.
|
||||
* @param hfiles total number of hfiles shipped to sinks.
|
||||
*/
|
||||
public void shipBatch(long batchSize, int sizeInKB, long hfiles) {
|
||||
shipBatch(batchSize, sizeInKB);
|
||||
public void shipBatch(long batchSize, int sizeInBytes, long hfiles) {
|
||||
shipBatch(batchSize, sizeInBytes);
|
||||
singleSourceSource.incrHFilesShipped(hfiles);
|
||||
globalSourceSource.incrHFilesShipped(hfiles);
|
||||
}
|
||||
|
|
|
@ -1060,7 +1060,7 @@ public class ReplicationSource extends Thread
|
|||
totalReplicatedEdits.addAndGet(entries.size());
|
||||
totalReplicatedOperations.addAndGet(currentNbOperations);
|
||||
// FIXME check relationship between wal group and overall
|
||||
metrics.shipBatch(currentNbOperations, currentSize / 1024, currentNbHFiles);
|
||||
metrics.shipBatch(currentNbOperations, currentSize, currentNbHFiles);
|
||||
metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(),
|
||||
walGroupId);
|
||||
if (LOG.isTraceEnabled()) {
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
// Generated by the protocol buffer compiler. DO NOT EDIT!
|
||||
// source: hbase-server/src/test/protobuf/DummyRegionServerEndpoint.proto
|
||||
// source: DummyRegionServerEndpoint.proto
|
||||
|
||||
package org.apache.hadoop.hbase.coprocessor.protobuf.generated;
|
||||
|
||||
|
@ -1185,16 +1185,15 @@ public final class DummyRegionServerEndpointProtos {
|
|||
descriptor;
|
||||
static {
|
||||
java.lang.String[] descriptorData = {
|
||||
"\n>hbase-server/src/test/protobuf/DummyRe" +
|
||||
"gionServerEndpoint.proto\022\rhbase.test.pb\"" +
|
||||
"\016\n\014DummyRequest\"\036\n\rDummyResponse\022\r\n\005valu" +
|
||||
"e\030\001 \002(\t2\237\001\n\014DummyService\022F\n\tdummyCall\022\033." +
|
||||
"hbase.test.pb.DummyRequest\032\034.hbase.test." +
|
||||
"pb.DummyResponse\022G\n\ndummyThrow\022\033.hbase.t" +
|
||||
"est.pb.DummyRequest\032\034.hbase.test.pb.Dumm" +
|
||||
"yResponseB_\n6org.apache.hadoop.hbase.cop" +
|
||||
"rocessor.protobuf.generatedB\037DummyRegion" +
|
||||
"ServerEndpointProtos\210\001\001\240\001\001"
|
||||
"\n\037DummyRegionServerEndpoint.proto\022\rhbase" +
|
||||
".test.pb\"\016\n\014DummyRequest\"\036\n\rDummyRespons" +
|
||||
"e\022\r\n\005value\030\001 \002(\t2\237\001\n\014DummyService\022F\n\tdum" +
|
||||
"myCall\022\033.hbase.test.pb.DummyRequest\032\034.hb" +
|
||||
"ase.test.pb.DummyResponse\022G\n\ndummyThrow\022" +
|
||||
"\033.hbase.test.pb.DummyRequest\032\034.hbase.tes" +
|
||||
"t.pb.DummyResponseB_\n6org.apache.hadoop." +
|
||||
"hbase.coprocessor.protobuf.generatedB\037Du" +
|
||||
"mmyRegionServerEndpointProtos\210\001\001\240\001\001"
|
||||
};
|
||||
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
|
||||
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
|
||||
|
|
Loading…
Reference in New Issue