Add hedgedReads and hedgedReadWins count metrics

This commit is contained in:
stack 2014-10-09 12:50:56 -07:00
parent 72c8c62aa1
commit d5be58dfd2
8 changed files with 258 additions and 5 deletions

View File

@ -234,4 +234,9 @@ public interface MetricsRegionServerSource extends BaseSource {
String MAJOR_COMPACTED_CELLS_SIZE_DESC =
"The total amount of data processed during major compactions, in bytes";
String HEDGED_READS = "hedgedReads";
String HEDGED_READS_DESC = "The number of times we started a hedged read";
String HEDGED_READ_WINS = "hedgedReadWins";
String HEDGED_READ_WINS_DESC =
"The number of times we started a hedged read and a hedged read won";
}

View File

@ -246,4 +246,14 @@ public interface MetricsRegionServerWrapper {
* Get the total amount of data processed during major compactions, in bytes.
*/
long getMajorCompactedCellsSize();
/**
* @return Count of hedged read operations
*/
public long getHedgedReadOps();
/**
* @return Count of times a hedged read beat out the primary read.
*/
public long getHedgedReadWins();
}

View File

@ -219,6 +219,11 @@ public class MetricsRegionServerSourceImpl
rsWrap.getCompactedCellsSize())
.addCounter(Interns.info(MAJOR_COMPACTED_CELLS_SIZE, MAJOR_COMPACTED_CELLS_SIZE_DESC),
rsWrap.getMajorCompactedCellsSize())
.addCounter(Interns.info(HEDGED_READS, HEDGED_READS_DESC), rsWrap.getHedgedReadOps())
.addCounter(Interns.info(HEDGED_READ_WINS, HEDGED_READ_WINS_DESC),
rsWrap.getHedgedReadWins())
.tag(Interns.info(ZOOKEEPER_QUORUM_NAME, ZOOKEEPER_QUORUM_DESC),
rsWrap.getZookeeperQuorum())
.tag(Interns.info(SERVER_NAME_NAME, SERVER_NAME_DESC), rsWrap.getServerName())

View File

@ -46,6 +46,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
@ -569,9 +570,9 @@ public class SplitLogManager {
* @return whether log is replaying
*/
public boolean isLogReplaying() {
if (server.getCoordinatedStateManager() == null) return false;
return ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
.getSplitLogManagerCoordination().isReplaying();
CoordinatedStateManager m = server.getCoordinatedStateManager();
if (m == null) return false;
return ((BaseCoordinatedStateManager)m).getSplitLogManagerCoordination().isReplaying();
}
/**

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@ -33,7 +34,9 @@ import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CacheStats;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
import org.apache.hadoop.metrics2.MetricsExecutor;
/**
@ -78,6 +81,11 @@ class MetricsRegionServerWrapperImpl
private Runnable runnable;
private long period;
/**
* Can be null if not on hdfs.
*/
private DFSHedgedReadMetrics dfsHedgedReadMetrics;
public MetricsRegionServerWrapperImpl(final HRegionServer regionServer) {
this.regionServer = regionServer;
initBlockCache();
@ -91,6 +99,11 @@ class MetricsRegionServerWrapperImpl
this.executor.scheduleWithFixedDelay(this.runnable, this.period, this.period,
TimeUnit.MILLISECONDS);
try {
this.dfsHedgedReadMetrics = FSUtils.getDFSHedgedReadMetrics(regionServer.getConfiguration());
} catch (IOException e) {
LOG.warn("Failed to get hedged metrics", e);
}
if (LOG.isInfoEnabled()) {
LOG.info("Computing regionserver metrics every " + this.period + " milliseconds");
}
@ -513,4 +526,14 @@ class MetricsRegionServerWrapperImpl
majorCompactedCellsSize = tempMajorCompactedCellsSize;
}
}
@Override
public long getHedgedReadOps() {
return this.dfsHedgedReadMetrics == null? 0: this.dfsHedgedReadMetrics.getHedgedReadOps();
}
@Override
public long getHedgedReadWins() {
return this.dfsHedgedReadMetrics == null? 0: this.dfsHedgedReadMetrics.getHedgedReadWins();
}
}

