HBASE-8868. add metric to report client shortcircuit reads. (#1334)
Signed-off-by: stack <stack@apache.net>
parent 2ca0a105bc
commit 84977eeebb
@@ -480,6 +480,17 @@ public interface MetricsRegionServerSource extends BaseSource, JvmPauseMonitorSource

  String HEDGED_READ_WINS_DESC =
      "The number of times we started a hedged read and a hedged read won";

  String TOTAL_BYTES_READ = "totalBytesRead";
  String TOTAL_BYTES_READ_DESC = "The total number of bytes read from HDFS";
  String LOCAL_BYTES_READ = "localBytesRead";
  String LOCAL_BYTES_READ_DESC =
      "The number of bytes read from the local HDFS DataNode";
  String SHORTCIRCUIT_BYTES_READ = "shortCircuitBytesRead";
  String SHORTCIRCUIT_BYTES_READ_DESC =
      "The number of bytes read through HDFS short circuit read";
  String ZEROCOPY_BYTES_READ = "zeroCopyBytesRead";
  String ZEROCOPY_BYTES_READ_DESC =
      "The number of bytes read through HDFS zero copy";

  String BLOCKED_REQUESTS_COUNT = "blockedRequestCount";
  String BLOCKED_REQUESTS_COUNT_DESC =
      "The number of requests blocked because the memstore size is larger than blockingMemStoreSize";

@@ -452,6 +452,26 @@ public interface MetricsRegionServerWrapper {
   */
  long getHedgedReadWins();

  /**
   * @return Number of total bytes read from HDFS.
   */
  long getTotalBytesRead();

  /**
   * @return Number of bytes read from the local HDFS DataNode.
   */
  long getLocalBytesRead();

  /**
   * @return Number of bytes read locally through HDFS short circuit.
   */
  long getShortCircuitBytesRead();

  /**
   * @return Number of bytes read locally through HDFS zero copy.
   */
  long getZeroCopyBytesRead();

  /**
   * @return Count of requests blocked because the memstore size is larger than blockingMemStoreSize.
   */
  long getBlockedRequestsCount();

@@ -510,6 +510,18 @@ public class MetricsRegionServerSourceImpl
      .addGauge(Interns.info(PERCENT_FILES_LOCAL_SECONDARY_REGIONS,
          PERCENT_FILES_LOCAL_SECONDARY_REGIONS_DESC),
          rsWrap.getPercentFileLocalSecondaryRegions())
      .addGauge(Interns.info(TOTAL_BYTES_READ, TOTAL_BYTES_READ_DESC),
          rsWrap.getTotalBytesRead())
      .addGauge(Interns.info(LOCAL_BYTES_READ, LOCAL_BYTES_READ_DESC),
          rsWrap.getLocalBytesRead())
      .addGauge(Interns.info(SHORTCIRCUIT_BYTES_READ, SHORTCIRCUIT_BYTES_READ_DESC),
          rsWrap.getShortCircuitBytesRead())
      .addGauge(Interns.info(ZEROCOPY_BYTES_READ, ZEROCOPY_BYTES_READ_DESC),
          rsWrap.getZeroCopyBytesRead())
      .addGauge(Interns.info(SPLIT_QUEUE_LENGTH, SPLIT_QUEUE_LENGTH_DESC),
          rsWrap.getSplitQueueSize())
      .addGauge(Interns.info(COMPACTION_QUEUE_LENGTH, COMPACTION_QUEUE_LENGTH_DESC),
          rsWrap.getCompactionQueueSize())

@@ -29,6 +29,8 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hdfs.DFSInputStream;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -94,6 +96,15 @@ public class FSDataInputStreamWrapper implements Closeable {
  // errors against Hadoop pre 2.6.4 and 2.7.1 versions.
  private Method unbuffer = null;

  private final static ReadStatistics readStatistics = new ReadStatistics();

  private static class ReadStatistics {
    long totalBytesRead;
    long totalLocalBytesRead;
    long totalShortCircuitBytesRead;
    long totalZeroCopyBytesRead;
  }

  public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException {
    this(fs, path, false, -1L);
  }

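The counters above form one process-wide accumulator guarded by a lock, so the totals cover
every stream closed since the JVM started. A minimal standalone sketch of the same pattern
(class and method names here are illustrative, not part of the patch):

// Illustrative sketch: a process-wide accumulator, mirroring the static
// ReadStatistics holder above.
public class AggregateReadCounter {
  private static final Object LOCK = new Object();
  private static long totalBytes;

  /** Fold one closed stream's final byte count into the running total. */
  static void addOnClose(long bytesReadByStream) {
    synchronized (LOCK) {
      totalBytes += bytesReadByStream;
    }
  }

  /** Sample the running total, e.g. from metrics reporting code. */
  static long snapshot() {
    synchronized (LOCK) {
      return totalBytes;
    }
  }
}
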
@@ -232,14 +243,64 @@ public class FSDataInputStreamWrapper implements Closeable {
    }
  }

  private void updateInputStreamStatistics(FSDataInputStream stream) {
    // If the underlying file system is HDFS, update read statistics upon close.
    if (stream instanceof HdfsDataInputStream) {
      /**
       * Because HDFS ReadStatistics is calculated per input stream, it is not
       * feasible to update the aggregated number in real time. Instead, the
       * metrics are updated when an input stream is closed.
       */
      HdfsDataInputStream hdfsDataInputStream = (HdfsDataInputStream) stream;
      synchronized (readStatistics) {
        readStatistics.totalBytesRead +=
            hdfsDataInputStream.getReadStatistics().getTotalBytesRead();
        readStatistics.totalLocalBytesRead +=
            hdfsDataInputStream.getReadStatistics().getTotalLocalBytesRead();
        readStatistics.totalShortCircuitBytesRead +=
            hdfsDataInputStream.getReadStatistics().getTotalShortCircuitBytesRead();
        readStatistics.totalZeroCopyBytesRead +=
            hdfsDataInputStream.getReadStatistics().getTotalZeroCopyBytesRead();
      }
    }
  }

  public static long getTotalBytesRead() {
    synchronized (readStatistics) {
      return readStatistics.totalBytesRead;
    }
  }

  public static long getLocalBytesRead() {
    synchronized (readStatistics) {
      return readStatistics.totalLocalBytesRead;
    }
  }

  public static long getShortCircuitBytesRead() {
    synchronized (readStatistics) {
      return readStatistics.totalShortCircuitBytesRead;
    }
  }

  public static long getZeroCopyBytesRead() {
    synchronized (readStatistics) {
      return readStatistics.totalZeroCopyBytesRead;
    }
  }

  /** Close stream(s) if necessary. */
  @Override
  public void close() {
    if (!doCloseStreams) {
      return;
    }
    updateInputStreamStatistics(this.streamNoFsChecksum);
    // we do not care about the close exception as it is for reading, no data loss issue.
    IOUtils.closeQuietly(streamNoFsChecksum);

    updateInputStreamStatistics(stream);
    IOUtils.closeQuietly(stream);
  }

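The block comment above carries the key design point: HDFS tracks ReadStatistics per input
stream, so each stream's totals are folded into the shared counters only when the stream
closes. A hedged sketch of reading the per-stream numbers directly via the HDFS client API
(the path is hypothetical; error handling elided):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;

public class PerStreamReadStats {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    try (FSDataInputStream in = fs.open(new Path("/tmp/example"))) { // hypothetical path
      in.read(new byte[4096]);
      if (in instanceof HdfsDataInputStream) {
        HdfsDataInputStream hin = (HdfsDataInputStream) in;
        // Per-stream totals; they vanish with the stream, hence the
        // close-time aggregation in FSDataInputStreamWrapper above.
        System.out.println("total=" + hin.getReadStatistics().getTotalBytesRead());
        System.out.println("local=" + hin.getReadStatistics().getTotalLocalBytesRead());
        System.out.println("shortCircuit="
            + hin.getReadStatistics().getTotalShortCircuitBytesRead());
        System.out.println("zeroCopy="
            + hin.getReadStatistics().getTotalZeroCopyBytesRead());
      }
    }
  }
}
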
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.CacheStats;
import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;

@@ -928,6 +929,26 @@ class MetricsRegionServerWrapperImpl
    return this.dfsHedgedReadMetrics == null ? 0 : this.dfsHedgedReadMetrics.getHedgedReadWins();
  }

  @Override
  public long getTotalBytesRead() {
    return FSDataInputStreamWrapper.getTotalBytesRead();
  }

  @Override
  public long getLocalBytesRead() {
    return FSDataInputStreamWrapper.getLocalBytesRead();
  }

  @Override
  public long getShortCircuitBytesRead() {
    return FSDataInputStreamWrapper.getShortCircuitBytesRead();
  }

  @Override
  public long getZeroCopyBytesRead() {
    return FSDataInputStreamWrapper.getZeroCopyBytesRead();
  }

  @Override
  public long getBlockedRequestsCount() {
    return blockedRequestsCount;
  }

@@ -420,6 +420,26 @@ public class MetricsRegionServerWrapperStub implements MetricsRegionServerWrapper
    return 10;
  }

  @Override
  public long getTotalBytesRead() {
    return 0;
  }

  @Override
  public long getLocalBytesRead() {
    return 0;
  }

  @Override
  public long getShortCircuitBytesRead() {
    return 0;
  }

  @Override
  public long getZeroCopyBytesRead() {
    return 0;
  }

  @Override
  public long getBlockedRequestsCount() {
    return 0;
  }

@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

@@ -624,4 +625,22 @@ public class TestRegionServerMetrics {
    metricsRegionServer.getRegionServerWrapper().forceRecompute();
    assertTrue(metricsHelper.getGaugeDouble("averageRegionSize", serverSource) > 0.0);
  }

  @Test
  public void testReadBytes() throws Exception {
    // Do a first put to be sure that the connection is established, meta is there and so on.
    doNPuts(1, false);
    doNGets(10, false);
    TEST_UTIL.getAdmin().flush(tableName);
    metricsRegionServer.getRegionServerWrapper().forceRecompute();

    assertTrue("Total read bytes should be larger than 0",
        metricsRegionServer.getRegionServerWrapper().getTotalBytesRead() > 0);
    assertTrue("Total local read bytes should be larger than 0",
        metricsRegionServer.getRegionServerWrapper().getLocalBytesRead() > 0);
    assertEquals("Total short circuit read bytes should be equal to 0", 0,
        metricsRegionServer.getRegionServerWrapper().getShortCircuitBytesRead());
    assertEquals("Total zero-copy read bytes should be equal to 0", 0,
        metricsRegionServer.getRegionServerWrapper().getZeroCopyBytesRead());
  }
}

@@ -1177,6 +1177,13 @@ is complaining in the logs, include `dfs.client.read.shortcircuit.streams.cache.size` and
`dfs.client.socketcache.capacity`. Documentation is sparse on these options. You'll have to
read source code.

The RegionServer metrics system exposes the HDFS short-circuit read metric `shortCircuitBytesRead`.
Other HDFS read metrics, including
`totalBytesRead` (the total number of bytes read from HDFS),
`localBytesRead` (the number of bytes read from the local HDFS DataNode), and
`zeroCopyBytesRead` (the number of bytes read through HDFS zero copy),
are also available and can be used to troubleshoot short-circuit read issues.

For more on short-circuit reads, see Colin's old blog on rollout,
link:http://blog.cloudera.com/blog/2013/08/how-improved-short-circuit-local-reads-bring-better-performance-and-security-to-hadoop/[How Improved Short-Circuit Local Reads Bring Better Performance and Security to Hadoop].
The link:https://issues.apache.org/jira/browse/HDFS-347[HDFS-347] issue also makes for an
interesting read showing the HDFS community at its best (caveat a few comments).
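As a hedged illustration of the troubleshooting the paragraph above describes (not part of
the patch), a server-side check against the aggregated counters added in this commit might
look like:

import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;

public final class ShortCircuitReadCheck {
  private ShortCircuitReadCheck() {
  }

  /**
   * Returns true when bytes are being read from HDFS but none arrive via the
   * short-circuit path, which usually means dfs.client.read.shortcircuit is
   * disabled or misconfigured on this RegionServer.
   */
  public static boolean shortCircuitLooksBroken() {
    long total = FSDataInputStreamWrapper.getTotalBytesRead();
    long shortCircuit = FSDataInputStreamWrapper.getShortCircuitBytesRead();
    return total > 0 && shortCircuit == 0;
  }
}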