From 6aa2286733f8ca224c03c1a61c17077fa29410a3 Mon Sep 17 00:00:00 2001
From: Javier Akira Luca de Tena
Date: Thu, 28 May 2020 11:11:10 +0900
Subject: [PATCH] HBASE-24435 Add hedgedReads and hedgedReadWins count metrics
 (#1781)

Co-authored-by: stack
Co-authored-by: Javier
Signed-off-by: Reid Chan
---
 .../MetricsRegionServerSource.java            |   6 +
 .../MetricsRegionServerWrapper.java           |  10 ++
 .../MetricsRegionServerSourceImpl.java        |   5 +
 .../MetricsRegionServerWrapperImpl.java       |  23 ++++
 .../org/apache/hadoop/hbase/util/FSUtils.java |  45 +++++++
 .../MetricsRegionServerWrapperStub.java       |  10 ++
 .../apache/hadoop/hbase/util/TestFSUtils.java | 127 ++++++++++++++++++
 7 files changed, 226 insertions(+)

diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
index e006259eb15..219bb693eec 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
@@ -504,6 +504,12 @@ public interface MetricsRegionServerSource extends BaseSource, JvmPauseMonitorSo
   String MAJOR_COMPACTED_OUTPUT_BYTES_DESC =
       "Total number of bytes that is output from compaction, major only";
 
+  String HEDGED_READS = "hedgedReads";
+  String HEDGED_READS_DESC = "The number of times we started a hedged read";
+  String HEDGED_READ_WINS = "hedgedReadWins";
+  String HEDGED_READ_WINS_DESC =
+      "The number of times we started a hedged read and a hedged read won";
+
   String RPC_GET_REQUEST_COUNT = "rpcGetRequestCount";
   String RPC_GET_REQUEST_COUNT_DESC = "Number of rpc get requests this region server has answered.";
   String RPC_SCAN_REQUEST_COUNT = "rpcScanRequestCount";
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java
index ec84acc3fa3..c9e15451323 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java
@@ -348,6 +348,16 @@ public interface MetricsRegionServerWrapper {
    */
  long getMajorCompactedCellsSize();
 
+  /**
+   * @return Count of hedged read operations
+   */
+  long getHedgedReadOps();
+
+  /**
+   * @return Count of times a hedged read beat out the primary read.
+   */
+  long getHedgedReadWins();
+
   /**
    * @return Number of total bytes read from HDFS.
   */
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
index 31b5a9799af..24ab1675b68 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
@@ -504,6 +504,11 @@ public class MetricsRegionServerSourceImpl
           rsWrap.getCompactedCellsSize())
       .addCounter(Interns.info(MAJOR_COMPACTED_CELLS_SIZE, MAJOR_COMPACTED_CELLS_SIZE_DESC),
           rsWrap.getMajorCompactedCellsSize())
+
+      .addCounter(Interns.info(HEDGED_READS, HEDGED_READS_DESC), rsWrap.getHedgedReadOps())
+      .addCounter(Interns.info(HEDGED_READ_WINS, HEDGED_READ_WINS_DESC),
+          rsWrap.getHedgedReadWins())
+
       .addCounter(Interns.info(BLOCKED_REQUESTS_COUNT, BLOCKED_REQUESTS_COUNT_DESC),
           rsWrap.getBlockedRequestsCount())
       .tag(Interns.info(ZOOKEEPER_QUORUM_NAME, ZOOKEEPER_QUORUM_DESC),
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
index 0d20c7deabf..6817e04970d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -40,7 +41,9 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.CacheStats;
 import org.apache.hadoop.hbase.wal.WALProvider;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
 import org.apache.hadoop.metrics2.MetricsExecutor;
 
 /**
@@ -98,6 +101,11 @@ class MetricsRegionServerWrapperImpl
   private Runnable runnable;
   private long period;
 
+  /**
+   * Can be null if not on hdfs.
+   */
+  private DFSHedgedReadMetrics dfsHedgedReadMetrics;
+
   public MetricsRegionServerWrapperImpl(final HRegionServer regionServer) {
     this.regionServer = regionServer;
     initBlockCache();
@@ -111,6 +119,11 @@ class MetricsRegionServerWrapperImpl
     this.executor.scheduleWithFixedDelay(this.runnable, this.period, this.period,
       TimeUnit.MILLISECONDS);
 
+    try {
+      this.dfsHedgedReadMetrics = FSUtils.getDFSHedgedReadMetrics(regionServer.getConfiguration());
+    } catch (IOException e) {
+      LOG.warn("Failed to get hedged metrics", e);
+    }
     if (LOG.isInfoEnabled()) {
       LOG.info("Computing regionserver metrics every " + this.period + " milliseconds");
     }
@@ -819,6 +832,16 @@ class MetricsRegionServerWrapperImpl
     return FSDataInputStreamWrapper.getZeroCopyBytesRead();
   }
 
+  @Override
+  public long getHedgedReadOps() {
+    return this.dfsHedgedReadMetrics == null ? 0 : this.dfsHedgedReadMetrics.getHedgedReadOps();
+  }
+
+  @Override
+  public long getHedgedReadWins() {
+    return this.dfsHedgedReadMetrics == null ? 0 : this.dfsHedgedReadMetrics.getHedgedReadWins();
+  }
+
   @Override
   public long getBlockedRequestsCount() {
     return blockedRequestsCount;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index 3a04b4cb26a..6ea7bea48b9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -84,6 +84,8 @@ import org.apache.hadoop.hbase.security.AccessDeniedException;
 import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.FSProtos;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.SequenceFile;
@@ -1780,4 +1782,47 @@ public abstract class FSUtils extends CommonFSUtils {
     int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
     conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
   }
+
+  /**
+   * @return The DFSClient DFSHedgedReadMetrics instance or null if can't be found or not on hdfs.
+   */
+  public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c)
+      throws IOException {
+    if (!isHDFS(c)) {
+      return null;
+    }
+    // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal
+    // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it
+    // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients.
+    final String name = "getHedgedReadMetrics";
+    DFSClient dfsclient = ((DistributedFileSystem)FileSystem.get(c)).getClient();
+    Method m;
+    try {
+      m = dfsclient.getClass().getDeclaredMethod(name);
+    } catch (NoSuchMethodException e) {
+      LOG.warn("Failed find method " + name + " in dfsclient; no hedged read metrics: " +
+          e.getMessage());
+      return null;
+    } catch (SecurityException e) {
+      LOG.warn("Failed find method " + name + " in dfsclient; no hedged read metrics: " +
+          e.getMessage());
+      return null;
+    }
+    m.setAccessible(true);
+    try {
+      return (DFSHedgedReadMetrics)m.invoke(dfsclient);
+    } catch (IllegalAccessException e) {
+      LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
+          e.getMessage());
+      return null;
+    } catch (IllegalArgumentException e) {
+      LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
+          e.getMessage());
+      return null;
+    } catch (InvocationTargetException e) {
+      LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
+          e.getMessage());
+      return null;
+    }
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java
index dbf475893aa..90c240e4b2a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java
@@ -360,6 +360,16 @@ public class MetricsRegionServerWrapperStub implements MetricsRegionServerWrappe
     return 10240000;
   }
 
+  @Override
+  public long getHedgedReadOps() {
+    return 100;
+  }
+
+  @Override
+  public long getHedgedReadWins() {
+    return 10;
+  }
+
   @Override
   public long getTotalBytesRead() {
     return 0;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
index 52027a9c95a..43f51874381 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
@@ -27,11 +27,13 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Random;
 import java.util.UUID;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -43,6 +45,9 @@ import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
+import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.Assert;
 import org.junit.Before;
@@ -435,4 +440,126 @@ public class TestFSUtils {
     }
   }
 
+  /**
+   * Ugly test that ensures we can get at the hedged read counters in dfsclient.
+   * Does a bit of preading with hedged reads enabled using code taken from hdfs TestPread.
+   */
+  @Test public void testDFSHedgedReadMetrics() throws Exception {
+    HBaseTestingUtility htu = new HBaseTestingUtility();
+    // Enable hedged reads and set it so the threshold is really low.
+    // Most of this test is taken from HDFS, from TestPread.
+    Configuration conf = htu.getConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE, 5);
+    conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS, 0);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
+    conf.setLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 4096);
+    // Set short retry timeouts so this test runs faster
+    conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 0);
+    conf.setBoolean("dfs.datanode.transferTo.allowed", false);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    // Get the metrics. Should be empty.
+    DFSHedgedReadMetrics metrics = FSUtils.getDFSHedgedReadMetrics(conf);
+    assertEquals(0, metrics.getHedgedReadOps());
+    FileSystem fileSys = cluster.getFileSystem();
+    try {
+      Path p = new Path("preadtest.dat");
+      // We need > 1 blocks to test out the hedged reads.
+      DFSTestUtil.createFile(fileSys, p, 12 * blockSize, 12 * blockSize,
+        blockSize, (short) 3, seed);
+      pReadFile(fileSys, p);
+      cleanupFile(fileSys, p);
+      assertTrue(metrics.getHedgedReadOps() > 0);
+    } finally {
+      fileSys.close();
+      cluster.shutdown();
+    }
+  }
+
+  // Below is taken from TestPread over in HDFS.
+  static final int blockSize = 4096;
+  static final long seed = 0xDEADBEEFL;
+
+  private void pReadFile(FileSystem fileSys, Path name) throws IOException {
+    FSDataInputStream stm = fileSys.open(name);
+    byte[] expected = new byte[12 * blockSize];
+    Random rand = new Random(seed);
+    rand.nextBytes(expected);
+    // do a sanity check. Read first 4K bytes
+    byte[] actual = new byte[4096];
+    stm.readFully(actual);
+    checkAndEraseData(actual, 0, expected, "Read Sanity Test");
+    // now do a pread for the first 8K bytes
+    actual = new byte[8192];
+    doPread(stm, 0L, actual, 0, 8192);
+    checkAndEraseData(actual, 0, expected, "Pread Test 1");
+    // Now check to see if the normal read returns 4K - 8K byte range
+    actual = new byte[4096];
+    stm.readFully(actual);
+    checkAndEraseData(actual, 4096, expected, "Pread Test 2");
+    // Now see if we can cross a single block boundary successfully
+    // read 4K bytes from blockSize - 2K offset
+    stm.readFully(blockSize - 2048, actual, 0, 4096);
+    checkAndEraseData(actual, (blockSize - 2048), expected, "Pread Test 3");
+    // now see if we can cross two block boundaries successfully
+    // read blockSize + 4K bytes from blockSize - 2K offset
+    actual = new byte[blockSize + 4096];
+    stm.readFully(blockSize - 2048, actual);
+    checkAndEraseData(actual, (blockSize - 2048), expected, "Pread Test 4");
+    // now see if we can cross two block boundaries that are not cached
+    // read blockSize + 4K bytes from 10 * blockSize - 2K offset
+    actual = new byte[blockSize + 4096];
+    stm.readFully(10 * blockSize - 2048, actual);
+    checkAndEraseData(actual, (10 * blockSize - 2048), expected, "Pread Test 5");
+    // now check that even after all these preads, we can still read
+    // bytes 8K - 12K
+    actual = new byte[4096];
+    stm.readFully(actual);
+    checkAndEraseData(actual, 8192, expected, "Pread Test 6");
+    // done
+    stm.close();
+    // check block location caching
+    stm = fileSys.open(name);
+    stm.readFully(1, actual, 0, 4096);
+    stm.readFully(4 * blockSize, actual, 0, 4096);
+    stm.readFully(7 * blockSize, actual, 0, 4096);
+    actual = new byte[3 * 4096];
+    stm.readFully(0 * blockSize, actual, 0, 3 * 4096);
+    checkAndEraseData(actual, 0, expected, "Pread Test 7");
+    actual = new byte[8 * 4096];
+    stm.readFully(3 * blockSize, actual, 0, 8 * 4096);
+    checkAndEraseData(actual, 3 * blockSize, expected, "Pread Test 8");
+    // read the tail
+    stm.readFully(11 * blockSize + blockSize / 2, actual, 0, blockSize / 2);
+    IOException res = null;
+    try { // read beyond the end of the file
+      stm.readFully(11 * blockSize + blockSize / 2, actual, 0, blockSize);
+    } catch (IOException e) {
+      // should throw an exception
+      res = e;
+    }
+    assertTrue("Error reading beyond file boundary.", res != null);
+
+    stm.close();
+  }
+
+  private void checkAndEraseData(byte[] actual, int from, byte[] expected, String message) {
+    for (int idx = 0; idx < actual.length; idx++) {
+      assertEquals(message + " byte " + (from + idx) + " differs. expected " +
+          expected[from + idx] + " actual " + actual[idx],
+          actual[idx], expected[from + idx]);
+      actual[idx] = 0;
+    }
+  }
+
+  private void doPread(FSDataInputStream stm, long position, byte[] buffer,
+      int offset, int length) throws IOException {
+    int nread = 0;
+
+    while (nread < length) {
+      int nbytes =
+        stm.read(position + nread, buffer, offset + nread, length - nread);
+      assertTrue("Error in pread", nbytes > 0);
+      nread += nbytes;
+    }
+  }
 }
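
For anyone kicking the tires on this change, below is a minimal sketch of reading the new counters through the FSUtils#getDFSHedgedReadMetrics helper added above. It is not part of the patch; the class name and the explicit hedged-read client settings are illustrative assumptions, and the helper returns null when the configured root filesystem is not HDFS.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;

// Hypothetical stand-alone probe; not part of this patch.
public class HedgedReadMetricsProbe {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Hedged reads only happen when the DFSClient hedged read thread pool is enabled.
    conf.setInt("dfs.client.hedged.read.threadpool.size", 5);
    conf.setLong("dfs.client.hedged.read.threshold.millis", 10);
    // Null when the filesystem backing hbase.rootdir is not HDFS.
    DFSHedgedReadMetrics metrics = FSUtils.getDFSHedgedReadMetrics(conf);
    if (metrics == null) {
      System.out.println("Not on HDFS; hedged read metrics unavailable");
      return;
    }
    System.out.println("hedgedReads=" + metrics.getHedgedReadOps()
        + ", hedgedReadWins=" + metrics.getHedgedReadWins());
  }
}

These are the same two values the region server now publishes as the hedgedReads and hedgedReadWins counters on its metrics source.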