HDFS-3650. Use MutableQuantiles to provide latency histograms for various operations. Contributed by Andrew Wang.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1366245 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Aaron Myers 2012-07-27 00:22:54 +00:00
parent 050df1454f
commit 8a8461ef31
14 changed files with 292 additions and 59 deletions

View File

@ -893,6 +893,25 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
}
return Integer.parseInt(valueString);
}
/**
* Get the value of the <code>name</code> property as a set of comma-delimited
* <code>int</code> values.
*
* If no such property exists, an empty array is returned.
*
* @param name property name
* @return property value interpreted as an array of comma-delimited
* <code>int</code> values
*/
public int[] getInts(String name) {
String[] strings = getTrimmedStrings(name);
int[] ints = new int[strings.length];
for (int i = 0; i < strings.length; i++) {
ints[i] = Integer.parseInt(strings[i]);
}
return ints;
}
/**
* Set the value of the <code>name</code> property to an <code>int</code>.

View File

@ -45,7 +45,8 @@ import com.google.common.annotations.VisibleForTesting;
@InterfaceStability.Evolving
public class MutableQuantiles extends MutableMetric {
static final Quantile[] quantiles = { new Quantile(0.50, 0.050),
@VisibleForTesting
public static final Quantile[] quantiles = { new Quantile(0.50, 0.050),
new Quantile(0.75, 0.025), new Quantile(0.90, 0.010),
new Quantile(0.95, 0.005), new Quantile(0.99, 0.001) };
@ -90,8 +91,7 @@ public class MutableQuantiles extends MutableMetric {
"Number of %s for %s with %ds interval", lsName, desc, interval));
// Construct the MetricsInfos for the quantiles, converting to percentiles
quantileInfos = new MetricsInfo[quantiles.length];
String nameTemplate = ucName + "%dthPercentile" + interval + "sInterval"
+ uvName;
String nameTemplate = ucName + "%dthPercentile" + uvName;
String descTemplate = "%d percentile " + lvName + " with " + interval
+ " second interval for " + desc;
for (int i = 0; i < quantiles.length; i++) {

View File

@ -31,8 +31,8 @@ import java.nio.channels.WritableByteChannel;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.metrics2.lib.MutableRate;
import org.apache.hadoop.util.Progressable;
/**
* This implements an output stream that can have a timeout while writing.
@ -179,9 +179,9 @@ public class SocketOutputStream extends OutputStream
* @param fileCh FileChannel to transfer data from.
* @param position position within the channel where the transfer begins
* @param count number of bytes to transfer.
* @param waitForWritableTime updated by the nanoseconds spent waiting for
* the socket to become writable
* @param transferTime updated by the nanoseconds spent transferring data
* @param waitForWritableTime nanoseconds spent waiting for the socket
* to become writable
* @param transferTime nanoseconds spent transferring data
*
* @throws EOFException
* If end of input file is reached before requested number of
@ -195,8 +195,8 @@ public class SocketOutputStream extends OutputStream
* {@link FileChannel#transferTo(long, long, WritableByteChannel)}.
*/
public void transferToFully(FileChannel fileCh, long position, int count,
MutableRate waitForWritableTime,
MutableRate transferToTime) throws IOException {
LongWritable waitForWritableTime,
LongWritable transferToTime) throws IOException {
long waitTime = 0;
long transferTime = 0;
while (count > 0) {
@ -236,12 +236,12 @@ public class SocketOutputStream extends OutputStream
waitTime += wait - start;
transferTime += transfer - wait;
}
if (waitForWritableTime != null) {
waitForWritableTime.add(waitTime);
waitForWritableTime.set(waitTime);
}
if (transferToTime != null) {
transferToTime.add(transferTime);
transferToTime.set(transferTime);
}
}

View File

