HBASE-24435 Add hedgedReads and hedgedReadWins count metrics (#1781)
Co-authored-by: stack <stack@apache.org>
Co-authored-by: Javier <javier.lucadetena@linecorp.com>
Signed-off-by: Reid Chan <reidchan@apache.org>
parent 1870876bd3
commit 6aa2286733
@@ -504,6 +504,12 @@ public interface MetricsRegionServerSource extends BaseSource, JvmPauseMonitorSource
  String MAJOR_COMPACTED_OUTPUT_BYTES_DESC
    = "Total number of bytes that is output from compaction, major only";

  String HEDGED_READS = "hedgedReads";
  String HEDGED_READS_DESC = "The number of times we started a hedged read";
  String HEDGED_READ_WINS = "hedgedReadWins";
  String HEDGED_READ_WINS_DESC =
    "The number of times we started a hedged read and a hedged read won";

  String RPC_GET_REQUEST_COUNT = "rpcGetRequestCount";
  String RPC_GET_REQUEST_COUNT_DESC = "Number of rpc get requests this region server has answered.";
  String RPC_SCAN_REQUEST_COUNT = "rpcScanRequestCount";

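These constants only name and describe the new counters; they stay at zero unless hedged reads are enabled on the HDFS client side. A minimal configuration sketch, assuming the stock HDFS client property names (the same keys the test below sets via DFSConfigKeys); the values are placeholders, not recommendations:

// Sketch only: enable hedged reads so the hedgedReads/hedgedReadWins counters can move.
// Property names are the standard HDFS client keys; the values here are illustrative.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class HedgedReadConfigSketch {
  public static Configuration create() {
    Configuration conf = HBaseConfiguration.create();
    // Size of the hedged read thread pool; 0 (the default) leaves hedged reads disabled.
    conf.setInt("dfs.client.hedged.read.threadpool.size", 10);
    // How long to wait on the first replica before a hedged read is started against another.
    conf.setLong("dfs.client.hedged.read.threshold.millis", 100);
    return conf;
  }
}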
@@ -348,6 +348,16 @@ public interface MetricsRegionServerWrapper
   */
  long getMajorCompactedCellsSize();

  /**
   * @return Count of hedged read operations
   */
  long getHedgedReadOps();

  /**
   * @return Count of times a hedged read beat out the primary read.
   */
  long getHedgedReadWins();

  /**
   * @return Number of total bytes read from HDFS.
   */

@@ -504,6 +504,11 @@ public class MetricsRegionServerSourceImpl
        rsWrap.getCompactedCellsSize())
      .addCounter(Interns.info(MAJOR_COMPACTED_CELLS_SIZE, MAJOR_COMPACTED_CELLS_SIZE_DESC),
        rsWrap.getMajorCompactedCellsSize())

      .addCounter(Interns.info(HEDGED_READS, HEDGED_READS_DESC), rsWrap.getHedgedReadOps())
      .addCounter(Interns.info(HEDGED_READ_WINS, HEDGED_READ_WINS_DESC),
        rsWrap.getHedgedReadWins())

      .addCounter(Interns.info(BLOCKED_REQUESTS_COUNT, BLOCKED_REQUESTS_COUNT_DESC),
        rsWrap.getBlockedRequestsCount())
      .tag(Interns.info(ZOOKEEPER_QUORUM_NAME, ZOOKEEPER_QUORUM_DESC),

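Once registered here, hedgedReads and hedgedReadWins appear in the region server's metrics record next to the existing compaction and request counters, so they are visible through whatever sink is configured (JMX, Hadoop metrics2 sinks, and so on). A minimal sketch of reading them remotely over JMX; the host, port, and ObjectName below are assumptions for illustration and should be checked against the actual deployment:

// Sketch: read hedgedReads / hedgedReadWins from a running region server via JMX.
// The service URL, port, and MBean name are assumptions, not values from this commit.
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class HedgedReadJmxSketch {
  public static void main(String[] args) throws Exception {
    JMXServiceURL url =
      new JMXServiceURL("service:jmx:rmi:///jndi/rmi://regionserver-host:10102/jmxrmi");
    try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
      MBeanServerConnection mbsc = connector.getMBeanServerConnection();
      // Typical metrics2 bean for the region server "Server" sub-source; verify on your cluster.
      ObjectName name = new ObjectName("Hadoop:service=HBase,name=RegionServer,sub=Server");
      System.out.println("hedgedReads = " + mbsc.getAttribute(name, "hedgedReads"));
      System.out.println("hedgedReadWins = " + mbsc.getAttribute(name, "hedgedReadWins"));
    }
  }
}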
@@ -17,6 +17,7 @@
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

@@ -40,7 +41,9 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CacheStats;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
import org.apache.hadoop.metrics2.MetricsExecutor;

/**

@@ -98,6 +101,11 @@ class MetricsRegionServerWrapperImpl
  private Runnable runnable;
  private long period;

  /**
   * Can be null if not on hdfs.
   */
  private DFSHedgedReadMetrics dfsHedgedReadMetrics;

  public MetricsRegionServerWrapperImpl(final HRegionServer regionServer) {
    this.regionServer = regionServer;
    initBlockCache();

@@ -111,6 +119,11 @@ class MetricsRegionServerWrapperImpl
    this.executor.scheduleWithFixedDelay(this.runnable, this.period, this.period,
      TimeUnit.MILLISECONDS);

    try {
      this.dfsHedgedReadMetrics = FSUtils.getDFSHedgedReadMetrics(regionServer.getConfiguration());
    } catch (IOException e) {
      LOG.warn("Failed to get hedged metrics", e);
    }
    if (LOG.isInfoEnabled()) {
      LOG.info("Computing regionserver metrics every " + this.period + " milliseconds");
    }

@@ -819,6 +832,16 @@ class MetricsRegionServerWrapperImpl
    return FSDataInputStreamWrapper.getZeroCopyBytesRead();
  }

  @Override
  public long getHedgedReadOps() {
    return this.dfsHedgedReadMetrics == null ? 0 : this.dfsHedgedReadMetrics.getHedgedReadOps();
  }

  @Override
  public long getHedgedReadWins() {
    return this.dfsHedgedReadMetrics == null ? 0 : this.dfsHedgedReadMetrics.getHedgedReadWins();
  }

  @Override
  public long getBlockedRequestsCount() {
    return blockedRequestsCount;

@@ -84,6 +84,8 @@ import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.FSProtos;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;

@@ -1780,4 +1782,47 @@ public abstract class FSUtils extends CommonFSUtils {
    int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
    conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
  }

  /**
   * @return The DFSClient DFSHedgedReadMetrics instance or null if can't be found or not on hdfs.
   */
  public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c)
      throws IOException {
    if (!isHDFS(c)) {
      return null;
    }
    // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal
    // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it
    // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients.
    final String name = "getHedgedReadMetrics";
    DFSClient dfsclient = ((DistributedFileSystem)FileSystem.get(c)).getClient();
    Method m;
    try {
      m = dfsclient.getClass().getDeclaredMethod(name);
    } catch (NoSuchMethodException e) {
      LOG.warn("Failed find method " + name + " in dfsclient; no hedged read metrics: " +
        e.getMessage());
      return null;
    } catch (SecurityException e) {
      LOG.warn("Failed find method " + name + " in dfsclient; no hedged read metrics: " +
        e.getMessage());
      return null;
    }
    m.setAccessible(true);
    try {
      return (DFSHedgedReadMetrics)m.invoke(dfsclient);
    } catch (IllegalAccessException e) {
      LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
        e.getMessage());
      return null;
    } catch (IllegalArgumentException e) {
      LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
        e.getMessage());
      return null;
    } catch (InvocationTargetException e) {
      LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
        e.getMessage());
      return null;
    }
  }
}

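The new helper can also be called directly, for example from a small diagnostic utility. A minimal sketch, assuming an HBase client classpath; note that the counters are process-wide (the DFSHedgedReadMetrics instance is shared by all DFSClients) and that the method returns null when the filesystem is not HDFS:

// Sketch: query the process-wide hedged read counters through the new FSUtils helper.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;

public class HedgedReadProbe {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    DFSHedgedReadMetrics metrics = FSUtils.getDFSHedgedReadMetrics(conf);
    if (metrics == null) {
      // Not on HDFS, or the reflective lookup of getHedgedReadMetrics failed.
      System.out.println("Hedged read metrics are unavailable.");
      return;
    }
    System.out.println("hedged read ops:  " + metrics.getHedgedReadOps());
    System.out.println("hedged read wins: " + metrics.getHedgedReadWins());
  }
}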
@@ -360,6 +360,16 @@ public class MetricsRegionServerWrapperStub implements MetricsRegionServerWrapper
    return 10240000;
  }

  @Override
  public long getHedgedReadOps() {
    return 100;
  }

  @Override
  public long getHedgedReadWins() {
    return 10;
  }

  @Override
  public long getTotalBytesRead() {
    return 0;

@@ -27,11 +27,13 @@ import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.IOException;
import java.util.Random;
import java.util.UUID;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;

@@ -43,6 +45,9 @@ import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.Assert;
import org.junit.Before;

@@ -435,4 +440,126 @@ public class TestFSUtils {
    }
  }

  /**
   * Ugly test that ensures we can get at the hedged read counters in dfsclient.
   * Does a bit of preading with hedged reads enabled using code taken from hdfs TestPread.
   */
  @Test public void testDFSHedgedReadMetrics() throws Exception {
    HBaseTestingUtility htu = new HBaseTestingUtility();
    // Enable hedged reads and set it so the threshold is really low.
    // Most of this test is taken from HDFS, from TestPread.
    Configuration conf = htu.getConfiguration();
    conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE, 5);
    conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS, 0);
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
    conf.setLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 4096);
    // Set short retry timeouts so this test runs faster
    conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 0);
    conf.setBoolean("dfs.datanode.transferTo.allowed", false);
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
    // Get the metrics. Should be empty.
    DFSHedgedReadMetrics metrics = FSUtils.getDFSHedgedReadMetrics(conf);
    assertEquals(0, metrics.getHedgedReadOps());
    FileSystem fileSys = cluster.getFileSystem();
    try {
      Path p = new Path("preadtest.dat");
      // We need > 1 blocks to test out the hedged reads.
      DFSTestUtil.createFile(fileSys, p, 12 * blockSize, 12 * blockSize,
        blockSize, (short) 3, seed);
      pReadFile(fileSys, p);
      cleanupFile(fileSys, p);
      assertTrue(metrics.getHedgedReadOps() > 0);
    } finally {
      fileSys.close();
      cluster.shutdown();
    }
  }

  // Below is taken from TestPread over in HDFS.
  static final int blockSize = 4096;
  static final long seed = 0xDEADBEEFL;

  private void pReadFile(FileSystem fileSys, Path name) throws IOException {
    FSDataInputStream stm = fileSys.open(name);
    byte[] expected = new byte[12 * blockSize];
    Random rand = new Random(seed);
    rand.nextBytes(expected);
    // do a sanity check. Read first 4K bytes
    byte[] actual = new byte[4096];
    stm.readFully(actual);
    checkAndEraseData(actual, 0, expected, "Read Sanity Test");
    // now do a pread for the first 8K bytes
    actual = new byte[8192];
    doPread(stm, 0L, actual, 0, 8192);
    checkAndEraseData(actual, 0, expected, "Pread Test 1");
    // Now check to see if the normal read returns 4K - 8K byte range
    actual = new byte[4096];
    stm.readFully(actual);
    checkAndEraseData(actual, 4096, expected, "Pread Test 2");
    // Now see if we can cross a single block boundary successfully
    // read 4K bytes from blockSize - 2K offset
    stm.readFully(blockSize - 2048, actual, 0, 4096);
    checkAndEraseData(actual, (blockSize - 2048), expected, "Pread Test 3");
    // now see if we can cross two block boundaries successfully
    // read blockSize + 4K bytes from blockSize - 2K offset
    actual = new byte[blockSize + 4096];
    stm.readFully(blockSize - 2048, actual);
    checkAndEraseData(actual, (blockSize - 2048), expected, "Pread Test 4");
    // now see if we can cross two block boundaries that are not cached
    // read blockSize + 4K bytes from 10 * blockSize - 2K offset
    actual = new byte[blockSize + 4096];
    stm.readFully(10 * blockSize - 2048, actual);
    checkAndEraseData(actual, (10 * blockSize - 2048), expected, "Pread Test 5");
    // now check that even after all these preads, we can still read
    // bytes 8K - 12K
    actual = new byte[4096];
    stm.readFully(actual);
    checkAndEraseData(actual, 8192, expected, "Pread Test 6");
    // done
    stm.close();
    // check block location caching
    stm = fileSys.open(name);
    stm.readFully(1, actual, 0, 4096);
    stm.readFully(4 * blockSize, actual, 0, 4096);
    stm.readFully(7 * blockSize, actual, 0, 4096);
    actual = new byte[3 * 4096];
    stm.readFully(0 * blockSize, actual, 0, 3 * 4096);
    checkAndEraseData(actual, 0, expected, "Pread Test 7");
    actual = new byte[8 * 4096];
    stm.readFully(3 * blockSize, actual, 0, 8 * 4096);
    checkAndEraseData(actual, 3 * blockSize, expected, "Pread Test 8");
    // read the tail
    stm.readFully(11 * blockSize + blockSize / 2, actual, 0, blockSize / 2);
    IOException res = null;
    try { // read beyond the end of the file
      stm.readFully(11 * blockSize + blockSize / 2, actual, 0, blockSize);
    } catch (IOException e) {
      // should throw an exception
      res = e;
    }
    assertTrue("Error reading beyond file boundary.", res != null);

    stm.close();
  }

  private void checkAndEraseData(byte[] actual, int from, byte[] expected, String message) {
    for (int idx = 0; idx < actual.length; idx++) {
      assertEquals(message + " byte " + (from + idx) + " differs. expected " +
        expected[from + idx] + " actual " + actual[idx],
        actual[idx], expected[from + idx]);
      actual[idx] = 0;
    }
  }

  private void doPread(FSDataInputStream stm, long position, byte[] buffer,
      int offset, int length) throws IOException {
    int nread = 0;

    while (nread < length) {
      int nbytes =
        stm.read(position + nread, buffer, offset + nread, length - nread);
      assertTrue("Error in pread", nbytes > 0);
      nread += nbytes;
    }
  }
}
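The new test can typically be run on its own with Surefire's single-test selector, for example:

  mvn test -Dtest=TestFSUtils#testDFSHedgedReadMetrics

run from the module that contains TestFSUtils (the module is not named in this diff); since the test starts a MiniDFSCluster, it needs an environment able to launch local HDFS daemons.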