HBASE-15740 Replication source.shippedKBs metric is undercounting because it is in KB
This commit is contained in:
parent
137d891fbc
commit
d07d316113
|
@ -24,7 +24,10 @@ public interface MetricsReplicationSourceSource {
|
||||||
public static final String SOURCE_AGE_OF_LAST_SHIPPED_OP = "source.ageOfLastShippedOp";
|
public static final String SOURCE_AGE_OF_LAST_SHIPPED_OP = "source.ageOfLastShippedOp";
|
||||||
public static final String SOURCE_SHIPPED_BATCHES = "source.shippedBatches";
|
public static final String SOURCE_SHIPPED_BATCHES = "source.shippedBatches";
|
||||||
|
|
||||||
|
@Deprecated
|
||||||
|
/** @deprecated Use SOURCE_SHIPPED_BYTES instead */
|
||||||
public static final String SOURCE_SHIPPED_KBS = "source.shippedKBs";
|
public static final String SOURCE_SHIPPED_KBS = "source.shippedKBs";
|
||||||
|
public static final String SOURCE_SHIPPED_BYTES = "source.shippedBytes";
|
||||||
public static final String SOURCE_SHIPPED_OPS = "source.shippedOps";
|
public static final String SOURCE_SHIPPED_OPS = "source.shippedOps";
|
||||||
|
|
||||||
public static final String SOURCE_LOG_READ_IN_BYTES = "source.logReadInBytes";
|
public static final String SOURCE_LOG_READ_IN_BYTES = "source.logReadInBytes";
|
||||||
|
@ -41,7 +44,7 @@ public interface MetricsReplicationSourceSource {
|
||||||
void incrLogEditsFiltered(long size);
|
void incrLogEditsFiltered(long size);
|
||||||
void incrBatchesShipped(int batches);
|
void incrBatchesShipped(int batches);
|
||||||
void incrOpsShipped(long ops);
|
void incrOpsShipped(long ops);
|
||||||
void incrShippedKBs(long size);
|
void incrShippedBytes(long size);
|
||||||
void incrLogReadInBytes(long size);
|
void incrLogReadInBytes(long size);
|
||||||
void incrLogReadInEdits(long size);
|
void incrLogReadInEdits(long size);
|
||||||
void clear();
|
void clear();
|
||||||
|
|
|
@ -30,6 +30,8 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
|
||||||
private final MutableFastCounter logEditsFilteredCounter;
|
private final MutableFastCounter logEditsFilteredCounter;
|
||||||
private final MutableFastCounter shippedBatchesCounter;
|
private final MutableFastCounter shippedBatchesCounter;
|
||||||
private final MutableFastCounter shippedOpsCounter;
|
private final MutableFastCounter shippedOpsCounter;
|
||||||
|
private final MutableFastCounter shippedBytesCounter;
|
||||||
|
@Deprecated
|
||||||
private final MutableFastCounter shippedKBsCounter;
|
private final MutableFastCounter shippedKBsCounter;
|
||||||
private final MutableFastCounter logReadInBytesCounter;
|
private final MutableFastCounter logReadInBytesCounter;
|
||||||
private final MutableFastCounter shippedHFilesCounter;
|
private final MutableFastCounter shippedHFilesCounter;
|
||||||
|
@ -48,6 +50,8 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
|
||||||
|
|
||||||
shippedKBsCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_KBS, 0L);
|
shippedKBsCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_KBS, 0L);
|
||||||
|
|
||||||
|
shippedBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BYTES, 0L);
|
||||||
|
|
||||||
logReadInBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_BYTES, 0L);
|
logReadInBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_BYTES, 0L);
|
||||||
|
|
||||||
logReadInEditsCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_EDITS, 0L);
|
logReadInEditsCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_EDITS, 0L);
|
||||||
|
@ -88,8 +92,25 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
|
||||||
shippedOpsCounter.incr(ops);
|
shippedOpsCounter.incr(ops);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override public void incrShippedKBs(long size) {
|
@Override public void incrShippedBytes(long size) {
|
||||||
shippedKBsCounter.incr(size);
|
shippedBytesCounter.incr(size);
|
||||||
|
// obtained value maybe smaller than 1024. We should make sure that KB count
|
||||||
|
// eventually picks up even from multiple smaller updates.
|
||||||
|
incrementKBsCounter(shippedBytesCounter, shippedKBsCounter);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void incrementKBsCounter(MutableFastCounter bytesCounter, MutableFastCounter kbsCounter) {
|
||||||
|
// Following code should be thread-safe.
|
||||||
|
long delta = 0;
|
||||||
|
while(true) {
|
||||||
|
long bytes = bytesCounter.value();
|
||||||
|
delta = (bytes / 1024) - kbsCounter.value();
|
||||||
|
if (delta > 0) {
|
||||||
|
kbsCounter.incr(delta);
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override public void incrLogReadInBytes(long size) {
|
@Override public void incrLogReadInBytes(long size) {
|
||||||
|
|
|
@ -30,7 +30,9 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
|
||||||
private final String logEditsFilteredKey;
|
private final String logEditsFilteredKey;
|
||||||
private final String shippedBatchesKey;
|
private final String shippedBatchesKey;
|
||||||
private final String shippedOpsKey;
|
private final String shippedOpsKey;
|
||||||
|
@Deprecated
|
||||||
private final String shippedKBsKey;
|
private final String shippedKBsKey;
|
||||||
|
private final String shippedBytesKey;
|
||||||
private final String logReadInBytesKey;
|
private final String logReadInBytesKey;
|
||||||
private final String shippedHFilesKey;
|
private final String shippedHFilesKey;
|
||||||
private final String sizeOfHFileRefsQueueKey;
|
private final String sizeOfHFileRefsQueueKey;
|
||||||
|
@ -42,6 +44,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
|
||||||
private final MutableFastCounter shippedBatchesCounter;
|
private final MutableFastCounter shippedBatchesCounter;
|
||||||
private final MutableFastCounter shippedOpsCounter;
|
private final MutableFastCounter shippedOpsCounter;
|
||||||
private final MutableFastCounter shippedKBsCounter;
|
private final MutableFastCounter shippedKBsCounter;
|
||||||
|
private final MutableFastCounter shippedBytesCounter;
|
||||||
private final MutableFastCounter logReadInBytesCounter;
|
private final MutableFastCounter logReadInBytesCounter;
|
||||||
private final MutableFastCounter shippedHFilesCounter;
|
private final MutableFastCounter shippedHFilesCounter;
|
||||||
private final MutableGaugeLong sizeOfHFileRefsQueueGauge;
|
private final MutableGaugeLong sizeOfHFileRefsQueueGauge;
|
||||||
|
@ -65,6 +68,9 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
|
||||||
shippedKBsKey = "source." + this.id + ".shippedKBs";
|
shippedKBsKey = "source." + this.id + ".shippedKBs";
|
||||||
shippedKBsCounter = rms.getMetricsRegistry().getCounter(shippedKBsKey, 0L);
|
shippedKBsCounter = rms.getMetricsRegistry().getCounter(shippedKBsKey, 0L);
|
||||||
|
|
||||||
|
shippedBytesKey = "source." + this.id + ".shippedBytes";
|
||||||
|
shippedBytesCounter = rms.getMetricsRegistry().getCounter(shippedBytesKey, 0L);
|
||||||
|
|
||||||
logReadInBytesKey = "source." + this.id + ".logReadInBytes";
|
logReadInBytesKey = "source." + this.id + ".logReadInBytes";
|
||||||
logReadInBytesCounter = rms.getMetricsRegistry().getCounter(logReadInBytesKey, 0L);
|
logReadInBytesCounter = rms.getMetricsRegistry().getCounter(logReadInBytesKey, 0L);
|
||||||
|
|
||||||
|
@ -109,8 +115,10 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
|
||||||
shippedOpsCounter.incr(ops);
|
shippedOpsCounter.incr(ops);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override public void incrShippedKBs(long size) {
|
@Override public void incrShippedBytes(long size) {
|
||||||
shippedKBsCounter.incr(size);
|
shippedBytesCounter.incr(size);
|
||||||
|
MetricsReplicationGlobalSourceSource
|
||||||
|
.incrementKBsCounter(shippedBytesCounter, shippedKBsCounter);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override public void incrLogReadInBytes(long size) {
|
@Override public void incrLogReadInBytes(long size) {
|
||||||
|
@ -125,6 +133,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
|
||||||
rms.removeMetric(shippedBatchesKey);
|
rms.removeMetric(shippedBatchesKey);
|
||||||
rms.removeMetric(shippedOpsKey);
|
rms.removeMetric(shippedOpsKey);
|
||||||
rms.removeMetric(shippedKBsKey);
|
rms.removeMetric(shippedKBsKey);
|
||||||
|
rms.removeMetric(shippedBytesKey);
|
||||||
|
|
||||||
rms.removeMetric(logReadInBytesKey);
|
rms.removeMetric(logReadInBytesKey);
|
||||||
rms.removeMetric(logReadInEditsKey);
|
rms.removeMetric(logReadInEditsKey);
|
||||||
|
|
|
@ -136,15 +136,15 @@ public class MetricsSource {
|
||||||
*
|
*
|
||||||
* @param batchSize the size of the batch that was shipped to sinks.
|
* @param batchSize the size of the batch that was shipped to sinks.
|
||||||
*/
|
*/
|
||||||
public void shipBatch(long batchSize, int sizeInKB) {
|
public void shipBatch(long batchSize, int sizeInBytes) {
|
||||||
singleSourceSource.incrBatchesShipped(1);
|
singleSourceSource.incrBatchesShipped(1);
|
||||||
globalSourceSource.incrBatchesShipped(1);
|
globalSourceSource.incrBatchesShipped(1);
|
||||||
|
|
||||||
singleSourceSource.incrOpsShipped(batchSize);
|
singleSourceSource.incrOpsShipped(batchSize);
|
||||||
globalSourceSource.incrOpsShipped(batchSize);
|
globalSourceSource.incrOpsShipped(batchSize);
|
||||||
|
|
||||||
singleSourceSource.incrShippedKBs(sizeInKB);
|
singleSourceSource.incrShippedBytes(sizeInBytes);
|
||||||
globalSourceSource.incrShippedKBs(sizeInKB);
|
globalSourceSource.incrShippedBytes(sizeInBytes);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -153,8 +153,8 @@ public class MetricsSource {
|
||||||
* @param batchSize the size of the batch that was shipped to sinks.
|
* @param batchSize the size of the batch that was shipped to sinks.
|
||||||
* @param hfiles total number of hfiles shipped to sinks.
|
* @param hfiles total number of hfiles shipped to sinks.
|
||||||
*/
|
*/
|
||||||
public void shipBatch(long batchSize, int sizeInKB, long hfiles) {
|
public void shipBatch(long batchSize, int sizeInBytes, long hfiles) {
|
||||||
shipBatch(batchSize, sizeInKB);
|
shipBatch(batchSize, sizeInBytes);
|
||||||
singleSourceSource.incrHFilesShipped(hfiles);
|
singleSourceSource.incrHFilesShipped(hfiles);
|
||||||
globalSourceSource.incrHFilesShipped(hfiles);
|
globalSourceSource.incrHFilesShipped(hfiles);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1059,7 +1059,7 @@ public class ReplicationSource extends Thread
|
||||||
totalReplicatedEdits.addAndGet(entries.size());
|
totalReplicatedEdits.addAndGet(entries.size());
|
||||||
totalReplicatedOperations.addAndGet(currentNbOperations);
|
totalReplicatedOperations.addAndGet(currentNbOperations);
|
||||||
// FIXME check relationship between wal group and overall
|
// FIXME check relationship between wal group and overall
|
||||||
metrics.shipBatch(currentNbOperations, currentSize / 1024, currentNbHFiles);
|
metrics.shipBatch(currentNbOperations, currentSize, currentNbHFiles);
|
||||||
metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(),
|
metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(),
|
||||||
walGroupId);
|
walGroupId);
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
// Generated by the protocol buffer compiler. DO NOT EDIT!
|
// Generated by the protocol buffer compiler. DO NOT EDIT!
|
||||||
// source: hbase-server/src/test/protobuf/DummyRegionServerEndpoint.proto
|
// source: DummyRegionServerEndpoint.proto
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.coprocessor.protobuf.generated;
|
package org.apache.hadoop.hbase.coprocessor.protobuf.generated;
|
||||||
|
|
||||||
|
@ -1185,16 +1185,15 @@ public final class DummyRegionServerEndpointProtos {
|
||||||
descriptor;
|
descriptor;
|
||||||
static {
|
static {
|
||||||
java.lang.String[] descriptorData = {
|
java.lang.String[] descriptorData = {
|
||||||
"\n>hbase-server/src/test/protobuf/DummyRe" +
|
"\n\037DummyRegionServerEndpoint.proto\022\rhbase" +
|
||||||
"gionServerEndpoint.proto\022\rhbase.test.pb\"" +
|
".test.pb\"\016\n\014DummyRequest\"\036\n\rDummyRespons" +
|
||||||
"\016\n\014DummyRequest\"\036\n\rDummyResponse\022\r\n\005valu" +
|
"e\022\r\n\005value\030\001 \002(\t2\237\001\n\014DummyService\022F\n\tdum" +
|
||||||
"e\030\001 \002(\t2\237\001\n\014DummyService\022F\n\tdummyCall\022\033." +
|
"myCall\022\033.hbase.test.pb.DummyRequest\032\034.hb" +
|
||||||
"hbase.test.pb.DummyRequest\032\034.hbase.test." +
|
"ase.test.pb.DummyResponse\022G\n\ndummyThrow\022" +
|
||||||
"pb.DummyResponse\022G\n\ndummyThrow\022\033.hbase.t" +
|
"\033.hbase.test.pb.DummyRequest\032\034.hbase.tes" +
|
||||||
"est.pb.DummyRequest\032\034.hbase.test.pb.Dumm" +
|
"t.pb.DummyResponseB_\n6org.apache.hadoop." +
|
||||||
"yResponseB_\n6org.apache.hadoop.hbase.cop" +
|
"hbase.coprocessor.protobuf.generatedB\037Du" +
|
||||||
"rocessor.protobuf.generatedB\037DummyRegion" +
|
"mmyRegionServerEndpointProtos\210\001\001\240\001\001"
|
||||||
"ServerEndpointProtos\210\001\001\240\001\001"
|
|
||||||
};
|
};
|
||||||
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
|
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
|
||||||
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
|
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
|
||||||
|
|
Loading…
Reference in New Issue