@ -150,7 +150,7 @@ public class TestMutableMetrics {
info("FooNumOps", "Number of ops for stat with 5s interval"),
(long) 2000);
Quantile[] quants = MutableQuantiles.quantiles;
String name = "Foo%dthPercentile5sIntervalLatency";
String name = "Foo%dthPercentileLatency";
String desc = "%d percentile latency with 5 second interval for stat";
for (Quantile q : quants) {
int percentile = (int) (100 * q.quantile);
@ -176,7 +176,7 @@ public class TestMutableMetrics {
"Latency", 5);
Quantile[] quants = MutableQuantiles.quantiles;
String name = "Foo%dthPercentile5sIntervalLatency";
String name = "Foo%dthPercentileLatency";
String desc = "%d percentile latency with 5 second interval for stat";
// Push values for three intervals

View File

@ -23,7 +23,9 @@ import static com.google.common.base.Preconditions.*;
import org.hamcrest.Description;
import org.junit.Assert;
import static org.mockito.AdditionalMatchers.geq;
import static org.mockito.Mockito.*;
import org.mockito.stubbing.Answer;
import org.mockito.internal.matchers.GreaterThan;
import org.mockito.invocation.InvocationOnMock;
@ -39,7 +41,11 @@ import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableQuantiles;
import org.apache.hadoop.metrics2.util.Quantile;
import static org.apache.hadoop.metrics2.lib.Interns.*;
import static org.apache.hadoop.test.MetricsAsserts.eqName;
/**
* Helpers for metrics source tests
@ -328,4 +334,23 @@ public class MetricsAsserts {
MetricsSource source) {
assertGaugeGt(name, greater, getMetrics(source));
}
/**
* Asserts that the NumOps and quantiles for a metric have been changed at
* some point to a non-zero value.
*
* @param prefix of the metric
* @param rb MetricsRecordBuilder with the metric
*/
public static void assertQuantileGauges(String prefix,
MetricsRecordBuilder rb) {
verify(rb).addGauge(eqName(info(prefix + "NumOps", "")), geq(0l));
for (Quantile q : MutableQuantiles.quantiles) {
String nameTemplate = prefix + "%dthPercentileLatency";
int percentile = (int) (100 * q.quantile);
verify(rb).addGauge(
eqName(info(String.format(nameTemplate, percentile), "")),
geq(0l));
}
}
}

View File

@ -174,6 +174,9 @@ Release 2.0.1-alpha - UNRELEASED
HDFS-3711. Manually convert remaining tests to JUnit4. (Andrew Wang via atm)
HDFS-3650. Use MutableQuantiles to provide latency histograms for various
operations. (Andrew Wang via atm)
OPTIMIZATIONS
HDFS-2982. Startup performance suffers when there are many edit log

View File

@ -203,6 +203,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_CLIENT_READ_PREFETCH_SIZE_KEY = "dfs.client.read.prefetch.size";
public static final String DFS_CLIENT_RETRY_WINDOW_BASE= "dfs.client.retry.window.base";
public static final String DFS_METRICS_SESSION_ID_KEY = "dfs.metrics.session-id";
public static final String DFS_METRICS_PERCENTILES_INTERVALS_KEY = "dfs.metrics.percentiles.intervals";
public static final String DFS_DATANODE_HOST_NAME_KEY = "dfs.datanode.hostname";
public static final String DFS_NAMENODE_HOSTS_KEY = "dfs.namenode.hosts";
public static final String DFS_NAMENODE_HOSTS_EXCLUDE_KEY = "dfs.namenode.hosts.exclude";

View File

