diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java index ad18d7609b9..e006259eb15 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java @@ -428,6 +428,17 @@ public interface MetricsRegionServerSource extends BaseSource, JvmPauseMonitorSo String MAJOR_COMPACTED_CELLS_SIZE_DESC = "The total amount of data processed during major compactions, in bytes"; + String TOTAL_BYTES_READ = "totalBytesRead"; + String TOTAL_BYTES_READ_DESC = "The total number of bytes read from HDFS"; + String LOCAL_BYTES_READ = "localBytesRead"; + String LOCAL_BYTES_READ_DESC = + "The number of bytes read from the local HDFS DataNode"; + String SHORTCIRCUIT_BYTES_READ = "shortCircuitBytesRead"; + String SHORTCIRCUIT_BYTES_READ_DESC = "The number of bytes read through HDFS short circuit read"; + String ZEROCOPY_BYTES_READ = "zeroCopyBytesRead"; + String ZEROCOPY_BYTES_READ_DESC = + "The number of bytes read through HDFS zero copy"; + String BLOCKED_REQUESTS_COUNT = "blockedRequestCount"; String BLOCKED_REQUESTS_COUNT_DESC = "The number of blocked requests because of memstore size is " + "larger than blockingMemStoreSize"; diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java index 5cbdc648f49..ec84acc3fa3 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java @@ -348,6 +348,26 @@ public interface MetricsRegionServerWrapper { */ long getMajorCompactedCellsSize(); + /** + * @return Number of total bytes read from HDFS. + */ + long getTotalBytesRead(); + + /** + * @return Number of bytes read from the local HDFS DataNode. + */ + long getLocalBytesRead(); + + /** + * @return Number of bytes read locally through HDFS short circuit. + */ + long getShortCircuitBytesRead(); + + /** + * @return Number of bytes read locally through HDFS zero copy. + */ + long getZeroCopyBytesRead(); + /** * @return Count of requests blocked because the memstore size is larger than blockingMemStoreSize */ diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java index 88330267051..31b5a9799af 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java @@ -392,6 +392,18 @@ public class MetricsRegionServerSourceImpl .addGauge(Interns.info(PERCENT_FILES_LOCAL_SECONDARY_REGIONS, PERCENT_FILES_LOCAL_SECONDARY_REGIONS_DESC), rsWrap.getPercentFileLocalSecondaryRegions()) + .addGauge(Interns.info(TOTAL_BYTES_READ, + TOTAL_BYTES_READ_DESC), + rsWrap.getTotalBytesRead()) + .addGauge(Interns.info(LOCAL_BYTES_READ, + LOCAL_BYTES_READ_DESC), + rsWrap.getLocalBytesRead()) + .addGauge(Interns.info(SHORTCIRCUIT_BYTES_READ, + SHORTCIRCUIT_BYTES_READ_DESC), + rsWrap.getShortCircuitBytesRead()) + .addGauge(Interns.info(ZEROCOPY_BYTES_READ, + ZEROCOPY_BYTES_READ_DESC), + rsWrap.getZeroCopyBytesRead()) .addGauge(Interns.info(SPLIT_QUEUE_LENGTH, SPLIT_QUEUE_LENGTH_DESC), rsWrap.getSplitQueueSize()) .addGauge(Interns.info(COMPACTION_QUEUE_LENGTH, COMPACTION_QUEUE_LENGTH_DESC), diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java index d7867d11e81..f14da01be12 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java @@ -29,6 +29,8 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.fs.HFileSystem; +import org.apache.hadoop.hdfs.DFSInputStream; +import org.apache.hadoop.hdfs.client.HdfsDataInputStream; import com.google.common.annotations.VisibleForTesting; @@ -88,6 +90,15 @@ public class FSDataInputStreamWrapper { // errors against Hadoop pre 2.6.4 and 2.7.1 versions. private Method unbuffer = null; + private final static ReadStatistics readStatistics = new ReadStatistics(); + + private static class ReadStatistics { + long totalBytesRead; + long totalLocalBytesRead; + long totalShortCircuitBytesRead; + long totalZeroCopyBytesRead; + } + public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException { this(fs, null, path, false); } @@ -214,13 +225,61 @@ public class FSDataInputStreamWrapper { } } + private void updateInputStreamStatistics(FSDataInputStream stream) { + // If the underlying file system is HDFS, update read statistics upon close. + if (stream instanceof HdfsDataInputStream) { + /** + * Because HDFS ReadStatistics is calculated per input stream, it is not + * feasible to update the aggregated number in real time. Instead, the + * metrics are updated when an input stream is closed. + */ + HdfsDataInputStream hdfsDataInputStream = (HdfsDataInputStream)stream; + synchronized (readStatistics) { + readStatistics.totalBytesRead += hdfsDataInputStream.getReadStatistics(). + getTotalBytesRead(); + readStatistics.totalLocalBytesRead += hdfsDataInputStream.getReadStatistics(). + getTotalBytesRead(); + readStatistics.totalShortCircuitBytesRead += hdfsDataInputStream.getReadStatistics(). + getTotalShortCircuitBytesRead(); + readStatistics.totalZeroCopyBytesRead += hdfsDataInputStream.getReadStatistics(). + getTotalZeroCopyBytesRead(); + } + } + } + + public static long getTotalBytesRead() { + synchronized (readStatistics) { + return readStatistics.totalBytesRead; + } + } + + public static long getLocalBytesRead() { + synchronized (readStatistics) { + return readStatistics.totalLocalBytesRead; + } + } + + public static long getShortCircuitBytesRead() { + synchronized (readStatistics) { + return readStatistics.totalShortCircuitBytesRead; + } + } + + public static long getZeroCopyBytesRead() { + synchronized (readStatistics) { + return readStatistics.totalZeroCopyBytesRead; + } + } + /** Close stream(s) if necessary. */ - public void close() throws IOException { + public void close() { if (!doCloseStreams) { return; } + updateInputStreamStatistics(this.streamNoFsChecksum); // we do not care about the close exception as it is for reading, no data loss issue. IOUtils.closeQuietly(streamNoFsChecksum); + updateInputStreamStatistics(stream); IOUtils.closeQuietly(stream); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java index 00789934169..0d20c7deabf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.hfile.BlockCache; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.CacheStats; @@ -798,6 +799,26 @@ class MetricsRegionServerWrapperImpl } } + @Override + public long getTotalBytesRead() { + return FSDataInputStreamWrapper.getTotalBytesRead(); + } + + @Override + public long getLocalBytesRead() { + return FSDataInputStreamWrapper.getLocalBytesRead(); + } + + @Override + public long getShortCircuitBytesRead() { + return FSDataInputStreamWrapper.getShortCircuitBytesRead(); + } + + @Override + public long getZeroCopyBytesRead() { + return FSDataInputStreamWrapper.getZeroCopyBytesRead(); + } + @Override public long getBlockedRequestsCount() { return blockedRequestsCount; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java index 3be2604f077..dbf475893aa 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java @@ -360,6 +360,26 @@ public class MetricsRegionServerWrapperStub implements MetricsRegionServerWrappe return 10240000; } + @Override + public long getTotalBytesRead() { + return 0; + } + + @Override + public long getLocalBytesRead() { + return 0; + } + + @Override + public long getShortCircuitBytesRead() { + return 0; + } + + @Override + public long getZeroCopyBytesRead() { + return 0; + } + @Override public long getBlockedRequestsCount() { return 0; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java index 3c78a2f1c14..1a836efd26a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java @@ -17,6 +17,15 @@ */ package org.apache.hadoop.hbase.regionserver; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.client.*; @@ -32,13 +41,6 @@ import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; -import static org.junit.Assert.*; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - - @Category(MediumTests.class) public class TestRegionServerMetrics { private static MetricsAssertHelper metricsHelper; @@ -682,4 +684,34 @@ public class TestRegionServerMetrics { t.close(); } + + @Test + public void testReadBytes() throws Exception { + TableName tableName = TableName.valueOf("testReadBytes"); + byte[] cf = Bytes.toBytes("d"); + byte[] row = Bytes.toBytes("rk"); + byte[] qualifier = Bytes.toBytes("qual"); + byte[] val = Bytes.toBytes("Value"); + + Table t = TEST_UTIL.createTable(tableName, cf); + // Do a first put to be sure that the connection is established, meta is there and so on. + Put p = new Put(row); + p.addColumn(cf, qualifier, val); + t.put(p); + // Do a few gets + for (int i = 0; i < 10; i++) { + t.get(new Get(row)); + } + TEST_UTIL.getHBaseAdmin().flush(tableName); + metricsRegionServer.getRegionServerWrapper().forceRecompute(); + + assertTrue("Total read bytes should be larger than 0", + metricsRegionServer.getRegionServerWrapper().getTotalBytesRead() > 0); + assertTrue("Total local read bytes should be larger than 0", + metricsRegionServer.getRegionServerWrapper().getLocalBytesRead() > 0); + assertEquals("Total short circuit read bytes should be equal to 0", 0, + metricsRegionServer.getRegionServerWrapper().getShortCircuitBytesRead()); + assertEquals("Total zero-byte read bytes should be equal to 0", 0, + metricsRegionServer.getRegionServerWrapper().getZeroCopyBytesRead()); + } }