From 71ed7033675149956de855b6782e1e22fc908dc8 Mon Sep 17 00:00:00 2001
From: stack
Date: Thu, 9 Oct 2014 12:50:56 -0700
Subject: [PATCH] Add hedgedReads and hedgedReadWins count metrics

Conflicts:
	hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
---
 .../MetricsRegionServerSource.java            |   5 +
 .../MetricsRegionServerWrapper.java           |  10 ++
 .../MetricsRegionServerSourceImpl.java        |   5 +
 .../hadoop/hbase/master/SplitLogManager.java  |   7 +-
 .../MetricsRegionServerWrapperImpl.java       |  23 +++
 .../org/apache/hadoop/hbase/util/FSUtils.java |  45 +++++
 .../MetricsRegionServerWrapperStub.java       |   9 +
 .../apache/hadoop/hbase/util/TestFSUtils.java | 155 ++++++++++++++++++
 8 files changed, 256 insertions(+), 3 deletions(-)

diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
index 43987946e68..fa795230f1e 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
@@ -234,4 +234,9 @@ public interface MetricsRegionServerSource extends BaseSource {
   String MAJOR_COMPACTED_CELLS_SIZE_DESC =
       "The total amount of data processed during major compactions, in bytes";
 
+  String HEDGED_READS = "hedgedReads";
+  String HEDGED_READS_DESC = "The number of times we started a hedged read";
+  String HEDGED_READ_WINS = "hedgedReadWins";
+  String HEDGED_READ_WINS_DESC =
+      "The number of times we started a hedged read and a hedged read won";
 }
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java
index 998bd17e67a..513a0db8351 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java
@@ -246,4 +246,14 @@ public interface MetricsRegionServerWrapper {
    * Get the total amount of data processed during major compactions, in bytes.
    */
   long getMajorCompactedCellsSize();
+
+  /**
+   * @return Count of hedged read operations
+   */
+  public long getHedgedReadOps();
+
+  /**
+   * @return Count of times a hedged read beat out the primary read.
+   */
+  public long getHedgedReadWins();
 }
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
index f85929604d1..a6377c0699b 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
@@ -219,6 +219,11 @@ public class MetricsRegionServerSourceImpl
               rsWrap.getCompactedCellsSize())
           .addCounter(Interns.info(MAJOR_COMPACTED_CELLS_SIZE, MAJOR_COMPACTED_CELLS_SIZE_DESC),
               rsWrap.getMajorCompactedCellsSize())
+
+          .addCounter(Interns.info(HEDGED_READS, HEDGED_READS_DESC), rsWrap.getHedgedReadOps())
+          .addCounter(Interns.info(HEDGED_READ_WINS, HEDGED_READ_WINS_DESC),
+              rsWrap.getHedgedReadWins())
+
           .tag(Interns.info(ZOOKEEPER_QUORUM_NAME, ZOOKEEPER_QUORUM_DESC),
               rsWrap.getZookeeperQuorum())
           .tag(Interns.info(SERVER_NAME_NAME, SERVER_NAME_DESC), rsWrap.getServerName())
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
index 97ac02ccd11..bf28a44493d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hbase.Chore;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
@@ -569,9 +570,9 @@ public class SplitLogManager {
    * @return whether log is replaying
    */
   public boolean isLogReplaying() {
-    if (server.getCoordinatedStateManager() == null) return false;
-    return ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
-      .getSplitLogManagerCoordination().isReplaying();
+    CoordinatedStateManager m = server.getCoordinatedStateManager();
+    if (m == null) return false;
+    return ((BaseCoordinatedStateManager)m).getSplitLogManagerCoordination().isReplaying();
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
index 3ae2235b3f3..327f55caf1a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -33,7 +34,9 @@ import org.apache.hadoop.hbase.io.hfile.BlockCache;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.CacheStats;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
 import org.apache.hadoop.metrics2.MetricsExecutor;
 
 /**
@@ -78,6 +81,11 @@ class MetricsRegionServerWrapperImpl
   private Runnable runnable;
   private long period;
 
+  /**
+   * Can be null if not on hdfs.
+   */
+  private DFSHedgedReadMetrics dfsHedgedReadMetrics;
+
   public MetricsRegionServerWrapperImpl(final HRegionServer regionServer) {
     this.regionServer = regionServer;
     initBlockCache();
@@ -91,6 +99,11 @@ class MetricsRegionServerWrapperImpl
 
     this.executor.scheduleWithFixedDelay(this.runnable, this.period, this.period,
       TimeUnit.MILLISECONDS);
+    try {
+      this.dfsHedgedReadMetrics = FSUtils.getDFSHedgedReadMetrics(regionServer.getConfiguration());
+    } catch (IOException e) {
+      LOG.warn("Failed to get hedged metrics", e);
+    }
     if (LOG.isInfoEnabled()) {
       LOG.info("Computing regionserver metrics every " + this.period + " milliseconds");
     }
@@ -513,4 +526,14 @@ class MetricsRegionServerWrapperImpl
       majorCompactedCellsSize = tempMajorCompactedCellsSize;
     }
   }
+
+  @Override
+  public long getHedgedReadOps() {
+    return this.dfsHedgedReadMetrics == null? 0: this.dfsHedgedReadMetrics.getHedgedReadOps();
+  }
+
+  @Override
+  public long getHedgedReadWins() {
+    return this.dfsHedgedReadMetrics == null? 0: this.dfsHedgedReadMetrics.getHedgedReadWins();
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index ed1a4e6aeff..51af44043e0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -70,6 +70,8 @@ import org.apache.hadoop.hbase.security.AccessDeniedException;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.FSProtos;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.io.IOUtils;
@@ -1869,4 +1871,47 @@ public abstract class FSUtils {
     int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
     conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
   }
+
+  /**
+   * @param c
+   * @return The DFSClient DFSHedgedReadMetrics instance or null if it can't be found or we are
+   *         not on hdfs.
+   * @throws IOException
+   */
+  public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c)
+      throws IOException {
+    if (!isHDFS(c)) return null;
+    // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal
+    // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it
+    // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients.
+    final String name = "getHedgedReadMetrics";
+    DFSClient dfsclient = ((DistributedFileSystem)FileSystem.get(c)).getClient();
+    Method m;
+    try {
+      m = dfsclient.getClass().getDeclaredMethod(name);
+    } catch (NoSuchMethodException e) {
+      LOG.warn("Failed to find method " + name + " in dfsclient; no hedged read metrics: " +
+        e.getMessage());
+      return null;
+    } catch (SecurityException e) {
+      LOG.warn("Failed to find method " + name + " in dfsclient; no hedged read metrics: " +
+        e.getMessage());
+      return null;
+    }
+    m.setAccessible(true);
+    try {
+      return (DFSHedgedReadMetrics)m.invoke(dfsclient);
+    } catch (IllegalAccessException e) {
+      LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
+        e.getMessage());
+      return null;
+    } catch (IllegalArgumentException e) {
+      LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
+        e.getMessage());
+      return null;
+    } catch (InvocationTargetException e) {
+      LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
+        e.getMessage());
+      return null;
+    }
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java
index 036af480493..ca95c4a551f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java
@@ -241,4 +241,13 @@ public class MetricsRegionServerWrapperStub implements MetricsRegionServerWrapper
     return 10240000;
   }
 
+  @Override
+  public long getHedgedReadOps() {
+    return 100;
+  }
+
+  @Override
+  public long getHedgedReadWins() {
+    return 10;
+  }
 }
\ No newline at end of file
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
index 417c4a86cf7..da1b603b1d1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
@@ -25,11 +25,14 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
+import java.util.Random;
 import java.util.UUID;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -41,6 +44,9 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
+import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -334,4 +340,153 @@
     assertEquals(expect, fs.getFileStatus(dst).getModificationTime());
     cluster.shutdown();
   }
+
+  /**
+   * Ugly test that ensures we can get at the hedged read counters in dfsclient.
+   * Does a bit of preading with hedged reads enabled, using code taken from hdfs TestPread.
+   * @throws Exception
+   */
+  @Test public void testDFSHedgedReadMetrics() throws Exception {
+    HBaseTestingUtility htu = new HBaseTestingUtility();
+    // Enable hedged reads and set it so the threshold is really low.
+    // Most of this test is taken from HDFS, from TestPread.
+    Configuration conf = htu.getConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE, 5);
+    conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS, 0);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
+    conf.setLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 4096);
+    // Set short retry timeouts so this test runs faster
+    conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 0);
+    conf.setBoolean("dfs.datanode.transferTo.allowed", false);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    // Get the metrics. Should be empty.
+    DFSHedgedReadMetrics metrics = FSUtils.getDFSHedgedReadMetrics(conf);
+    assertEquals(0, metrics.getHedgedReadOps());
+    FileSystem fileSys = cluster.getFileSystem();
+    try {
+      Path p = new Path("preadtest.dat");
+      // We need > 1 blocks to test out the hedged reads.
+      DFSTestUtil.createFile(fileSys, p, 12 * blockSize, 12 * blockSize,
+        blockSize, (short) 3, seed);
+      pReadFile(fileSys, p);
+      cleanupFile(fileSys, p);
+      assertTrue(metrics.getHedgedReadOps() > 0);
+    } finally {
+      fileSys.close();
+      cluster.shutdown();
+    }
+  }
+
+  // Below is taken from TestPread over in HDFS.
+  static final int blockSize = 4096;
+  static final long seed = 0xDEADBEEFL;
+
+  private void pReadFile(FileSystem fileSys, Path name) throws IOException {
+    FSDataInputStream stm = fileSys.open(name);
+    byte[] expected = new byte[12 * blockSize];
+    Random rand = new Random(seed);
+    rand.nextBytes(expected);
+    // do a sanity check. Read first 4K bytes
+    byte[] actual = new byte[4096];
+    stm.readFully(actual);
+    checkAndEraseData(actual, 0, expected, "Read Sanity Test");
+    // now do a pread for the first 8K bytes
+    actual = new byte[8192];
+    doPread(stm, 0L, actual, 0, 8192);
+    checkAndEraseData(actual, 0, expected, "Pread Test 1");
+    // Now check to see if the normal read returns 4K-8K byte range
+    actual = new byte[4096];
+    stm.readFully(actual);
+    checkAndEraseData(actual, 4096, expected, "Pread Test 2");
+    // Now see if we can cross a single block boundary successfully
+    // read 4K bytes from blockSize - 2K offset
+    stm.readFully(blockSize - 2048, actual, 0, 4096);
+    checkAndEraseData(actual, (blockSize - 2048), expected, "Pread Test 3");
+    // now see if we can cross two block boundaries successfully
+    // read blockSize + 4K bytes from blockSize - 2K offset
+    actual = new byte[blockSize + 4096];
+    stm.readFully(blockSize - 2048, actual);
+    checkAndEraseData(actual, (blockSize - 2048), expected, "Pread Test 4");
+    // now see if we can cross two block boundaries that are not cached
+    // read blockSize + 4K bytes from 10*blockSize - 2K offset
+    actual = new byte[blockSize + 4096];
+    stm.readFully(10 * blockSize - 2048, actual);
+    checkAndEraseData(actual, (10 * blockSize - 2048), expected, "Pread Test 5");
+    // now check that even after all these preads, we can still read
+    // bytes 8K-12K
+    actual = new byte[4096];
+    stm.readFully(actual);
+    checkAndEraseData(actual, 8192, expected, "Pread Test 6");
+    // done
+    stm.close();
+    // check block location caching
+    stm = fileSys.open(name);
+    stm.readFully(1, actual, 0, 4096);
+    stm.readFully(4*blockSize, actual, 0, 4096);
+    stm.readFully(7*blockSize, actual, 0, 4096);
+    actual = new byte[3*4096];
+    stm.readFully(0*blockSize, actual, 0, 3*4096);
+    checkAndEraseData(actual, 0, expected, "Pread Test 7");
+    actual = new byte[8*4096];
+    stm.readFully(3*blockSize, actual, 0, 8*4096);
+    checkAndEraseData(actual, 3*blockSize, expected, "Pread Test 8");
+    // read the tail
+    stm.readFully(11*blockSize+blockSize/2, actual, 0, blockSize/2);
+    IOException res = null;
+    try { // read beyond the end of the file
+      stm.readFully(11*blockSize+blockSize/2, actual, 0, blockSize);
+    } catch (IOException e) {
+      // should throw an exception
+      res = e;
+    }
+    assertTrue("Error reading beyond file boundary.", res != null);
+
+    stm.close();
+  }
+
+  private void checkAndEraseData(byte[] actual, int from, byte[] expected, String message) {
+    for (int idx = 0; idx < actual.length; idx++) {
+      assertEquals(message+" byte "+(from+idx)+" differs. expected "+
+        expected[from+idx]+" actual "+actual[idx],
+        actual[idx], expected[from+idx]);
+      actual[idx] = 0;
+    }
+  }
+
+  private void doPread(FSDataInputStream stm, long position, byte[] buffer,
+      int offset, int length) throws IOException {
+    int nread = 0;
+    // long totalRead = 0;
+    // DFSInputStream dfstm = null;
+
+    /* Disabled. These counts do not add up. Some issue in the original hdfs tests?
+    if (stm.getWrappedStream() instanceof DFSInputStream) {
+      dfstm = (DFSInputStream) (stm.getWrappedStream());
+      totalRead = dfstm.getReadStatistics().getTotalBytesRead();
+    } */
+
+    while (nread < length) {
+      int nbytes =
+        stm.read(position + nread, buffer, offset + nread, length - nread);
+      assertTrue("Error in pread", nbytes > 0);
+      nread += nbytes;
+    }
+
+    /* Disabled. These counts do not add up. Some issue in the original hdfs tests?
+    if (dfstm != null) {
+      if (isHedgedRead) {
+        assertTrue("Expected read statistic to be incremented",
+          length <= dfstm.getReadStatistics().getTotalBytesRead() - totalRead);
+      } else {
+        assertEquals("Expected read statistic to be incremented", length, dfstm
+          .getReadStatistics().getTotalBytesRead() - totalRead);
+      }
+    }*/
+  }
+
+  private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
+    assertTrue(fileSys.exists(name));
+    assertTrue(fileSys.delete(name, true));
+    assertTrue(!fileSys.exists(name));
+  }
 }
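
Reviewer note (not part of the diff): a minimal sketch of how the two new counters surface through the FSUtils helper added above. The probe class and its main() are hypothetical; it assumes the Configuration points at an HDFS deployment (fs.defaultFS), and it relies only on APIs introduced or exercised in this patch (FSUtils.getDFSHedgedReadMetrics, DFSHedgedReadMetrics#getHedgedReadOps, DFSHedgedReadMetrics#getHedgedReadWins).

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;

public class HedgedReadMetricsProbe {
  public static void main(String[] args) throws IOException {
    // Assumes fs.defaultFS resolves to an HDFS filesystem; otherwise the helper returns null.
    Configuration conf = new Configuration();
    DFSHedgedReadMetrics metrics = FSUtils.getDFSHedgedReadMetrics(conf);
    // Mirror the null-guard used in MetricsRegionServerWrapperImpl: report 0 when off-hdfs
    // or when reflection into DFSClient fails.
    long ops = metrics == null ? 0 : metrics.getHedgedReadOps();
    long wins = metrics == null ? 0 : metrics.getHedgedReadWins();
    System.out.println("hedgedReads=" + ops + ", hedgedReadWins=" + wins);
  }
}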