HBASE-8868 add metric to report client shortcircuit reads.

branch-1 backport of #1334

Signed-off-by: stack <stack@apache.org>
Signed-off-by: Sean Busbey <busbey@apache.org>

Co-authored-by: Andrew Purtell <apurtell@apache.org>
Wei-Chiu Chuang authored 2020-04-08 09:49:39 -07:00, committed by Andrew Purtell
parent 73d1c2a10f
commit b644e3ccd8
7 changed files with 183 additions and 8 deletions

MetricsRegionServerSource.java

@@ -428,6 +428,17 @@ public interface MetricsRegionServerSource extends BaseSource, JvmPauseMonitorSource
String MAJOR_COMPACTED_CELLS_SIZE_DESC =
"The total amount of data processed during major compactions, in bytes";
String TOTAL_BYTES_READ = "totalBytesRead";
String TOTAL_BYTES_READ_DESC = "The total number of bytes read from HDFS";
String LOCAL_BYTES_READ = "localBytesRead";
String LOCAL_BYTES_READ_DESC =
"The number of bytes read from the local HDFS DataNode";
String SHORTCIRCUIT_BYTES_READ = "shortCircuitBytesRead";
String SHORTCIRCUIT_BYTES_READ_DESC = "The number of bytes read through HDFS short-circuit reads";
String ZEROCOPY_BYTES_READ = "zeroCopyBytesRead";
String ZEROCOPY_BYTES_READ_DESC =
"The number of bytes read through HDFS zero-copy reads";
String BLOCKED_REQUESTS_COUNT = "blockedRequestCount";
String BLOCKED_REQUESTS_COUNT_DESC = "The number of requests blocked because the memstore size is "
+ "larger than blockingMemStoreSize";

MetricsRegionServerWrapper.java

@@ -348,6 +348,26 @@ public interface MetricsRegionServerWrapper {
*/
long getMajorCompactedCellsSize();
/**
* @return Total number of bytes read from HDFS.
*/
long getTotalBytesRead();
/**
* @return Number of bytes read from the local HDFS DataNode.
*/
long getLocalBytesRead();
/**
* @return Number of bytes read locally through HDFS short circuit.
*/
long getShortCircuitBytesRead();
/**
* @return Number of bytes read locally through HDFS zero copy.
*/
long getZeroCopyBytesRead();
/**
* @return Count of requests blocked because the memstore size is larger than blockingMemStoreSize
*/

MetricsRegionServerSourceImpl.java

@@ -392,6 +392,18 @@ public class MetricsRegionServerSourceImpl
.addGauge(Interns.info(PERCENT_FILES_LOCAL_SECONDARY_REGIONS,
PERCENT_FILES_LOCAL_SECONDARY_REGIONS_DESC),
rsWrap.getPercentFileLocalSecondaryRegions())
.addGauge(Interns.info(TOTAL_BYTES_READ,
TOTAL_BYTES_READ_DESC),
rsWrap.getTotalBytesRead())
.addGauge(Interns.info(LOCAL_BYTES_READ,
LOCAL_BYTES_READ_DESC),
rsWrap.getLocalBytesRead())
.addGauge(Interns.info(SHORTCIRCUIT_BYTES_READ,
SHORTCIRCUIT_BYTES_READ_DESC),
rsWrap.getShortCircuitBytesRead())
.addGauge(Interns.info(ZEROCOPY_BYTES_READ,
ZEROCOPY_BYTES_READ_DESC),
rsWrap.getZeroCopyBytesRead())
.addGauge(Interns.info(SPLIT_QUEUE_LENGTH, SPLIT_QUEUE_LENGTH_DESC),
rsWrap.getSplitQueueSize())
.addGauge(Interns.info(COMPACTION_QUEUE_LENGTH, COMPACTION_QUEUE_LENGTH_DESC),
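
Once registered, these gauges are exported through the Hadoop metrics2 system and surface as JMX attributes. The sketch below reads them back in-process; it assumes the default bean name "Hadoop:service=HBase,name=RegionServer,sub=Server", and the class name ReadMetricsProbe is illustrative, not part of this change.

import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class ReadMetricsProbe {
  public static void main(String[] args) throws Exception {
    // Platform MBean server of the RegionServer JVM; this only works
    // in-process (or through a separately configured remote JMX connector).
    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
    ObjectName name =
        new ObjectName("Hadoop:service=HBase,name=RegionServer,sub=Server");
    // Attribute names match the constants defined in MetricsRegionServerSource.
    for (String attr : new String[] { "totalBytesRead", "localBytesRead",
        "shortCircuitBytesRead", "zeroCopyBytesRead" }) {
      System.out.println(attr + " = " + mbs.getAttribute(name, attr));
    }
  }
}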

FSDataInputStreamWrapper.java

@@ -29,6 +29,8 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hdfs.DFSInputStream;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import com.google.common.annotations.VisibleForTesting;
@@ -88,6 +90,15 @@ public class FSDataInputStreamWrapper {
// errors against Hadoop pre 2.6.4 and 2.7.1 versions.
private Method unbuffer = null;
private final static ReadStatistics readStatistics = new ReadStatistics();
private static class ReadStatistics {
long totalBytesRead;
long totalLocalBytesRead;
long totalShortCircuitBytesRead;
long totalZeroCopyBytesRead;
}
public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException {
this(fs, null, path, false);
}
@@ -214,13 +225,61 @@
}
}
private void updateInputStreamStatistics(FSDataInputStream stream) {
// If the underlying file system is HDFS, update read statistics upon close.
if (stream instanceof HdfsDataInputStream) {
/**
* Because HDFS ReadStatistics is calculated per input stream, it is not
* feasible to update the aggregated number in real time. Instead, the
* metrics are updated when an input stream is closed.
*/
HdfsDataInputStream hdfsDataInputStream = (HdfsDataInputStream)stream;
synchronized (readStatistics) {
DFSInputStream.ReadStatistics stats = hdfsDataInputStream.getReadStatistics();
readStatistics.totalBytesRead += stats.getTotalBytesRead();
readStatistics.totalLocalBytesRead += stats.getTotalLocalBytesRead();
readStatistics.totalShortCircuitBytesRead += stats.getTotalShortCircuitBytesRead();
readStatistics.totalZeroCopyBytesRead += stats.getTotalZeroCopyBytesRead();
}
}
}
public static long getTotalBytesRead() {
synchronized (readStatistics) {
return readStatistics.totalBytesRead;
}
}
public static long getLocalBytesRead() {
synchronized (readStatistics) {
return readStatistics.totalLocalBytesRead;
}
}
public static long getShortCircuitBytesRead() {
synchronized (readStatistics) {
return readStatistics.totalShortCircuitBytesRead;
}
}
public static long getZeroCopyBytesRead() {
synchronized (readStatistics) {
return readStatistics.totalZeroCopyBytesRead;
}
}
/** Close stream(s) if necessary. */
- public void close() throws IOException {
+ public void close() {
if (!doCloseStreams) {
return;
}
updateInputStreamStatistics(this.streamNoFsChecksum);
// We ignore exceptions from close: these streams were only read from, so there is no data loss concern.
IOUtils.closeQuietly(streamNoFsChecksum);
updateInputStreamStatistics(stream);
IOUtils.closeQuietly(stream);
}
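
For context, HDFS tracks these counters per open stream: HdfsDataInputStream.getReadStatistics() reflects only the bytes read through that one stream, and the numbers disappear with the stream, which is why the wrapper above folds them into a static aggregate on close(). A minimal standalone sketch of the underlying API (the file path and drain loop are illustrative only):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;

public class ReadStatsDemo {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    try (FSDataInputStream in = fs.open(new Path("/tmp/demo-file"))) { // hypothetical path
      byte[] buf = new byte[4096];
      while (in.read(buf) != -1) {
        // Drain the stream so the statistics have something to count.
      }
      if (in instanceof HdfsDataInputStream) { // only HDFS streams carry statistics
        HdfsDataInputStream hin = (HdfsDataInputStream) in;
        System.out.println("total=" + hin.getReadStatistics().getTotalBytesRead());
        System.out.println("local=" + hin.getReadStatistics().getTotalLocalBytesRead());
        System.out.println("shortCircuit="
            + hin.getReadStatistics().getTotalShortCircuitBytesRead());
        System.out.println("zeroCopy="
            + hin.getReadStatistics().getTotalZeroCopyBytesRead());
      }
    }
  }
}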

MetricsRegionServerWrapperImpl.java

@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CacheStats;
@@ -798,6 +799,26 @@ class MetricsRegionServerWrapperImpl
}
}
@Override
public long getTotalBytesRead() {
return FSDataInputStreamWrapper.getTotalBytesRead();
}
@Override
public long getLocalBytesRead() {
return FSDataInputStreamWrapper.getLocalBytesRead();
}
@Override
public long getShortCircuitBytesRead() {
return FSDataInputStreamWrapper.getShortCircuitBytesRead();
}
@Override
public long getZeroCopyBytesRead() {
return FSDataInputStreamWrapper.getZeroCopyBytesRead();
}
@Override
public long getBlockedRequestsCount() {
return blockedRequestsCount;

MetricsRegionServerWrapperStub.java

@@ -360,6 +360,26 @@ public class MetricsRegionServerWrapperStub implements MetricsRegionServerWrapper
return 10240000;
}
@Override
public long getTotalBytesRead() {
return 0;
}
@Override
public long getLocalBytesRead() {
return 0;
}
@Override
public long getShortCircuitBytesRead() {
return 0;
}
@Override
public long getZeroCopyBytesRead() {
return 0;
}
@Override
public long getBlockedRequestsCount() {
return 0;

TestRegionServerMetrics.java

@@ -17,6 +17,15 @@
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
@@ -32,13 +41,6 @@ import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
- import static org.junit.Assert.*;
- import java.io.IOException;
- import java.util.ArrayList;
- import java.util.List;
@Category(MediumTests.class)
public class TestRegionServerMetrics {
private static MetricsAssertHelper metricsHelper;
@@ -682,4 +684,34 @@ public class TestRegionServerMetrics {
t.close();
}
@Test
public void testReadBytes() throws Exception {
TableName tableName = TableName.valueOf("testReadBytes");
byte[] cf = Bytes.toBytes("d");
byte[] row = Bytes.toBytes("rk");
byte[] qualifier = Bytes.toBytes("qual");
byte[] val = Bytes.toBytes("Value");
Table t = TEST_UTIL.createTable(tableName, cf);
// Do a first put to be sure that the connection is established, meta is there and so on.
Put p = new Put(row);
p.addColumn(cf, qualifier, val);
t.put(p);
// Do a few gets
for (int i = 0; i < 10; i++) {
t.get(new Get(row));
}
TEST_UTIL.getHBaseAdmin().flush(tableName);
metricsRegionServer.getRegionServerWrapper().forceRecompute();
assertTrue("Total read bytes should be larger than 0",
metricsRegionServer.getRegionServerWrapper().getTotalBytesRead() > 0);
assertTrue("Total local read bytes should be larger than 0",
metricsRegionServer.getRegionServerWrapper().getLocalBytesRead() > 0);
assertEquals("Total short circuit read bytes should be equal to 0", 0,
metricsRegionServer.getRegionServerWrapper().getShortCircuitBytesRead());
assertEquals("Total zero-byte read bytes should be equal to 0", 0,
metricsRegionServer.getRegionServerWrapper().getZeroCopyBytesRead());
}
}
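
Note on the last two assertions: short-circuit and zero-copy bytes stay at zero because the mini cluster used by the test does not enable short-circuit local reads, and nothing in the test issues zero-copy reads. For a deployment where the shortCircuitBytesRead gauge would become nonzero, the client needs roughly the following configuration; this is a hedged sketch, and the domain socket path is a placeholder that must match the DataNode's own dfs.domain.socket.path.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class ShortCircuitConf {
  public static Configuration create() {
    Configuration conf = HBaseConfiguration.create();
    // Standard HDFS client switch for short-circuit local reads.
    conf.setBoolean("dfs.client.read.shortcircuit", true);
    // Placeholder path; the DataNode must be started with the same value.
    conf.set("dfs.domain.socket.path", "/var/lib/hadoop-hdfs/dn_socket");
    return conf;
  }
}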