@ -76,6 +76,8 @@ import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting;
/****************************************************************
* DFSOutputStream creates files from a stream of bytes.
@ -1210,7 +1212,8 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
//
// returns the list of targets, if any, that is being currently used.
//
synchronized DatanodeInfo[] getPipeline() {
@VisibleForTesting
public synchronized DatanodeInfo[] getPipeline() {
if (streamer == null) {
return null;
}
@ -1758,11 +1761,13 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
}
}
void setArtificialSlowdown(long period) {
@VisibleForTesting
public void setArtificialSlowdown(long period) {
artificialSlowdown = period;
}
synchronized void setChunksPerPacket(int value) {
@VisibleForTesting
public synchronized void setChunksPerPacket(int value) {
chunksPerPacket = Math.min(chunksPerPacket, value);
packetSize = PacketHeader.PKT_HEADER_LEN +
(checksum.getBytesPerChecksum() +

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.ReadaheadPool;
import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
import org.apache.hadoop.io.nativeio.NativeIO;
@ -486,11 +487,14 @@ class BlockSender implements java.io.Closeable {
// no need to flush since we know out is not a buffered stream
FileChannel fileCh = ((FileInputStream)blockIn).getChannel();
LongWritable waitTime = new LongWritable();
LongWritable transferTime = new LongWritable();
sockOut.transferToFully(fileCh, blockInPosition, dataLen,
datanode.metrics.getSendDataPacketBlockedOnNetworkNanos(),
datanode.metrics.getSendDataPacketTransferNanos());
waitTime, transferTime);
datanode.metrics.addSendDataPacketBlockedOnNetworkNanos(waitTime.get());
datanode.metrics.addSendDataPacketTransferNanos(transferTime.get());
blockInPosition += dataLen;
} else {
} else {
// normal transfer
out.write(buf, 0, dataOff + dataLen);
}

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableQuantiles;
import org.apache.hadoop.metrics2.lib.MutableRate;
import org.apache.hadoop.metrics2.source.JvmMetrics;
@ -74,19 +75,54 @@ public class DataNodeMetrics {
@Metric MutableRate heartbeats;
@Metric MutableRate blockReports;
@Metric MutableRate packetAckRoundTripTimeNanos;
MutableQuantiles[] packetAckRoundTripTimeNanosQuantiles;
@Metric MutableRate flushNanos;
MutableQuantiles[] flushNanosQuantiles;
@Metric MutableRate fsyncNanos;
MutableQuantiles[] fsyncNanosQuantiles;
@Metric MutableRate sendDataPacketBlockedOnNetworkNanos;
MutableQuantiles[] sendDataPacketBlockedOnNetworkNanosQuantiles;
@Metric MutableRate sendDataPacketTransferNanos;
MutableQuantiles[] sendDataPacketTransferNanosQuantiles;
final MetricsRegistry registry = new MetricsRegistry("datanode");
final String name;
public DataNodeMetrics(String name, String sessionId) {
public DataNodeMetrics(String name, String sessionId, int[] intervals) {
this.name = name;
registry.tag(SessionId, sessionId);
final int len = intervals.length;
packetAckRoundTripTimeNanosQuantiles = new MutableQuantiles[len];
flushNanosQuantiles = new MutableQuantiles[len];
fsyncNanosQuantiles = new MutableQuantiles[len];
sendDataPacketBlockedOnNetworkNanosQuantiles = new MutableQuantiles[len];
sendDataPacketTransferNanosQuantiles = new MutableQuantiles[len];
for (int i = 0; i < len; i++) {
int interval = intervals[i];
packetAckRoundTripTimeNanosQuantiles[i] = registry.newQuantiles(
"packetAckRoundTripTimeNanos" + interval + "s",
"Packet Ack RTT in ns", "ops", "latency", interval);
flushNanosQuantiles[i] = registry.newQuantiles(
"flushNanos" + interval + "s",
"Disk flush latency in ns", "ops", "latency", interval);
fsyncNanosQuantiles[i] = registry.newQuantiles(
"fsyncNanos" + interval + "s", "Disk fsync latency in ns",
"ops", "latency", interval);
sendDataPacketBlockedOnNetworkNanosQuantiles[i] = registry.newQuantiles(
"sendDataPacketBlockedOnNetworkNanos" + interval + "s",
"Time blocked on network while sending a packet in ns",
"ops", "latency", interval);
sendDataPacketTransferNanosQuantiles[i] = registry.newQuantiles(
"sendDataPacketTransferNanos" + interval + "s",
"Time reading from disk and writing to network while sending " +
"a packet in ns", "ops", "latency", interval);
}
}
public static DataNodeMetrics create(Configuration conf, String dnName) {
@ -94,8 +130,15 @@ public class DataNodeMetrics {
MetricsSystem ms = DefaultMetricsSystem.instance();
JvmMetrics.create("DataNode", sessionId, ms);
String name = "DataNodeActivity-"+ (dnName.isEmpty()
? "UndefinedDataNodeName"+ DFSUtil.getRandom().nextInt() : dnName.replace(':', '-'));
return ms.register(name, null, new DataNodeMetrics(name, sessionId));
? "UndefinedDataNodeName"+ DFSUtil.getRandom().nextInt()
: dnName.replace(':', '-'));
// Percentile measurement is off by default, by watching no intervals
int[] intervals =
conf.getInts(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY);
return ms.register(name, null, new DataNodeMetrics(name, sessionId,
intervals));
}
public String name() { return name; }
@ -166,14 +209,23 @@ public class DataNodeMetrics {
public void addPacketAckRoundTripTimeNanos(long latencyNanos) {
packetAckRoundTripTimeNanos.add(latencyNanos);
for (MutableQuantiles q : packetAckRoundTripTimeNanosQuantiles) {
q.add(latencyNanos);
}
}
public void addFlushNanos(long latencyNanos) {
flushNanos.add(latencyNanos);
for (MutableQuantiles q : flushNanosQuantiles) {
q.add(latencyNanos);
}
}
public void addFsyncNanos(long latencyNanos) {
fsyncNanos.add(latencyNanos);
for (MutableQuantiles q : fsyncNanosQuantiles) {
q.add(latencyNanos);
}
}
public void shutdown() {
@ -196,12 +248,18 @@ public class DataNodeMetrics {
public void incrBlocksGetLocalPathInfo() {
blocksGetLocalPathInfo.incr();
}
public MutableRate getSendDataPacketBlockedOnNetworkNanos() {
return sendDataPacketBlockedOnNetworkNanos;
public void addSendDataPacketBlockedOnNetworkNanos(long latencyNanos) {
sendDataPacketBlockedOnNetworkNanos.add(latencyNanos);
for (MutableQuantiles q : sendDataPacketBlockedOnNetworkNanosQuantiles) {
q.add(latencyNanos);
}
}
public MutableRate getSendDataPacketTransferNanos() {
return sendDataPacketTransferNanos;
public void addSendDataPacketTransferNanos(long latencyNanos) {
sendDataPacketTransferNanos.add(latencyNanos);
for (MutableQuantiles q : sendDataPacketTransferNanosQuantiles) {
q.add(latencyNanos);
}
}
}

View File

@ -17,17 +17,20 @@
*/
package org.apache.hadoop.hdfs.server.namenode.metrics;
import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName;
import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import static org.apache.hadoop.metrics2.impl.MsInfo.*;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.metrics2.lib.MutableQuantiles;
import org.apache.hadoop.metrics2.lib.MutableRate;
import org.apache.hadoop.metrics2.source.JvmMetrics;
@ -57,15 +60,31 @@ public class NameNodeMetrics {
@Metric("Journal transactions") MutableRate transactions;
@Metric("Journal syncs") MutableRate syncs;
MutableQuantiles[] syncsQuantiles;
@Metric("Journal transactions batched in sync")
MutableCounterLong transactionsBatchedInSync;
@Metric("Block report") MutableRate blockReport;
MutableQuantiles[] blockReportQuantiles;
@Metric("Duration in SafeMode at startup") MutableGaugeInt safeModeTime;
@Metric("Time loading FS Image at startup") MutableGaugeInt fsImageLoadTime;
NameNodeMetrics(String processName, String sessionId) {
NameNodeMetrics(String processName, String sessionId, int[] intervals) {
registry.tag(ProcessName, processName).tag(SessionId, sessionId);
final int len = intervals.length;
syncsQuantiles = new MutableQuantiles[len];
blockReportQuantiles = new MutableQuantiles[len];
for (int i = 0; i < len; i++) {
int interval = intervals[i];
syncsQuantiles[i] = registry.newQuantiles(
"syncs" + interval + "s",
"Journal syncs", "ops", "latency", interval);
blockReportQuantiles[i] = registry.newQuantiles(
"blockReport" + interval + "s",
"Block report", "ops", "latency", interval);
}
}
public static NameNodeMetrics create(Configuration conf, NamenodeRole r) {
@ -73,7 +92,11 @@ public class NameNodeMetrics {
String processName = r.toString();
MetricsSystem ms = DefaultMetricsSystem.instance();
JvmMetrics.create(processName, sessionId, ms);
return ms.register(new NameNodeMetrics(processName, sessionId));
// Percentile measurement is off by default, by watching no intervals
int[] intervals =
conf.getInts(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY);
return ms.register(new NameNodeMetrics(processName, sessionId, intervals));
}
public void shutdown() {
@ -146,6 +169,9 @@ public class NameNodeMetrics {
public void addSync(long elapsed) {
syncs.add(elapsed);
for (MutableQuantiles q : syncsQuantiles) {
q.add(elapsed);
}
}
public void setFsImageLoadTime(long elapsed) {
@ -154,6 +180,9 @@ public class NameNodeMetrics {
public void addBlockReport(long latency) {
blockReport.add(latency);
for (MutableQuantiles q : blockReportQuantiles) {
q.add(latency);
}
}
public void setSafeModeTime(long elapsed) {

View File

@ -1004,4 +1004,14 @@
</description>
</property>
<property>
<name>dfs.metrics.percentiles.intervals</name>
<value></value>
<description>
Comma-delimited set of integers denoting the desired rollover intervals
(in seconds) for percentile latency metrics on the Namenode and Datanode.
By default, percentile latency metrics are disabled.
</description>
</property>
</configuration>

View File

@ -18,21 +18,26 @@
package org.apache.hadoop.hdfs.server.datanode;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.assertQuantileGauges;
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.junit.Test;
@ -59,8 +64,10 @@ public class TestDataNodeMetrics {
}
@Test
public void testSendDataPacket() throws Exception {
public void testSendDataPacketMetrics() throws Exception {
Configuration conf = new HdfsConfiguration();
final int interval = 1;
conf.set(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY, "" + interval);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
try {
FileSystem fs = cluster.getFileSystem();
@ -73,64 +80,110 @@ public class TestDataNodeMetrics {
assertEquals(datanodes.size(), 1);
DataNode datanode = datanodes.get(0);
MetricsRecordBuilder rb = getMetrics(datanode.getMetrics().name());
// Expect 2 packets, 1 for the 1 byte read, 1 for the empty packet
// signaling the end of the block
assertCounter("SendDataPacketTransferNanosNumOps", (long)2, rb);
assertCounter("SendDataPacketBlockedOnNetworkNanosNumOps", (long)2, rb);
// Wait for at least 1 rollover
Thread.sleep((interval + 1) * 1000);
// Check that the sendPacket percentiles rolled to non-zero values
String sec = interval + "s";
assertQuantileGauges("SendDataPacketBlockedOnNetworkNanos" + sec, rb);
assertQuantileGauges("SendDataPacketTransferNanos" + sec, rb);
} finally {
if (cluster != null) {cluster.shutdown();}
}
}
@Test
public void testFlushMetric() throws Exception {
public void testReceivePacketMetrics() throws Exception {
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
final int interval = 1;
conf.set(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY, "" + interval);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
try {
cluster.waitActive();
DistributedFileSystem fs = (DistributedFileSystem) cluster.getFileSystem();
Path testFile = new Path("/testFlushNanosMetric.txt");
DFSTestUtil.createFile(fs, testFile, 1, (short)1, new Random().nextLong());
FSDataOutputStream fout = fs.create(testFile);
fout.write(new byte[1]);
fout.hsync();
fout.close();
List<DataNode> datanodes = cluster.getDataNodes();
DataNode datanode = datanodes.get(0);
MetricsRecordBuilder dnMetrics = getMetrics(datanode.getMetrics().name());
// Expect 2 flushes, 1 for the flush that occurs after writing, 1 that occurs
// on closing the data and metadata files.
// Expect two flushes, 1 for the flush that occurs after writing,
// 1 that occurs on closing the data and metadata files.
assertCounter("FlushNanosNumOps", 2L, dnMetrics);
// Expect two syncs, one from the hsync, one on close.
assertCounter("FsyncNanosNumOps", 2L, dnMetrics);
// Wait for at least 1 rollover
Thread.sleep((interval + 1) * 1000);
// Check the receivePacket percentiles that should be non-zero
String sec = interval + "s";
assertQuantileGauges("FlushNanos" + sec, dnMetrics);
assertQuantileGauges("FsyncNanos" + sec, dnMetrics);
} finally {
if (cluster != null) {cluster.shutdown();}
}
}
/**
* Tests that round-trip acks in a datanode write pipeline are correctly
* measured.
*/
@Test
public void testRoundTripAckMetric() throws Exception {
final int DATANODE_COUNT = 2;
final int datanodeCount = 2;
final int interval = 1;
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DATANODE_COUNT).build();
conf.set(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY, "" + interval);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
datanodeCount).build();
try {
cluster.waitActive();
DistributedFileSystem fs = (DistributedFileSystem) cluster.getFileSystem();
FileSystem fs = cluster.getFileSystem();
// Open a file and get the head of the pipeline
Path testFile = new Path("/testRoundTripAckMetric.txt");
DFSTestUtil.createFile(fs, testFile, 1, (short)DATANODE_COUNT,
new Random().nextLong());
boolean foundNonzeroPacketAckNumOps = false;
FSDataOutputStream fsout = fs.create(testFile, (short) datanodeCount);
DFSOutputStream dout = (DFSOutputStream) fsout.getWrappedStream();
// Slow down the writes to catch the write pipeline
dout.setChunksPerPacket(5);
dout.setArtificialSlowdown(3000);
fsout.write(new byte[10000]);
DatanodeInfo[] pipeline = null;
int count = 0;
while (pipeline == null && count < 5) {
pipeline = dout.getPipeline();
System.out.println("Waiting for pipeline to be created.");
Thread.sleep(1000);
count++;
}
// Get the head node that should be receiving downstream acks
DatanodeInfo headInfo = pipeline[0];
DataNode headNode = null;
for (DataNode datanode : cluster.getDataNodes()) {
MetricsRecordBuilder dnMetrics = getMetrics(datanode.getMetrics().name());
if (getLongCounter("PacketAckRoundTripTimeNanosNumOps", dnMetrics) > 0) {
foundNonzeroPacketAckNumOps = true;
if (datanode.getDatanodeId().equals(headInfo)) {
headNode = datanode;
break;
}
}
assertTrue(
"Expected at least one datanode to have reported PacketAckRoundTripTimeNanos metric",
foundNonzeroPacketAckNumOps);
assertNotNull("Could not find the head of the datanode write pipeline",
headNode);
// Close the file and wait for the metrics to rollover
Thread.sleep((interval + 1) * 1000);
// Check the ack was received
MetricsRecordBuilder dnMetrics = getMetrics(headNode.getMetrics()
.name());
assertTrue("Expected non-zero number of acks",
getLongCounter("PacketAckRoundTripTimeNanosNumOps", dnMetrics) > 0);
assertQuantileGauges("PacketAckRoundTripTimeNanos" + interval
+ "s", dnMetrics);
} finally {
if (cluster != null) {cluster.shutdown();}
if (cluster != null) {
cluster.shutdown();
}
}
}
}

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode.metrics;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
import static org.apache.hadoop.test.MetricsAsserts.assertQuantileGauges;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertTrue;
@ -63,6 +64,9 @@ public class TestNameNodeMetrics {
// Number of datanodes in the cluster
private static final int DATANODE_COUNT = 3;
private static final int WAIT_GAUGE_VALUE_RETRIES = 20;
// Rollover interval of percentile metrics (in seconds)
private static final int PERCENTILES_INTERVAL = 1;
static {
CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 100);
@ -71,6 +75,8 @@ public class TestNameNodeMetrics {
DFS_REPLICATION_INTERVAL);
CONF.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
DFS_REPLICATION_INTERVAL);
CONF.set(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY,
"" + PERCENTILES_INTERVAL);
((Log4JLogger)LogFactory.getLog(MetricsAsserts.class))
.getLogger().setLevel(Level.DEBUG);
@ -368,4 +374,24 @@ public class TestNameNodeMetrics {
assertGauge("TransactionsSinceLastCheckpoint", 1L, getMetrics(NS_METRICS));
assertGauge("TransactionsSinceLastLogRoll", 1L, getMetrics(NS_METRICS));
}
/**
* Tests that the sync and block report metrics get updated on cluster
* startup.
*/
@Test
public void testSyncAndBlockReportMetric() throws Exception {
MetricsRecordBuilder rb = getMetrics(NN_METRICS);
// We have one sync when the cluster starts up, just opening the journal
assertCounter("SyncsNumOps", 1L, rb);
// Each datanode reports in when the cluster comes up
assertCounter("BlockReportNumOps", (long)DATANODE_COUNT, rb);
// Sleep for an interval+slop to let the percentiles rollover
Thread.sleep((PERCENTILES_INTERVAL+1)*1000);
// Check that the percentiles were updated
assertQuantileGauges("Syncs1s", rb);
assertQuantileGauges("BlockReport1s", rb);
}
}