View File

@ -69,6 +69,8 @@ import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.FSProtos;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.io.IOUtils;
@ -1870,4 +1872,47 @@ public abstract class FSUtils {
int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
}
/**
* @param c
* @return The DFSClient DFSHedgedReadMetrics instance or null if can't be found or not on hdfs.
* @throws IOException
*/
public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c)
throws IOException {
if (!isHDFS(c)) return null;
// getHedgedReadMetrics is package private. Get the DFSClient instance that is internal
// to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it
// to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients.
final String name = "getHedgedReadMetrics";
DFSClient dfsclient = ((DistributedFileSystem)FileSystem.get(c)).getClient();
Method m;
try {
m = dfsclient.getClass().getDeclaredMethod(name);
} catch (NoSuchMethodException e) {
LOG.warn("Failed find method " + name + " in dfsclient; no hedged read metrics: " +
e.getMessage());
return null;
} catch (SecurityException e) {
LOG.warn("Failed find method " + name + " in dfsclient; no hedged read metrics: " +
e.getMessage());
return null;
}
m.setAccessible(true);
try {
return (DFSHedgedReadMetrics)m.invoke(dfsclient);
} catch (IllegalAccessException e) {
LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
e.getMessage());
return null;
} catch (IllegalArgumentException e) {
LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
e.getMessage());
return null;
} catch (InvocationTargetException e) {
LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
e.getMessage());
return null;
}
}
}

View File

@ -241,4 +241,13 @@ public class MetricsRegionServerWrapperStub implements MetricsRegionServerWrappe
return 10240000;
}
@Override
public long getHedgedReadOps() {
return 100;
}
@Override
public long getHedgedReadWins() {
return 10;
}
}

View File

