Add hedgedReads and hedgedReadWins count metrics
Conflicts: hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
This commit is contained in:
parent
8b167e4c94
commit
71ed703367
|
@ -234,4 +234,9 @@ public interface MetricsRegionServerSource extends BaseSource {
|
|||
String MAJOR_COMPACTED_CELLS_SIZE_DESC =
|
||||
"The total amount of data processed during major compactions, in bytes";
|
||||
|
||||
String HEDGED_READS = "hedgedReads";
|
||||
String HEDGED_READS_DESC = "The number of times we started a hedged read";
|
||||
String HEDGED_READ_WINS = "hedgedReadWins";
|
||||
String HEDGED_READ_WINS_DESC =
|
||||
"The number of times we started a hedged read and a hedged read won";
|
||||
}
|
||||
|
|
|
@ -246,4 +246,14 @@ public interface MetricsRegionServerWrapper {
|
|||
* Get the total amount of data processed during major compactions, in bytes.
|
||||
*/
|
||||
long getMajorCompactedCellsSize();
|
||||
|
||||
/**
|
||||
* @return Count of hedged read operations
|
||||
*/
|
||||
public long getHedgedReadOps();
|
||||
|
||||
/**
|
||||
* @return Count of times a hedged read beat out the primary read.
|
||||
*/
|
||||
public long getHedgedReadWins();
|
||||
}
|
||||
|
|
|
@ -219,6 +219,11 @@ public class MetricsRegionServerSourceImpl
|
|||
rsWrap.getCompactedCellsSize())
|
||||
.addCounter(Interns.info(MAJOR_COMPACTED_CELLS_SIZE, MAJOR_COMPACTED_CELLS_SIZE_DESC),
|
||||
rsWrap.getMajorCompactedCellsSize())
|
||||
|
||||
.addCounter(Interns.info(HEDGED_READS, HEDGED_READS_DESC), rsWrap.getHedgedReadOps())
|
||||
.addCounter(Interns.info(HEDGED_READ_WINS, HEDGED_READ_WINS_DESC),
|
||||
rsWrap.getHedgedReadWins())
|
||||
|
||||
.tag(Interns.info(ZOOKEEPER_QUORUM_NAME, ZOOKEEPER_QUORUM_DESC),
|
||||
rsWrap.getZookeeperQuorum())
|
||||
.tag(Interns.info(SERVER_NAME_NAME, SERVER_NAME_DESC), rsWrap.getServerName())
|
||||
|
|
|
@ -46,6 +46,7 @@ import org.apache.hadoop.fs.FileSystem;
|
|||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.PathFilter;
|
||||
import org.apache.hadoop.hbase.Chore;
|
||||
import org.apache.hadoop.hbase.CoordinatedStateManager;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
|
@ -569,9 +570,9 @@ public class SplitLogManager {
|
|||
* @return whether log is replaying
|
||||
*/
|
||||
public boolean isLogReplaying() {
|
||||
if (server.getCoordinatedStateManager() == null) return false;
|
||||
return ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
|
||||
.getSplitLogManagerCoordination().isReplaying();
|
||||
CoordinatedStateManager m = server.getCoordinatedStateManager();
|
||||
if (m == null) return false;
|
||||
return ((BaseCoordinatedStateManager)m).getSplitLogManagerCoordination().isReplaying();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -33,7 +34,9 @@ import org.apache.hadoop.hbase.io.hfile.BlockCache;
|
|||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheStats;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
|
||||
import org.apache.hadoop.metrics2.MetricsExecutor;
|
||||
|
||||
/**
|
||||
|
@ -78,6 +81,11 @@ class MetricsRegionServerWrapperImpl
|
|||
private Runnable runnable;
|
||||
private long period;
|
||||
|
||||
/**
|
||||
* Can be null if not on hdfs.
|
||||
*/
|
||||
private DFSHedgedReadMetrics dfsHedgedReadMetrics;
|
||||
|
||||
public MetricsRegionServerWrapperImpl(final HRegionServer regionServer) {
|
||||
this.regionServer = regionServer;
|
||||
initBlockCache();
|
||||
|
@ -91,6 +99,11 @@ class MetricsRegionServerWrapperImpl
|
|||
this.executor.scheduleWithFixedDelay(this.runnable, this.period, this.period,
|
||||
TimeUnit.MILLISECONDS);
|
||||
|
||||
try {
|
||||
this.dfsHedgedReadMetrics = FSUtils.getDFSHedgedReadMetrics(regionServer.getConfiguration());
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to get hedged metrics", e);
|
||||
}
|
||||
if (LOG.isInfoEnabled()) {
|
||||
LOG.info("Computing regionserver metrics every " + this.period + " milliseconds");
|
||||
}
|
||||
|
@ -513,4 +526,14 @@ class MetricsRegionServerWrapperImpl
|
|||
majorCompactedCellsSize = tempMajorCompactedCellsSize;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getHedgedReadOps() {
|
||||
return this.dfsHedgedReadMetrics == null? 0: this.dfsHedgedReadMetrics.getHedgedReadOps();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getHedgedReadWins() {
|
||||
return this.dfsHedgedReadMetrics == null? 0: this.dfsHedgedReadMetrics.getHedgedReadWins();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -70,6 +70,8 @@ import org.apache.hadoop.hbase.security.AccessDeniedException;
|
|||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.FSProtos;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hdfs.DFSClient;
|
||||
import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.protocol.FSConstants;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
|
@ -1869,4 +1871,47 @@ public abstract class FSUtils {
|
|||
int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
|
||||
conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
|
||||
}
|
||||
|
||||
/**
|
||||
* @param c
|
||||
* @return The DFSClient DFSHedgedReadMetrics instance or null if can't be found or not on hdfs.
|
||||
* @throws IOException
|
||||
*/
|
||||
public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c)
|
||||
throws IOException {
|
||||
if (!isHDFS(c)) return null;
|
||||
// getHedgedReadMetrics is package private. Get the DFSClient instance that is internal
|
||||
// to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it
|
||||
// to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients.
|
||||
final String name = "getHedgedReadMetrics";
|
||||
DFSClient dfsclient = ((DistributedFileSystem)FileSystem.get(c)).getClient();
|
||||
Method m;
|
||||
try {
|
||||
m = dfsclient.getClass().getDeclaredMethod(name);
|
||||
} catch (NoSuchMethodException e) {
|
||||
LOG.warn("Failed find method " + name + " in dfsclient; no hedged read metrics: " +
|
||||
e.getMessage());
|
||||
return null;
|
||||
} catch (SecurityException e) {
|
||||
LOG.warn("Failed find method " + name + " in dfsclient; no hedged read metrics: " +
|
||||
e.getMessage());
|
||||
return null;
|
||||
}
|
||||
m.setAccessible(true);
|
||||
try {
|
||||
return (DFSHedgedReadMetrics)m.invoke(dfsclient);
|
||||
} catch (IllegalAccessException e) {
|
||||
LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
|
||||
e.getMessage());
|
||||
return null;
|
||||
} catch (IllegalArgumentException e) {
|
||||
LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
|
||||
e.getMessage());
|
||||
return null;
|
||||
} catch (InvocationTargetException e) {
|
||||
LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
|
||||
e.getMessage());
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -241,4 +241,13 @@ public class MetricsRegionServerWrapperStub implements MetricsRegionServerWrappe
|
|||
return 10240000;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getHedgedReadOps() {
|
||||
return 100;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getHedgedReadWins() {
|
||||
return 10;
|
||||
}
|
||||
}
|
|
@ -25,11 +25,14 @@ import static org.junit.Assert.assertNotNull;
|
|||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.Random;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
|
@ -41,6 +44,9 @@ import org.apache.hadoop.hbase.HConstants;
|
|||
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
|
||||
import org.apache.hadoop.hbase.MediumTests;
|
||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
@ -334,4 +340,153 @@ public class TestFSUtils {
|
|||
assertEquals(expect, fs.getFileStatus(dst).getModificationTime());
|
||||
cluster.shutdown();
|
||||
}
|
||||
|
||||
/**
|
||||
* Ugly test that ensures we can get at the hedged read counters in dfsclient.
|
||||
* Does a bit of preading with hedged reads enabled using code taken from hdfs TestPread.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test public void testDFSHedgedReadMetrics() throws Exception {
|
||||
HBaseTestingUtility htu = new HBaseTestingUtility();
|
||||
// Enable hedged reads and set it so the threshold is really low.
|
||||
// Most of this test is taken from HDFS, from TestPread.
|
||||
Configuration conf = htu.getConfiguration();
|
||||
conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE, 5);
|
||||
conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS, 0);
|
||||
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
|
||||
conf.setLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 4096);
|
||||
// Set short retry timeouts so this test runs faster
|
||||
conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 0);
|
||||
conf.setBoolean("dfs.datanode.transferTo.allowed", false);
|
||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
|
||||
// Get the metrics. Should be empty.
|
||||
DFSHedgedReadMetrics metrics = FSUtils.getDFSHedgedReadMetrics(conf);
|
||||
assertEquals(0, metrics.getHedgedReadOps());
|
||||
FileSystem fileSys = cluster.getFileSystem();
|
||||
try {
|
||||
Path p = new Path("preadtest.dat");
|
||||
// We need > 1 blocks to test out the hedged reads.
|
||||
DFSTestUtil.createFile(fileSys, p, 12 * blockSize, 12 * blockSize,
|
||||
blockSize, (short) 3, seed);
|
||||
pReadFile(fileSys, p);
|
||||
cleanupFile(fileSys, p);
|
||||
assertTrue(metrics.getHedgedReadOps() > 0);
|
||||
} finally {
|
||||
fileSys.close();
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
// Below is taken from TestPread over in HDFS.
|
||||
static final int blockSize = 4096;
|
||||
static final long seed = 0xDEADBEEFL;
|
||||
|
||||
private void pReadFile(FileSystem fileSys, Path name) throws IOException {
|
||||
FSDataInputStream stm = fileSys.open(name);
|
||||
byte[] expected = new byte[12 * blockSize];
|
||||
Random rand = new Random(seed);
|
||||
rand.nextBytes(expected);
|
||||
// do a sanity check. Read first 4K bytes
|
||||
byte[] actual = new byte[4096];
|
||||
stm.readFully(actual);
|
||||
checkAndEraseData(actual, 0, expected, "Read Sanity Test");
|
||||
// now do a pread for the first 8K bytes
|
||||
actual = new byte[8192];
|
||||
doPread(stm, 0L, actual, 0, 8192);
|
||||
checkAndEraseData(actual, 0, expected, "Pread Test 1");
|
||||
// Now check to see if the normal read returns 4K-8K byte range
|
||||
actual = new byte[4096];
|
||||
stm.readFully(actual);
|
||||
checkAndEraseData(actual, 4096, expected, "Pread Test 2");
|
||||
// Now see if we can cross a single block boundary successfully
|
||||
// read 4K bytes from blockSize - 2K offset
|
||||
stm.readFully(blockSize - 2048, actual, 0, 4096);
|
||||
checkAndEraseData(actual, (blockSize - 2048), expected, "Pread Test 3");
|
||||
// now see if we can cross two block boundaries successfully
|
||||
// read blockSize + 4K bytes from blockSize - 2K offset
|
||||
actual = new byte[blockSize + 4096];
|
||||
stm.readFully(blockSize - 2048, actual);
|
||||
checkAndEraseData(actual, (blockSize - 2048), expected, "Pread Test 4");
|
||||
// now see if we can cross two block boundaries that are not cached
|
||||
// read blockSize + 4K bytes from 10*blockSize - 2K offset
|
||||
actual = new byte[blockSize + 4096];
|
||||
stm.readFully(10 * blockSize - 2048, actual);
|
||||
checkAndEraseData(actual, (10 * blockSize - 2048), expected, "Pread Test 5");
|
||||
// now check that even after all these preads, we can still read
|
||||
// bytes 8K-12K
|
||||
actual = new byte[4096];
|
||||
stm.readFully(actual);
|
||||
checkAndEraseData(actual, 8192, expected, "Pread Test 6");
|
||||
// done
|
||||
stm.close();
|
||||
// check block location caching
|
||||
stm = fileSys.open(name);
|
||||
stm.readFully(1, actual, 0, 4096);
|
||||
stm.readFully(4*blockSize, actual, 0, 4096);
|
||||
stm.readFully(7*blockSize, actual, 0, 4096);
|
||||
actual = new byte[3*4096];
|
||||
stm.readFully(0*blockSize, actual, 0, 3*4096);
|
||||
checkAndEraseData(actual, 0, expected, "Pread Test 7");
|
||||
actual = new byte[8*4096];
|
||||
stm.readFully(3*blockSize, actual, 0, 8*4096);
|
||||
checkAndEraseData(actual, 3*blockSize, expected, "Pread Test 8");
|
||||
// read the tail
|
||||
stm.readFully(11*blockSize+blockSize/2, actual, 0, blockSize/2);
|
||||
IOException res = null;
|
||||
try { // read beyond the end of the file
|
||||
stm.readFully(11*blockSize+blockSize/2, actual, 0, blockSize);
|
||||
} catch (IOException e) {
|
||||
// should throw an exception
|
||||
res = e;
|
||||
}
|
||||
assertTrue("Error reading beyond file boundary.", res != null);
|
||||
|
||||
stm.close();
|
||||
}
|
||||
|
||||
private void checkAndEraseData(byte[] actual, int from, byte[] expected, String message) {
|
||||
for (int idx = 0; idx < actual.length; idx++) {
|
||||
assertEquals(message+" byte "+(from+idx)+" differs. expected "+
|
||||
expected[from+idx]+" actual "+actual[idx],
|
||||
actual[idx], expected[from+idx]);
|
||||
actual[idx] = 0;
|
||||
}
|
||||
}
|
||||
|
||||
private void doPread(FSDataInputStream stm, long position, byte[] buffer,
|
||||
int offset, int length) throws IOException {
|
||||
int nread = 0;
|
||||
// long totalRead = 0;
|
||||
// DFSInputStream dfstm = null;
|
||||
|
||||
/* Disable. This counts do not add up. Some issue in original hdfs tests?
|
||||
if (stm.getWrappedStream() instanceof DFSInputStream) {
|
||||
dfstm = (DFSInputStream) (stm.getWrappedStream());
|
||||
totalRead = dfstm.getReadStatistics().getTotalBytesRead();
|
||||
} */
|
||||
|
||||
while (nread < length) {
|
||||
int nbytes =
|
||||
stm.read(position + nread, buffer, offset + nread, length - nread);
|
||||
assertTrue("Error in pread", nbytes > 0);
|
||||
nread += nbytes;
|
||||
}
|
||||
|
||||
/* Disable. This counts do not add up. Some issue in original hdfs tests?
|
||||
if (dfstm != null) {
|
||||
if (isHedgedRead) {
|
||||
assertTrue("Expected read statistic to be incremented",
|
||||
length <= dfstm.getReadStatistics().getTotalBytesRead() - totalRead);
|
||||
} else {
|
||||
assertEquals("Expected read statistic to be incremented", length, dfstm
|
||||
.getReadStatistics().getTotalBytesRead() - totalRead);
|
||||
}
|
||||
}*/
|
||||
}
|
||||
|
||||
private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
|
||||
assertTrue(fileSys.exists(name));
|
||||
assertTrue(fileSys.delete(name, true));
|
||||
assertTrue(!fileSys.exists(name));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue