HBASE-8868. add metric to report client shortcircuit reads. (#1334)
Signed-off-by: stack <stack@apache.org>
commit 8521207be4 (parent d7189127fb)
@@ -474,6 +474,17 @@ public interface MetricsRegionServerSource extends BaseSource, JvmPauseMonitorSource {
   String HEDGED_READ_WINS_DESC =
     "The number of times we started a hedged read and a hedged read won";
 
+  String TOTAL_BYTES_READ = "totalBytesRead";
+  String TOTAL_BYTES_READ_DESC = "The total number of bytes read from HDFS";
+  String LOCAL_BYTES_READ = "localBytesRead";
+  String LOCAL_BYTES_READ_DESC =
+    "The number of bytes read from the local HDFS DataNode";
+  String SHORTCIRCUIT_BYTES_READ = "shortCircuitBytesRead";
+  String SHORTCIRCUIT_BYTES_READ_DESC = "The number of bytes read through HDFS short circuit read";
+  String ZEROCOPY_BYTES_READ = "zeroCopyBytesRead";
+  String ZEROCOPY_BYTES_READ_DESC =
+    "The number of bytes read through HDFS zero copy";
+
   String BLOCKED_REQUESTS_COUNT = "blockedRequestCount";
   String BLOCKED_REQUESTS_COUNT_DESC = "The number of blocked requests because of memstore size is "
     + "larger than blockingMemStoreSize";
@@ -442,6 +442,26 @@ public interface MetricsRegionServerWrapper {
    */
   long getHedgedReadWins();
 
+  /**
+   * @return Number of total bytes read from HDFS.
+   */
+  long getTotalBytesRead();
+
+  /**
+   * @return Number of bytes read from the local HDFS DataNode.
+   */
+  long getLocalBytesRead();
+
+  /**
+   * @return Number of bytes read locally through HDFS short circuit.
+   */
+  long getShortCircuitBytesRead();
+
+  /**
+   * @return Number of bytes read locally through HDFS zero copy.
+   */
+  long getZeroCopyBytesRead();
+
   /**
    * @return Count of requests blocked because the memstore size is larger than blockingMemStoreSize
    */
@@ -506,6 +506,18 @@ public class MetricsRegionServerSourceImpl
           .addGauge(Interns.info(PERCENT_FILES_LOCAL_SECONDARY_REGIONS,
               PERCENT_FILES_LOCAL_SECONDARY_REGIONS_DESC),
              rsWrap.getPercentFileLocalSecondaryRegions())
+          .addGauge(Interns.info(TOTAL_BYTES_READ,
+              TOTAL_BYTES_READ_DESC),
+              rsWrap.getTotalBytesRead())
+          .addGauge(Interns.info(LOCAL_BYTES_READ,
+              LOCAL_BYTES_READ_DESC),
+              rsWrap.getLocalBytesRead())
+          .addGauge(Interns.info(SHORTCIRCUIT_BYTES_READ,
+              SHORTCIRCUIT_BYTES_READ_DESC),
+              rsWrap.getShortCircuitBytesRead())
+          .addGauge(Interns.info(ZEROCOPY_BYTES_READ,
+              ZEROCOPY_BYTES_READ_DESC),
+              rsWrap.getZeroCopyBytesRead())
           .addGauge(Interns.info(SPLIT_QUEUE_LENGTH, SPLIT_QUEUE_LENGTH_DESC),
               rsWrap.getSplitQueueSize())
           .addGauge(Interns.info(COMPACTION_QUEUE_LENGTH, COMPACTION_QUEUE_LENGTH_DESC),
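The gauges registered above surface through the Hadoop metrics2 system, and therefore over JMX. A hedged sketch of dumping them from a JVM where the RegionServer metrics MBean is registered; the ObjectName used here, Hadoop:service=HBase,name=RegionServer,sub=Server, is an assumption to verify against your HBase version:

import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Sketch: dump the four new read-bytes gauges via JMX. Assumes it runs in a
// JVM where the RegionServer "Server" metrics MBean is registered; the
// ObjectName below is an assumption to check against your HBase version.
public class ReadBytesProbe {
  public static void main(String[] args) throws Exception {
    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
    ObjectName rs = new ObjectName("Hadoop:service=HBase,name=RegionServer,sub=Server");
    for (String attr : new String[] { "totalBytesRead", "localBytesRead",
        "shortCircuitBytesRead", "zeroCopyBytesRead" }) {
      System.out.println(attr + " = " + mbs.getAttribute(rs, attr));
    }
  }
}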
@@ -29,6 +29,8 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hdfs.DFSInputStream;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -94,6 +96,15 @@ public class FSDataInputStreamWrapper implements Closeable {
   // errors against Hadoop pre 2.6.4 and 2.7.1 versions.
   private Method unbuffer = null;
 
+  private final static ReadStatistics readStatistics = new ReadStatistics();
+
+  private static class ReadStatistics {
+    long totalBytesRead;
+    long totalLocalBytesRead;
+    long totalShortCircuitBytesRead;
+    long totalZeroCopyBytesRead;
+  }
+
   public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException {
     this(fs, path, false, -1L);
   }
@@ -232,14 +243,64 @@ public class FSDataInputStreamWrapper implements Closeable {
     }
   }
 
-  /** Close stream(s) if necessary. */
+  private void updateInputStreamStatistics(FSDataInputStream stream) {
+    // If the underlying file system is HDFS, update read statistics upon close.
+    if (stream instanceof HdfsDataInputStream) {
+      /**
+       * Because HDFS ReadStatistics is calculated per input stream, it is not
+       * feasible to update the aggregated number in real time. Instead, the
+       * metrics are updated when an input stream is closed.
+       */
+      HdfsDataInputStream hdfsDataInputStream = (HdfsDataInputStream)stream;
+      synchronized (readStatistics) {
+        readStatistics.totalBytesRead += hdfsDataInputStream.getReadStatistics().
+          getTotalBytesRead();
+        readStatistics.totalLocalBytesRead += hdfsDataInputStream.getReadStatistics().
+          getTotalLocalBytesRead();
+        readStatistics.totalShortCircuitBytesRead += hdfsDataInputStream.getReadStatistics().
+          getTotalShortCircuitBytesRead();
+        readStatistics.totalZeroCopyBytesRead += hdfsDataInputStream.getReadStatistics().
+          getTotalZeroCopyBytesRead();
+      }
+    }
+  }
+
+  public static long getTotalBytesRead() {
+    synchronized (readStatistics) {
+      return readStatistics.totalBytesRead;
+    }
+  }
+
+  public static long getLocalBytesRead() {
+    synchronized (readStatistics) {
+      return readStatistics.totalLocalBytesRead;
+    }
+  }
+
+  public static long getShortCircuitBytesRead() {
+    synchronized (readStatistics) {
+      return readStatistics.totalShortCircuitBytesRead;
+    }
+  }
+
+  public static long getZeroCopyBytesRead() {
+    synchronized (readStatistics) {
+      return readStatistics.totalZeroCopyBytesRead;
+    }
+  }
+
+  /** Close stream(s) if necessary. */
   @Override
   public void close() {
     if (!doCloseStreams) {
       return;
     }
+    updateInputStreamStatistics(this.streamNoFsChecksum);
     // we do not care about the close exception as it is for reading, no data loss issue.
     IOUtils.closeQuietly(streamNoFsChecksum);
 
+    updateInputStreamStatistics(stream);
     IOUtils.closeQuietly(stream);
   }
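The hunk above is an aggregate-on-close pattern: HDFS tracks ReadStatistics per input stream, so each stream's counters are folded into a static process-wide total when the stream closes. A minimal sketch of the same pattern in isolation, using hypothetical GlobalTotals/PerStreamCounter names rather than HBase or HDFS APIs:

// Hypothetical names; a sketch of the aggregate-on-close pattern, not HBase code.
public class GlobalTotals {
  // Shared mutable total guarded by its own monitor, mirroring the
  // synchronized (readStatistics) blocks in the hunk above.
  private static final Object LOCK = new Object();
  private static long totalBytesRead;

  static final class PerStreamCounter implements AutoCloseable {
    private long bytesRead;

    void read(int n) { bytesRead += n; }  // per-stream, no locking needed

    // Folding into the global total only on close keeps the hot read path
    // lock-free; the totals therefore lag open streams, as in the commit.
    @Override
    public void close() {
      synchronized (LOCK) {
        totalBytesRead += bytesRead;
      }
    }
  }

  public static long getTotalBytesRead() {
    synchronized (LOCK) {
      return totalBytesRead;
    }
  }

  public static void main(String[] args) {
    try (PerStreamCounter s = new PerStreamCounter()) {
      s.read(128);
    }
    System.out.println(getTotalBytesRead()); // prints 128
  }
}

One lock for all counters is a deliberate simplification; updates happen only at stream close, so contention is low. A java.util.concurrent.atomic.LongAdder per counter would be the lower-contention alternative.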
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
 import org.apache.hadoop.hbase.io.hfile.CacheStats;
 import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
@@ -905,6 +906,26 @@ class MetricsRegionServerWrapperImpl
     return this.dfsHedgedReadMetrics == null? 0: this.dfsHedgedReadMetrics.getHedgedReadWins();
   }
 
+  @Override
+  public long getTotalBytesRead() {
+    return FSDataInputStreamWrapper.getTotalBytesRead();
+  }
+
+  @Override
+  public long getLocalBytesRead() {
+    return FSDataInputStreamWrapper.getLocalBytesRead();
+  }
+
+  @Override
+  public long getShortCircuitBytesRead() {
+    return FSDataInputStreamWrapper.getShortCircuitBytesRead();
+  }
+
+  @Override
+  public long getZeroCopyBytesRead() {
+    return FSDataInputStreamWrapper.getZeroCopyBytesRead();
+  }
+
   @Override
   public long getBlockedRequestsCount() {
     return blockedRequestsCount;
@@ -410,6 +410,26 @@ public class MetricsRegionServerWrapperStub implements MetricsRegionServerWrapper {
     return 10;
   }
 
+  @Override
+  public long getTotalBytesRead() {
+    return 0;
+  }
+
+  @Override
+  public long getLocalBytesRead() {
+    return 0;
+  }
+
+  @Override
+  public long getShortCircuitBytesRead() {
+    return 0;
+  }
+
+  @Override
+  public long getZeroCopyBytesRead() {
+    return 0;
+  }
+
   @Override
   public long getBlockedRequestsCount() {
     return 0;
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -622,4 +623,22 @@ public class TestRegionServerMetrics {
     metricsRegionServer.getRegionServerWrapper().forceRecompute();
     assertTrue(metricsHelper.getGaugeDouble("averageRegionSize", serverSource) > 0.0);
   }
+
+  @Test
+  public void testReadBytes() throws Exception {
+    // Do a first put to be sure that the connection is established, meta is there and so on.
+    doNPuts(1, false);
+    doNGets(10, false);
+    TEST_UTIL.getAdmin().flush(tableName);
+    metricsRegionServer.getRegionServerWrapper().forceRecompute();
+
+    assertTrue("Total read bytes should be larger than 0",
+      metricsRegionServer.getRegionServerWrapper().getTotalBytesRead() > 0);
+    assertTrue("Total local read bytes should be larger than 0",
+      metricsRegionServer.getRegionServerWrapper().getLocalBytesRead() > 0);
+    assertEquals("Total short circuit read bytes should be equal to 0", 0,
+      metricsRegionServer.getRegionServerWrapper().getShortCircuitBytesRead());
+    assertEquals("Total zero copy read bytes should be equal to 0", 0,
+      metricsRegionServer.getRegionServerWrapper().getZeroCopyBytesRead());
+  }
 }
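The two assertEquals(..., 0, ...) assertions hold because the test mini-cluster presumably runs without short-circuit reads enabled, the usual default. A hypothetical helper, not part of the commit, showing the standard HDFS client keys a short-circuit-enabled variant would set on the cluster configuration before startup:

// Hypothetical, illustrative only: standard HDFS short-circuit read keys.
// Requires native libhadoop and a UNIX domain socket path writable by the
// DataNode; without them the client silently falls back to remote reads.
static void enableShortCircuitReads(org.apache.hadoop.conf.Configuration conf) {
  conf.setBoolean("dfs.client.read.shortcircuit", true);
  conf.set("dfs.domain.socket.path", "/var/lib/hadoop-hdfs/dn_socket");
}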
@@ -1177,6 +1177,13 @@ is complaining in the logs, include `dfs.client.read.shortcircuit.streams.cache
 `dfs.client.socketcache.capacity`. Documentation is sparse on these options. You'll have to
 read source code.
 
+The RegionServer metrics system exposes the HDFS short-circuit read metric `shortCircuitBytesRead`. Other
+HDFS read metrics, including
+`totalBytesRead` (the total number of bytes read from HDFS),
+`localBytesRead` (the number of bytes read from the local HDFS DataNode), and
+`zeroCopyBytesRead` (the number of bytes read through HDFS zero copy),
+are available and can be used to troubleshoot short-circuit read issues.
+
 For more on short-circuit reads, see Colin's old blog on rollout,
 link:http://blog.cloudera.com/blog/2013/08/how-improved-short-circuit-local-reads-bring-better-performance-and-security-to-hadoop/[How Improved Short-Circuit Local Reads Bring Better Performance and Security to Hadoop].
 The link:https://issues.apache.org/jira/browse/HDFS-347[HDFS-347] issue also makes for an
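In HDFS ReadStatistics, short-circuit bytes are a subset of local bytes, which are in turn a subset of total bytes, so comparing the gauges gives a quick health check: local reads with zero short-circuit bytes usually mean short-circuit reads are disabled or silently falling back. A minimal sketch of that check, as a hypothetical helper rather than an HBase API:

// Hypothetical troubleshooting helper built on the new gauges; not HBase API.
public class ShortCircuitCheck {
  static String diagnose(long localBytesRead, long shortCircuitBytesRead) {
    if (localBytesRead == 0) {
      return "No local reads yet; generate traffic before judging.";
    }
    if (shortCircuitBytesRead == 0) {
      // Local reads exist but none bypass the DataNode: suspect
      // dfs.client.read.shortcircuit=false or a broken domain socket path.
      return "Local reads are not short-circuiting; check SCR configuration.";
    }
    double pct = 100.0 * shortCircuitBytesRead / localBytesRead;
    return String.format("%.0f%% of local bytes were read short-circuit.", pct);
  }

  public static void main(String[] args) {
    System.out.println(diagnose(1_000_000, 0)); // sample values
  }
}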