@ -25,11 +25,14 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.Random;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@ -42,6 +45,9 @@ import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -335,4 +341,153 @@ public class TestFSUtils {
assertEquals(expect, fs.getFileStatus(dst).getModificationTime());
cluster.shutdown();
}
/**
* Ugly test that ensures we can get at the hedged read counters in dfsclient.
* Does a bit of preading with hedged reads enabled using code taken from hdfs TestPread.
* @throws Exception
*/
@Test public void testDFSHedgedReadMetrics() throws Exception {
HBaseTestingUtility htu = new HBaseTestingUtility();
// Enable hedged reads and set it so the threshold is really low.
// Most of this test is taken from HDFS, from TestPread.
Configuration conf = htu.getConfiguration();
conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE, 5);
conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS, 0);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
conf.setLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 4096);
// Set short retry timeouts so this test runs faster
conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 0);
conf.setBoolean("dfs.datanode.transferTo.allowed", false);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
// Get the metrics. Should be empty.
DFSHedgedReadMetrics metrics = FSUtils.getDFSHedgedReadMetrics(conf);
assertEquals(0, metrics.getHedgedReadOps());
FileSystem fileSys = cluster.getFileSystem();
try {
Path p = new Path("preadtest.dat");
// We need > 1 blocks to test out the hedged reads.
DFSTestUtil.createFile(fileSys, p, 12 * blockSize, 12 * blockSize,
blockSize, (short) 3, seed);
pReadFile(fileSys, p);
cleanupFile(fileSys, p);
assertTrue(metrics.getHedgedReadOps() > 0);
} finally {
fileSys.close();
cluster.shutdown();
}
}
// Below is taken from TestPread over in HDFS.
static final int blockSize = 4096;
static final long seed = 0xDEADBEEFL;
private void pReadFile(FileSystem fileSys, Path name) throws IOException {
FSDataInputStream stm = fileSys.open(name);
byte[] expected = new byte[12 * blockSize];
Random rand = new Random(seed);
rand.nextBytes(expected);
// do a sanity check. Read first 4K bytes
byte[] actual = new byte[4096];
stm.readFully(actual);
checkAndEraseData(actual, 0, expected, "Read Sanity Test");
// now do a pread for the first 8K bytes
actual = new byte[8192];
doPread(stm, 0L, actual, 0, 8192);
checkAndEraseData(actual, 0, expected, "Pread Test 1");
// Now check to see if the normal read returns 4K-8K byte range
actual = new byte[4096];
stm.readFully(actual);
checkAndEraseData(actual, 4096, expected, "Pread Test 2");
// Now see if we can cross a single block boundary successfully
// read 4K bytes from blockSize - 2K offset
stm.readFully(blockSize - 2048, actual, 0, 4096);
checkAndEraseData(actual, (blockSize - 2048), expected, "Pread Test 3");
// now see if we can cross two block boundaries successfully
// read blockSize + 4K bytes from blockSize - 2K offset
actual = new byte[blockSize + 4096];
stm.readFully(blockSize - 2048, actual);
checkAndEraseData(actual, (blockSize - 2048), expected, "Pread Test 4");
// now see if we can cross two block boundaries that are not cached
// read blockSize + 4K bytes from 10*blockSize - 2K offset
actual = new byte[blockSize + 4096];
stm.readFully(10 * blockSize - 2048, actual);
checkAndEraseData(actual, (10 * blockSize - 2048), expected, "Pread Test 5");
// now check that even after all these preads, we can still read
// bytes 8K-12K
actual = new byte[4096];
stm.readFully(actual);
checkAndEraseData(actual, 8192, expected, "Pread Test 6");
// done
stm.close();
// check block location caching
stm = fileSys.open(name);
stm.readFully(1, actual, 0, 4096);
stm.readFully(4*blockSize, actual, 0, 4096);
stm.readFully(7*blockSize, actual, 0, 4096);
actual = new byte[3*4096];
stm.readFully(0*blockSize, actual, 0, 3*4096);
checkAndEraseData(actual, 0, expected, "Pread Test 7");
actual = new byte[8*4096];
stm.readFully(3*blockSize, actual, 0, 8*4096);
checkAndEraseData(actual, 3*blockSize, expected, "Pread Test 8");
// read the tail
stm.readFully(11*blockSize+blockSize/2, actual, 0, blockSize/2);
IOException res = null;
try { // read beyond the end of the file
stm.readFully(11*blockSize+blockSize/2, actual, 0, blockSize);
} catch (IOException e) {
// should throw an exception
res = e;
}
assertTrue("Error reading beyond file boundary.", res != null);
stm.close();
}
private void checkAndEraseData(byte[] actual, int from, byte[] expected, String message) {
for (int idx = 0; idx < actual.length; idx++) {
assertEquals(message+" byte "+(from+idx)+" differs. expected "+
expected[from+idx]+" actual "+actual[idx],
actual[idx], expected[from+idx]);
actual[idx] = 0;
}
}
private void doPread(FSDataInputStream stm, long position, byte[] buffer,
int offset, int length) throws IOException {
int nread = 0;
// long totalRead = 0;
// DFSInputStream dfstm = null;
/* Disable. This counts do not add up. Some issue in original hdfs tests?
if (stm.getWrappedStream() instanceof DFSInputStream) {
dfstm = (DFSInputStream) (stm.getWrappedStream());
totalRead = dfstm.getReadStatistics().getTotalBytesRead();
} */
while (nread < length) {
int nbytes =
stm.read(position + nread, buffer, offset + nread, length - nread);
assertTrue("Error in pread", nbytes > 0);
nread += nbytes;
}
/* Disable. This counts do not add up. Some issue in original hdfs tests?
if (dfstm != null) {
if (isHedgedRead) {
assertTrue("Expected read statistic to be incremented",
length <= dfstm.getReadStatistics().getTotalBytesRead() - totalRead);
} else {
assertEquals("Expected read statistic to be incremented", length, dfstm
.getReadStatistics().getTotalBytesRead() - totalRead);
}
}*/
}
private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
assertTrue(fileSys.exists(name));
assertTrue(fileSys.delete(name, true));
assertTrue(!fileSys.exists(name));
}
}