HBASE-25082: Per table WAL metrics: appendCount and appendSize (#2440)

Signed-off-by: Geoffrey Jacoby <gjacoby@apache.org>
Signed-off-by: Ankit Jain <jain.ankit@salesforce.com>
Signed-off-by: Duo Zhang <zhangduo@apache.org>
(cherry picked from commit 56c7505f8f)
Bharath Vissapragada 2020-09-23 21:06:57 -07:00
parent b2f2c79d8f
commit 505ceacb4b
4 changed files with 81 additions and 20 deletions

MetricsWALSource.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.regionserver.wal;
 
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.metrics.BaseSource;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -79,7 +80,7 @@ public interface MetricsWALSource extends BaseSource {
   /**
    * Add the append size.
    */
-  void incrementAppendSize(long size);
+  void incrementAppendSize(TableName tableName, long size);
 
   /**
    * Add the time it took to append.
@@ -89,7 +90,7 @@ public interface MetricsWALSource extends BaseSource {
   /**
    * Increment the count of wal appends
    */
-  void incrementAppendCount();
+  void incrementAppendCount(TableName tableName);
 
   /**
    * Increment the number of appends that were slow
@@ -114,6 +115,4 @@ public interface MetricsWALSource extends BaseSource {
   void incrementSizeLogRoll();
 
   void incrementWrittenBytes(long val);
-
-  long getWrittenBytes();
 }

MetricsWALSourceImpl.java

@@ -18,6 +18,9 @@
 package org.apache.hadoop.hbase.regionserver.wal;
 
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
 import org.apache.hadoop.metrics2.MetricHistogram;
 import org.apache.hadoop.metrics2.lib.MutableFastCounter;
@@ -43,6 +46,9 @@ public class MetricsWALSourceImpl extends BaseSourceImpl implements MetricsWALSource {
   private final MutableFastCounter slowSyncRollRequested;
   private final MutableFastCounter sizeRollRequested;
   private final MutableFastCounter writtenBytes;
+  // Per table metrics.
+  private final ConcurrentMap<TableName, MutableFastCounter> perTableAppendCount;
+  private final ConcurrentMap<TableName, MutableFastCounter> perTableAppendSize;
 
   public MetricsWALSourceImpl() {
     this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT);
@@ -72,11 +78,23 @@ public class MetricsWALSourceImpl extends BaseSourceImpl implements MetricsWALSource {
     sizeRollRequested = this.getMetricsRegistry()
       .newCounter(SIZE_ROLL_REQUESTED, SIZE_ROLL_REQUESTED_DESC, 0L);
     writtenBytes = this.getMetricsRegistry().newCounter(WRITTEN_BYTES, WRITTEN_BYTES_DESC, 0L);
+    perTableAppendCount = new ConcurrentHashMap<>();
+    perTableAppendSize = new ConcurrentHashMap<>();
   }
 
   @Override
-  public void incrementAppendSize(long size) {
+  public void incrementAppendSize(TableName tableName, long size) {
     appendSizeHisto.add(size);
+    MutableFastCounter tableAppendSizeCounter = perTableAppendSize.get(tableName);
+    if (tableAppendSizeCounter == null) {
+      // Ideally putIfAbsent is atomic and we don't need a branch check but we still do it to avoid
+      // expensive string construction for every append.
+      String metricsKey = String.format("%s.%s", tableName, APPEND_SIZE);
+      perTableAppendSize.putIfAbsent(
+        tableName, getMetricsRegistry().newCounter(metricsKey, APPEND_SIZE_DESC, 0L));
+      tableAppendSizeCounter = perTableAppendSize.get(tableName);
+    }
+    tableAppendSizeCounter.incr(size);
   }
 
   @Override
@@ -85,8 +103,16 @@ public class MetricsWALSourceImpl extends BaseSourceImpl implements MetricsWALSource {
   }
 
   @Override
-  public void incrementAppendCount() {
+  public void incrementAppendCount(TableName tableName) {
     appendCount.incr();
+    MutableFastCounter tableAppendCounter = perTableAppendCount.get(tableName);
+    if (tableAppendCounter == null) {
+      String metricsKey = String.format("%s.%s", tableName, APPEND_COUNT);
+      perTableAppendCount.putIfAbsent(
+        tableName, getMetricsRegistry().newCounter(metricsKey, APPEND_COUNT_DESC, 0L));
+      tableAppendCounter = perTableAppendCount.get(tableName);
+    }
+    tableAppendCounter.incr();
   }
 
   @Override
@@ -133,10 +159,4 @@ public class MetricsWALSourceImpl extends BaseSourceImpl implements MetricsWALSource {
   public void incrementWrittenBytes(long val) {
     writtenBytes.incr(val);
   }
-
-  @Override
-  public long getWrittenBytes() {
-    return writtenBytes.value();
-  }
 }
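A note on the lazy registration above: the plain get() before putIfAbsent() means the metric key string and the registry call are only paid on a table's first append; if two threads race on that first append, putIfAbsent() still keeps a single counter per table and both re-read the winner. A minimal JDK-only sketch of the same check-then-putIfAbsent pattern (the PerKeyCounters class and its method names are illustrative, not part of the patch; LongAdder stands in for the registry-backed MutableFastCounter):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.LongAdder;

// Illustrative sketch of the check-then-putIfAbsent registration used above.
public final class PerKeyCounters {
  private final ConcurrentMap<String, LongAdder> counters = new ConcurrentHashMap<>();

  public void increment(String table, long delta) {
    LongAdder counter = counters.get(table);
    if (counter == null) {
      // First increment for this table: any expensive setup (in the patch, building
      // the "<table>.appendSize" key and registering a counter) happens only here.
      counters.putIfAbsent(table, new LongAdder());
      counter = counters.get(table);
    }
    counter.add(delta);
  }

  public long value(String table) {
    LongAdder counter = counters.get(table);
    return counter == null ? 0L : counter.sum();
  }

  public static void main(String[] args) {
    PerKeyCounters counters = new PerKeyCounters();
    counters.increment("foo", 100);
    counters.increment("foo", 200);
    System.out.println(counters.value("foo")); // prints 300
  }
}

ConcurrentMap.computeIfAbsent would express this in one call, but hot-path code often keeps the explicit get() pre-check because JDK 8's computeIfAbsent was known to lock the bin even when the key was already present (JDK-8161372, addressed in JDK 9).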

MetricsWAL.java

@@ -23,6 +23,7 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 
+import org.apache.hadoop.hbase.TableName;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,9 +59,10 @@ public class MetricsWAL implements WALActionsListener {
   @Override
   public void postAppend(final long size, final long time, final WALKey logkey,
       final WALEdit logEdit) throws IOException {
-    source.incrementAppendCount();
+    TableName tableName = logkey.getTableName();
+    source.incrementAppendCount(tableName);
     source.incrementAppendTime(time);
-    source.incrementAppendSize(size);
+    source.incrementAppendSize(tableName, size);
     source.incrementWrittenBytes(size);
     if (time > 1000) {
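For reference, the counters registered by MetricsWALSourceImpl above end up keyed as "<table>.appendCount" and "<table>.appendSize", from the String.format("%s.%s", tableName, ...) calls, next to the existing aggregate appendCount and appendSize metrics. A tiny sketch to make the naming concrete; it assumes the MetricsWALSource constants APPEND_COUNT and APPEND_SIZE resolve to "appendCount" and "appendSize" (the test below reads the counters back through the same constants, so it does not depend on those literals):

// Illustrative only: how the per-table metric keys are composed.
public class WalMetricKeyDemo {
  public static void main(String[] args) {
    // TableName.toString() for a default-namespace table named "foo" is just "foo".
    String table = "foo";
    System.out.println(String.format("%s.%s", table, "appendCount")); // foo.appendCount
    System.out.println(String.format("%s.%s", table, "appendSize"));  // foo.appendSize
  }
}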

TestMetricsWAL.java

@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
@@ -22,10 +22,15 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -71,11 +76,13 @@ public class TestMetricsWAL {
   public void testSlowAppend() throws Exception {
     MetricsWALSource source = new MetricsWALSourceImpl();
     MetricsWAL metricsWAL = new MetricsWAL(source);
+    TableName tableName = TableName.valueOf("foo");
+    WALKey walKey = new WALKeyImpl(null, tableName, -1);
     // One not so slow append (< 1000)
-    metricsWAL.postAppend(1, 900, null, null);
+    metricsWAL.postAppend(1, 900, walKey, null);
     // Two slow appends (> 1000)
-    metricsWAL.postAppend(1, 1010, null, null);
-    metricsWAL.postAppend(1, 2000, null, null);
+    metricsWAL.postAppend(1, 1010, walKey, null);
+    metricsWAL.postAppend(1, 2000, walKey, null);
     assertEquals(2, source.getSlowAppendCount());
   }
@@ -83,10 +90,43 @@ public class TestMetricsWAL {
   public void testWalWrittenInBytes() throws Exception {
     MetricsWALSource source = mock(MetricsWALSourceImpl.class);
     MetricsWAL metricsWAL = new MetricsWAL(source);
-    metricsWAL.postAppend(100, 900, null, null);
-    metricsWAL.postAppend(200, 2000, null, null);
+    TableName tableName = TableName.valueOf("foo");
+    WALKey walKey = new WALKeyImpl(null, tableName, -1);
+    metricsWAL.postAppend(100, 900, walKey, null);
+    metricsWAL.postAppend(200, 2000, walKey, null);
     verify(source, times(1)).incrementWrittenBytes(100);
     verify(source, times(1)).incrementWrittenBytes(200);
   }
+
+  @Test
+  public void testPerTableWALMetrics() throws Exception {
+    MetricsWALSourceImpl source = new MetricsWALSourceImpl("foo", "foo", "foo", "foo");
+    final int numThreads = 10;
+    final int numIters = 10;
+    CountDownLatch latch = new CountDownLatch(numThreads);
+    for (int i = 0; i < numThreads; i++) {
+      final TableName tableName = TableName.valueOf("tab_" + i);
+      long size = i;
+      new Thread(() -> {
+        for (int j = 0; j < numIters; j++) {
+          source.incrementAppendCount(tableName);
+          source.incrementAppendSize(tableName, size);
+        }
+        latch.countDown();
+      }).start();
+    }
+    // Wait for threads to finish.
+    latch.await();
+    DynamicMetricsRegistry registry = source.getMetricsRegistry();
+    // Validate the metrics
+    for (int i = 0; i < numThreads; i++) {
+      TableName tableName = TableName.valueOf("tab_" + i);
+      long tableAppendCount =
+        registry.getCounter(tableName + "." + MetricsWALSource.APPEND_COUNT, -1).value();
+      assertEquals(numIters, tableAppendCount);
+      long tableAppendSize =
+        registry.getCounter(tableName + "." + MetricsWALSource.APPEND_SIZE, -1).value();
+      assertEquals(i * numIters, tableAppendSize);
+    }
+  }
 }
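One aside on the new concurrency test, not part of the patch: latch.await() with no timeout will hang the whole test run if a worker thread dies before counting down. Since TimeUnit is already imported, a bounded wait would be a drop-in replacement for that line (sketch only; it also needs a static import of org.junit.Assert.assertTrue):

// Hypothetical hardening, not in the commit: fail fast instead of hanging forever.
assertTrue("worker threads did not finish in time", latch.await(30, TimeUnit.SECONDS));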