HBASE-25082: Per table WAL metrics: appendCount and appendSize (#2440)
Signed-off-by: Geoffrey Jacoby <gjacoby@apache.org> Signed-off-by: Ankit Jain <jain.ankit@salesforce.com> Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
8bfa2cb2ee
commit
56c7505f8f
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.regionserver.wal;
|
package org.apache.hadoop.hbase.regionserver.wal;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.metrics.BaseSource;
|
import org.apache.hadoop.hbase.metrics.BaseSource;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
|
@ -79,7 +80,7 @@ public interface MetricsWALSource extends BaseSource {
|
||||||
/**
|
/**
|
||||||
* Add the append size.
|
* Add the append size.
|
||||||
*/
|
*/
|
||||||
void incrementAppendSize(long size);
|
void incrementAppendSize(TableName tableName, long size);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add the time it took to append.
|
* Add the time it took to append.
|
||||||
|
@ -89,7 +90,7 @@ public interface MetricsWALSource extends BaseSource {
|
||||||
/**
|
/**
|
||||||
* Increment the count of wal appends
|
* Increment the count of wal appends
|
||||||
*/
|
*/
|
||||||
void incrementAppendCount();
|
void incrementAppendCount(TableName tableName);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Increment the number of appends that were slow
|
* Increment the number of appends that were slow
|
||||||
|
@ -114,6 +115,4 @@ public interface MetricsWALSource extends BaseSource {
|
||||||
void incrementSizeLogRoll();
|
void incrementSizeLogRoll();
|
||||||
|
|
||||||
void incrementWrittenBytes(long val);
|
void incrementWrittenBytes(long val);
|
||||||
|
|
||||||
long getWrittenBytes();
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,9 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.regionserver.wal;
|
package org.apache.hadoop.hbase.regionserver.wal;
|
||||||
|
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
|
import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
|
||||||
import org.apache.hadoop.metrics2.MetricHistogram;
|
import org.apache.hadoop.metrics2.MetricHistogram;
|
||||||
import org.apache.hadoop.metrics2.lib.MutableFastCounter;
|
import org.apache.hadoop.metrics2.lib.MutableFastCounter;
|
||||||
|
@ -43,6 +46,9 @@ public class MetricsWALSourceImpl extends BaseSourceImpl implements MetricsWALSo
|
||||||
private final MutableFastCounter slowSyncRollRequested;
|
private final MutableFastCounter slowSyncRollRequested;
|
||||||
private final MutableFastCounter sizeRollRequested;
|
private final MutableFastCounter sizeRollRequested;
|
||||||
private final MutableFastCounter writtenBytes;
|
private final MutableFastCounter writtenBytes;
|
||||||
|
// Per table metrics.
|
||||||
|
private final ConcurrentMap<TableName, MutableFastCounter> perTableAppendCount;
|
||||||
|
private final ConcurrentMap<TableName, MutableFastCounter> perTableAppendSize;
|
||||||
|
|
||||||
public MetricsWALSourceImpl() {
|
public MetricsWALSourceImpl() {
|
||||||
this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT);
|
this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT);
|
||||||
|
@ -72,11 +78,23 @@ public class MetricsWALSourceImpl extends BaseSourceImpl implements MetricsWALSo
|
||||||
sizeRollRequested = this.getMetricsRegistry()
|
sizeRollRequested = this.getMetricsRegistry()
|
||||||
.newCounter(SIZE_ROLL_REQUESTED, SIZE_ROLL_REQUESTED_DESC, 0L);
|
.newCounter(SIZE_ROLL_REQUESTED, SIZE_ROLL_REQUESTED_DESC, 0L);
|
||||||
writtenBytes = this.getMetricsRegistry().newCounter(WRITTEN_BYTES, WRITTEN_BYTES_DESC, 0L);
|
writtenBytes = this.getMetricsRegistry().newCounter(WRITTEN_BYTES, WRITTEN_BYTES_DESC, 0L);
|
||||||
|
perTableAppendCount = new ConcurrentHashMap<>();
|
||||||
|
perTableAppendSize = new ConcurrentHashMap<>();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void incrementAppendSize(long size) {
|
public void incrementAppendSize(TableName tableName, long size) {
|
||||||
appendSizeHisto.add(size);
|
appendSizeHisto.add(size);
|
||||||
|
MutableFastCounter tableAppendSizeCounter = perTableAppendSize.get(tableName);
|
||||||
|
if (tableAppendSizeCounter == null) {
|
||||||
|
// Ideally putIfAbsent is atomic and we don't need a branch check but we still do it to avoid
|
||||||
|
// expensive string construction for every append.
|
||||||
|
String metricsKey = String.format("%s.%s", tableName, APPEND_SIZE);
|
||||||
|
perTableAppendSize.putIfAbsent(
|
||||||
|
tableName, getMetricsRegistry().newCounter(metricsKey, APPEND_SIZE_DESC, 0L));
|
||||||
|
tableAppendSizeCounter = perTableAppendSize.get(tableName);
|
||||||
|
}
|
||||||
|
tableAppendSizeCounter.incr(size);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -85,8 +103,16 @@ public class MetricsWALSourceImpl extends BaseSourceImpl implements MetricsWALSo
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void incrementAppendCount() {
|
public void incrementAppendCount(TableName tableName) {
|
||||||
appendCount.incr();
|
appendCount.incr();
|
||||||
|
MutableFastCounter tableAppendCounter = perTableAppendCount.get(tableName);
|
||||||
|
if (tableAppendCounter == null) {
|
||||||
|
String metricsKey = String.format("%s.%s", tableName, APPEND_COUNT);
|
||||||
|
perTableAppendCount.putIfAbsent(
|
||||||
|
tableName, getMetricsRegistry().newCounter(metricsKey, APPEND_COUNT_DESC, 0L));
|
||||||
|
tableAppendCounter = perTableAppendCount.get(tableName);
|
||||||
|
}
|
||||||
|
tableAppendCounter.incr();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -133,10 +159,4 @@ public class MetricsWALSourceImpl extends BaseSourceImpl implements MetricsWALSo
|
||||||
public void incrementWrittenBytes(long val) {
|
public void incrementWrittenBytes(long val) {
|
||||||
writtenBytes.incr(val);
|
writtenBytes.incr(val);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public long getWrittenBytes() {
|
|
||||||
return writtenBytes.value();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -58,9 +59,10 @@ public class MetricsWAL implements WALActionsListener {
|
||||||
@Override
|
@Override
|
||||||
public void postAppend(final long size, final long time, final WALKey logkey,
|
public void postAppend(final long size, final long time, final WALKey logkey,
|
||||||
final WALEdit logEdit) throws IOException {
|
final WALEdit logEdit) throws IOException {
|
||||||
source.incrementAppendCount();
|
TableName tableName = logkey.getTableName();
|
||||||
|
source.incrementAppendCount(tableName);
|
||||||
source.incrementAppendTime(time);
|
source.incrementAppendTime(time);
|
||||||
source.incrementAppendSize(size);
|
source.incrementAppendSize(tableName, size);
|
||||||
source.incrementWrittenBytes(size);
|
source.incrementWrittenBytes(size);
|
||||||
|
|
||||||
if (time > 1000) {
|
if (time > 1000) {
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
/**
|
/*
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
* distributed with this work for additional information
|
* distributed with this work for additional information
|
||||||
|
@ -22,10 +22,15 @@ import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.times;
|
import static org.mockito.Mockito.times;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
|
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.testclassification.MiscTests;
|
import org.apache.hadoop.hbase.testclassification.MiscTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
|
import org.apache.hadoop.hbase.wal.WALKey;
|
||||||
|
import org.apache.hadoop.hbase.wal.WALKeyImpl;
|
||||||
|
import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
@ -71,11 +76,13 @@ public class TestMetricsWAL {
|
||||||
public void testSlowAppend() throws Exception {
|
public void testSlowAppend() throws Exception {
|
||||||
MetricsWALSource source = new MetricsWALSourceImpl();
|
MetricsWALSource source = new MetricsWALSourceImpl();
|
||||||
MetricsWAL metricsWAL = new MetricsWAL(source);
|
MetricsWAL metricsWAL = new MetricsWAL(source);
|
||||||
|
TableName tableName = TableName.valueOf("foo");
|
||||||
|
WALKey walKey = new WALKeyImpl(null, tableName, -1);
|
||||||
// One not so slow append (< 1000)
|
// One not so slow append (< 1000)
|
||||||
metricsWAL.postAppend(1, 900, null, null);
|
metricsWAL.postAppend(1, 900, walKey, null);
|
||||||
// Two slow appends (> 1000)
|
// Two slow appends (> 1000)
|
||||||
metricsWAL.postAppend(1, 1010, null, null);
|
metricsWAL.postAppend(1, 1010, walKey, null);
|
||||||
metricsWAL.postAppend(1, 2000, null, null);
|
metricsWAL.postAppend(1, 2000, walKey, null);
|
||||||
assertEquals(2, source.getSlowAppendCount());
|
assertEquals(2, source.getSlowAppendCount());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -83,10 +90,43 @@ public class TestMetricsWAL {
|
||||||
public void testWalWrittenInBytes() throws Exception {
|
public void testWalWrittenInBytes() throws Exception {
|
||||||
MetricsWALSource source = mock(MetricsWALSourceImpl.class);
|
MetricsWALSource source = mock(MetricsWALSourceImpl.class);
|
||||||
MetricsWAL metricsWAL = new MetricsWAL(source);
|
MetricsWAL metricsWAL = new MetricsWAL(source);
|
||||||
metricsWAL.postAppend(100, 900, null, null);
|
TableName tableName = TableName.valueOf("foo");
|
||||||
metricsWAL.postAppend(200, 2000, null, null);
|
WALKey walKey = new WALKeyImpl(null, tableName, -1);
|
||||||
|
metricsWAL.postAppend(100, 900, walKey, null);
|
||||||
|
metricsWAL.postAppend(200, 2000, walKey, null);
|
||||||
verify(source, times(1)).incrementWrittenBytes(100);
|
verify(source, times(1)).incrementWrittenBytes(100);
|
||||||
verify(source, times(1)).incrementWrittenBytes(200);
|
verify(source, times(1)).incrementWrittenBytes(200);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPerTableWALMetrics() throws Exception {
|
||||||
|
MetricsWALSourceImpl source = new MetricsWALSourceImpl("foo", "foo", "foo", "foo");
|
||||||
|
final int numThreads = 10;
|
||||||
|
final int numIters = 10;
|
||||||
|
CountDownLatch latch = new CountDownLatch(numThreads);
|
||||||
|
for (int i = 0; i < numThreads; i++) {
|
||||||
|
final TableName tableName = TableName.valueOf("tab_" + i);
|
||||||
|
long size = i;
|
||||||
|
new Thread(() -> {
|
||||||
|
for (int j = 0; j < numIters; j++) {
|
||||||
|
source.incrementAppendCount(tableName);
|
||||||
|
source.incrementAppendSize(tableName, size);
|
||||||
|
}
|
||||||
|
latch.countDown();
|
||||||
|
}).start();
|
||||||
|
}
|
||||||
|
// Wait for threads to finish.
|
||||||
|
latch.await();
|
||||||
|
DynamicMetricsRegistry registry = source.getMetricsRegistry();
|
||||||
|
// Validate the metrics
|
||||||
|
for (int i = 0; i < numThreads; i++) {
|
||||||
|
TableName tableName = TableName.valueOf("tab_" + i);
|
||||||
|
long tableAppendCount =
|
||||||
|
registry.getCounter(tableName + "." + MetricsWALSource.APPEND_COUNT, -1).value();
|
||||||
|
assertEquals(numIters, tableAppendCount);
|
||||||
|
long tableAppendSize =
|
||||||
|
registry.getCounter(tableName + "." + MetricsWALSource.APPEND_SIZE, -1).value();
|
||||||
|
assertEquals(i * numIters, tableAppendSize);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue