diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSource.java index 7dc27f60973..b179b91d8a3 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSource.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.regionserver.wal; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.metrics.BaseSource; import org.apache.yetus.audience.InterfaceAudience; @@ -79,7 +80,7 @@ public interface MetricsWALSource extends BaseSource { /** * Add the append size. */ - void incrementAppendSize(long size); + void incrementAppendSize(TableName tableName, long size); /** * Add the time it took to append. @@ -89,7 +90,7 @@ public interface MetricsWALSource extends BaseSource { /** * Increment the count of wal appends */ - void incrementAppendCount(); + void incrementAppendCount(TableName tableName); /** * Increment the number of appends that were slow @@ -114,6 +115,4 @@ public interface MetricsWALSource extends BaseSource { void incrementSizeLogRoll(); void incrementWrittenBytes(long val); - - long getWrittenBytes(); } diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSourceImpl.java index eb605c50362..d308913f6e4 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSourceImpl.java @@ -18,6 +18,9 @@ package org.apache.hadoop.hbase.regionserver.wal; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import 
org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.metrics.BaseSourceImpl; import org.apache.hadoop.metrics2.MetricHistogram; import org.apache.hadoop.metrics2.lib.MutableFastCounter; @@ -43,6 +46,9 @@ public class MetricsWALSourceImpl extends BaseSourceImpl implements MetricsWALSo private final MutableFastCounter slowSyncRollRequested; private final MutableFastCounter sizeRollRequested; private final MutableFastCounter writtenBytes; + // Per table metrics. + private final ConcurrentMap<TableName, MutableFastCounter> perTableAppendCount; + private final ConcurrentMap<TableName, MutableFastCounter> perTableAppendSize; public MetricsWALSourceImpl() { this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT); @@ -72,11 +78,23 @@ public class MetricsWALSourceImpl extends BaseSourceImpl implements MetricsWALSo sizeRollRequested = this.getMetricsRegistry() .newCounter(SIZE_ROLL_REQUESTED, SIZE_ROLL_REQUESTED_DESC, 0L); writtenBytes = this.getMetricsRegistry().newCounter(WRITTEN_BYTES, WRITTEN_BYTES_DESC, 0L); + perTableAppendCount = new ConcurrentHashMap<>(); + perTableAppendSize = new ConcurrentHashMap<>(); } @Override - public void incrementAppendSize(long size) { + public void incrementAppendSize(TableName tableName, long size) { appendSizeHisto.add(size); + MutableFastCounter tableAppendSizeCounter = perTableAppendSize.get(tableName); + if (tableAppendSizeCounter == null) { + // Ideally putIfAbsent is atomic and we don't need a branch check but we still do it to avoid + // expensive string construction for every append. 
+ String metricsKey = String.format("%s.%s", tableName, APPEND_SIZE); + perTableAppendSize.putIfAbsent( + tableName, getMetricsRegistry().newCounter(metricsKey, APPEND_SIZE_DESC, 0L)); + tableAppendSizeCounter = perTableAppendSize.get(tableName); + } + tableAppendSizeCounter.incr(size); } @Override @@ -85,8 +103,16 @@ public class MetricsWALSourceImpl extends BaseSourceImpl implements MetricsWALSo } @Override - public void incrementAppendCount() { + public void incrementAppendCount(TableName tableName) { appendCount.incr(); + MutableFastCounter tableAppendCounter = perTableAppendCount.get(tableName); + if (tableAppendCounter == null) { + String metricsKey = String.format("%s.%s", tableName, APPEND_COUNT); + perTableAppendCount.putIfAbsent( + tableName, getMetricsRegistry().newCounter(metricsKey, APPEND_COUNT_DESC, 0L)); + tableAppendCounter = perTableAppendCount.get(tableName); + } + tableAppendCounter.incr(); } @Override @@ -133,10 +159,4 @@ public class MetricsWALSourceImpl extends BaseSourceImpl implements MetricsWALSo public void incrementWrittenBytes(long val) { writtenBytes.incr(val); } - - @Override - public long getWrittenBytes() { - return writtenBytes.value(); - } - } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java index b2af4a80ad3..ee99aa30f98 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java @@ -23,6 +23,7 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti import java.io.IOException; +import org.apache.hadoop.hbase.TableName; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,9 +59,10 @@ public class MetricsWAL implements WALActionsListener { @Override public void postAppend(final long 
size, final long time, final WALKey logkey, final WALEdit logEdit) throws IOException { - source.incrementAppendCount(); + TableName tableName = logkey.getTableName(); + source.incrementAppendCount(tableName); source.incrementAppendTime(time); - source.incrementAppendSize(size); + source.incrementAppendSize(tableName, size); source.incrementWrittenBytes(size); if (time > 1000) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java index 1c324dd8856..14d0a88a3a3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -22,10 +22,15 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALKeyImpl; +import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -71,11 +76,13 @@ public class TestMetricsWAL { public void testSlowAppend() throws Exception { MetricsWALSource source = new MetricsWALSourceImpl(); MetricsWAL metricsWAL = new MetricsWAL(source); + TableName tableName = TableName.valueOf("foo"); + WALKey walKey = new WALKeyImpl(null, 
tableName, -1); // One not so slow append (< 1000) - metricsWAL.postAppend(1, 900, null, null); + metricsWAL.postAppend(1, 900, walKey, null); // Two slow appends (> 1000) - metricsWAL.postAppend(1, 1010, null, null); - metricsWAL.postAppend(1, 2000, null, null); + metricsWAL.postAppend(1, 1010, walKey, null); + metricsWAL.postAppend(1, 2000, walKey, null); assertEquals(2, source.getSlowAppendCount()); } @@ -83,10 +90,43 @@ public class TestMetricsWAL { public void testWalWrittenInBytes() throws Exception { MetricsWALSource source = mock(MetricsWALSourceImpl.class); MetricsWAL metricsWAL = new MetricsWAL(source); - metricsWAL.postAppend(100, 900, null, null); - metricsWAL.postAppend(200, 2000, null, null); + TableName tableName = TableName.valueOf("foo"); + WALKey walKey = new WALKeyImpl(null, tableName, -1); + metricsWAL.postAppend(100, 900, walKey, null); + metricsWAL.postAppend(200, 2000, walKey, null); verify(source, times(1)).incrementWrittenBytes(100); verify(source, times(1)).incrementWrittenBytes(200); } + @Test + public void testPerTableWALMetrics() throws Exception { + MetricsWALSourceImpl source = new MetricsWALSourceImpl("foo", "foo", "foo", "foo"); + final int numThreads = 10; + final int numIters = 10; + CountDownLatch latch = new CountDownLatch(numThreads); + for (int i = 0; i < numThreads; i++) { + final TableName tableName = TableName.valueOf("tab_" + i); + long size = i; + new Thread(() -> { + for (int j = 0; j < numIters; j++) { + source.incrementAppendCount(tableName); + source.incrementAppendSize(tableName, size); + } + latch.countDown(); + }).start(); + } + // Wait for threads to finish. + latch.await(); + DynamicMetricsRegistry registry = source.getMetricsRegistry(); + // Validate the metrics + for (int i = 0; i < numThreads; i++) { + TableName tableName = TableName.valueOf("tab_" + i); + long tableAppendCount = + registry.getCounter(tableName + "." 
+ MetricsWALSource.APPEND_COUNT, -1).value(); + assertEquals(numIters, tableAppendCount); + long tableAppendSize = + registry.getCounter(tableName + "." + MetricsWALSource.APPEND_SIZE, -1).value(); + assertEquals(i * numIters, tableAppendSize); + } + } }