HDFS-3170. Add more useful metrics for write latency. Contributed by Matthew Jacobs.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1357969 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2012-07-05 22:18:16 +00:00
parent ca78b62579
commit 5f6a138ca0
6 changed files with 153 additions and 24 deletions

View File

@ -96,6 +96,9 @@ Release 2.0.1-alpha - UNRELEASED
HDFS-3343. Improve metrics for DN read latency (Andrew Wang via todd) HDFS-3343. Improve metrics for DN read latency (Andrew Wang via todd)
HDFS-3170. Add more useful metrics for write latency (Matthew Jacobs via
todd)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-2982. Startup performance suffers when there are many edit log HDFS-2982. Startup performance suffers when there are many edit log

View File

@ -42,14 +42,25 @@ public class PipelineAck {
} }
/** /**
* Constructor * Constructor assuming no next DN in pipeline
* @param seqno sequence number * @param seqno sequence number
* @param replies an array of replies * @param replies an array of replies
*/ */
public PipelineAck(long seqno, Status[] replies) { public PipelineAck(long seqno, Status[] replies) {
this(seqno, replies, 0L);
}
/**
* Constructor
* @param seqno sequence number
* @param replies an array of replies
* @param downstreamAckTimeNanos ack RTT in nanoseconds, 0 if no next DN in pipeline
*/
public PipelineAck(long seqno, Status[] replies, long downstreamAckTimeNanos) {
proto = PipelineAckProto.newBuilder() proto = PipelineAckProto.newBuilder()
.setSeqno(seqno) .setSeqno(seqno)
.addAllStatus(Arrays.asList(replies)) .addAllStatus(Arrays.asList(replies))
.setDownstreamAckTimeNanos(downstreamAckTimeNanos)
.build(); .build();
} }
@ -76,7 +87,15 @@ public class PipelineAck {
public Status getReply(int i) { public Status getReply(int i) {
return proto.getStatus(i); return proto.getStatus(i);
} }
/**
* Get the time elapsed for downstream ack RTT in nanoseconds
* @return time elapsed for downstream ack in nanoseconds, 0 if no next DN in pipeline
*/
public long getDownstreamAckTimeNanos() {
return proto.getDownstreamAckTimeNanos();
}
/** /**
* Check if this ack contains error status * Check if this ack contains error status
* @return true if all statuses are SUCCESS * @return true if all statuses are SUCCESS

View File

@ -42,7 +42,6 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck; import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -252,15 +251,21 @@ class BlockReceiver implements Closeable {
if (syncOnClose && (out != null || checksumOut != null)) { if (syncOnClose && (out != null || checksumOut != null)) {
datanode.metrics.incrFsyncCount(); datanode.metrics.incrFsyncCount();
} }
long flushTotalNanos = 0;
boolean measuredFlushTime = false;
// close checksum file // close checksum file
try { try {
if (checksumOut != null) { if (checksumOut != null) {
long flushStartNanos = System.nanoTime();
checksumOut.flush(); checksumOut.flush();
long flushEndNanos = System.nanoTime();
if (syncOnClose && (cout instanceof FileOutputStream)) { if (syncOnClose && (cout instanceof FileOutputStream)) {
long start = Util.now(); long fsyncStartNanos = flushEndNanos;
((FileOutputStream)cout).getChannel().force(true); ((FileOutputStream)cout).getChannel().force(true);
datanode.metrics.addFsync(Util.now() - start); datanode.metrics.addFsyncNanos(System.nanoTime() - fsyncStartNanos);
} }
flushTotalNanos += flushEndNanos - flushStartNanos;
measuredFlushTime = true;
checksumOut.close(); checksumOut.close();
checksumOut = null; checksumOut = null;
} }
@ -273,12 +278,16 @@ class BlockReceiver implements Closeable {
// close block file // close block file
try { try {
if (out != null) { if (out != null) {
long flushStartNanos = System.nanoTime();
out.flush(); out.flush();
long flushEndNanos = System.nanoTime();
if (syncOnClose && (out instanceof FileOutputStream)) { if (syncOnClose && (out instanceof FileOutputStream)) {
long start = Util.now(); long fsyncStartNanos = flushEndNanos;
((FileOutputStream)out).getChannel().force(true); ((FileOutputStream)out).getChannel().force(true);
datanode.metrics.addFsync(Util.now() - start); datanode.metrics.addFsyncNanos(System.nanoTime() - fsyncStartNanos);
} }
flushTotalNanos += flushEndNanos - flushStartNanos;
measuredFlushTime = true;
out.close(); out.close();
out = null; out = null;
} }
@ -288,6 +297,9 @@ class BlockReceiver implements Closeable {
finally{ finally{
IOUtils.closeStream(out); IOUtils.closeStream(out);
} }
if (measuredFlushTime) {
datanode.metrics.addFlushNanos(flushTotalNanos);
}
// disk check // disk check
if(ioe != null) { if(ioe != null) {
datanode.checkDiskError(ioe); datanode.checkDiskError(ioe);
@ -303,21 +315,31 @@ class BlockReceiver implements Closeable {
if (isSync && (out != null || checksumOut != null)) { if (isSync && (out != null || checksumOut != null)) {
datanode.metrics.incrFsyncCount(); datanode.metrics.incrFsyncCount();
} }
long flushTotalNanos = 0;
if (checksumOut != null) { if (checksumOut != null) {
long flushStartNanos = System.nanoTime();
checksumOut.flush(); checksumOut.flush();
long flushEndNanos = System.nanoTime();
if (isSync && (cout instanceof FileOutputStream)) { if (isSync && (cout instanceof FileOutputStream)) {
long start = Util.now(); long fsyncStartNanos = flushEndNanos;
((FileOutputStream)cout).getChannel().force(true); ((FileOutputStream)cout).getChannel().force(true);
datanode.metrics.addFsync(Util.now() - start); datanode.metrics.addFsyncNanos(System.nanoTime() - fsyncStartNanos);
} }
flushTotalNanos += flushEndNanos - flushStartNanos;
} }
if (out != null) { if (out != null) {
long flushStartNanos = System.nanoTime();
out.flush(); out.flush();
long flushEndNanos = System.nanoTime();
if (isSync && (out instanceof FileOutputStream)) { if (isSync && (out instanceof FileOutputStream)) {
long start = Util.now(); long fsyncStartNanos = flushEndNanos;
((FileOutputStream)out).getChannel().force(true); ((FileOutputStream)out).getChannel().force(true);
datanode.metrics.addFsync(Util.now() - start); datanode.metrics.addFsyncNanos(System.nanoTime() - fsyncStartNanos);
} }
flushTotalNanos += flushEndNanos - flushStartNanos;
}
if (checksumOut != null || out != null) {
datanode.metrics.addFlushNanos(flushTotalNanos);
} }
} }
@ -446,7 +468,7 @@ class BlockReceiver implements Closeable {
*/ */
private void readNextPacket() throws IOException { private void readNextPacket() throws IOException {
/* This dances around buf a little bit, mainly to read /* This dances around buf a little bit, mainly to read
* full packet with single read and to accept arbitarary size * full packet with single read and to accept arbitrary size
* for next packet at the same time. * for next packet at the same time.
*/ */
if (buf == null) { if (buf == null) {
@ -715,7 +737,7 @@ class BlockReceiver implements Closeable {
replicaInfo.setLastChecksumAndDataLen( replicaInfo.setLastChecksumAndDataLen(
offsetInBlock, lastChunkChecksum offsetInBlock, lastChunkChecksum
); );
datanode.metrics.incrBytesWritten(len); datanode.metrics.incrBytesWritten(len);
dropOsCacheBehindWriter(offsetInBlock); dropOsCacheBehindWriter(offsetInBlock);
@ -976,7 +998,8 @@ class BlockReceiver implements Closeable {
synchronized void enqueue(final long seqno, synchronized void enqueue(final long seqno,
final boolean lastPacketInBlock, final long offsetInBlock) { final boolean lastPacketInBlock, final long offsetInBlock) {
if (running) { if (running) {
final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock); final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock,
System.nanoTime());
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
LOG.debug(myString + ": enqueue " + p); LOG.debug(myString + ": enqueue " + p);
} }
@ -1013,17 +1036,20 @@ class BlockReceiver implements Closeable {
final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0; final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
while (running && datanode.shouldRun && !lastPacketInBlock) { while (running && datanode.shouldRun && !lastPacketInBlock) {
long totalAckTimeNanos = 0;
boolean isInterrupted = false; boolean isInterrupted = false;
try { try {
Packet pkt = null; Packet pkt = null;
long expected = -2; long expected = -2;
PipelineAck ack = new PipelineAck(); PipelineAck ack = new PipelineAck();
long seqno = PipelineAck.UNKOWN_SEQNO; long seqno = PipelineAck.UNKOWN_SEQNO;
long ackRecvNanoTime = 0;
try { try {
if (type != PacketResponderType.LAST_IN_PIPELINE if (type != PacketResponderType.LAST_IN_PIPELINE
&& !mirrorError) { && !mirrorError) {
// read an ack from downstream datanode // read an ack from downstream datanode
ack.readFields(downstreamIn); ack.readFields(downstreamIn);
ackRecvNanoTime = System.nanoTime();
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(myString + " got " + ack); LOG.debug(myString + " got " + ack);
} }
@ -1049,6 +1075,22 @@ class BlockReceiver implements Closeable {
throw new IOException(myString + "seqno: expected=" throw new IOException(myString + "seqno: expected="
+ expected + ", received=" + seqno); + expected + ", received=" + seqno);
} }
if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE) {
// The total ack time includes the ack times of downstream nodes.
// The value is 0 if this responder doesn't have a downstream
// DN in the pipeline.
totalAckTimeNanos = ackRecvNanoTime - pkt.ackEnqueueNanoTime;
// Report the elapsed time from ack send to ack receive minus
// the downstream ack time.
long ackTimeNanos = totalAckTimeNanos - ack.getDownstreamAckTimeNanos();
if (ackTimeNanos < 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("Calculated invalid ack time: " + ackTimeNanos + "ns.");
}
} else {
datanode.metrics.addPacketAckRoundTripTimeNanos(ackTimeNanos);
}
}
lastPacketInBlock = pkt.lastPacketInBlock; lastPacketInBlock = pkt.lastPacketInBlock;
} }
} }
@ -1116,7 +1158,7 @@ class BlockReceiver implements Closeable {
replies[i+1] = ack.getReply(i); replies[i+1] = ack.getReply(i);
} }
} }
PipelineAck replyAck = new PipelineAck(expected, replies); PipelineAck replyAck = new PipelineAck(expected, replies, totalAckTimeNanos);
if (replyAck.isSuccess() && if (replyAck.isSuccess() &&
pkt.offsetInBlock > replicaInfo.getBytesAcked()) pkt.offsetInBlock > replicaInfo.getBytesAcked())
@ -1176,11 +1218,14 @@ class BlockReceiver implements Closeable {
final long seqno; final long seqno;
final boolean lastPacketInBlock; final boolean lastPacketInBlock;
final long offsetInBlock; final long offsetInBlock;
final long ackEnqueueNanoTime;
Packet(long seqno, boolean lastPacketInBlock, long offsetInBlock) { Packet(long seqno, boolean lastPacketInBlock, long offsetInBlock,
long ackEnqueueNanoTime) {
this.seqno = seqno; this.seqno = seqno;
this.lastPacketInBlock = lastPacketInBlock; this.lastPacketInBlock = lastPacketInBlock;
this.offsetInBlock = offsetInBlock; this.offsetInBlock = offsetInBlock;
this.ackEnqueueNanoTime = ackEnqueueNanoTime;
} }
@Override @Override
@ -1188,6 +1233,7 @@ class BlockReceiver implements Closeable {
return getClass().getSimpleName() + "(seqno=" + seqno return getClass().getSimpleName() + "(seqno=" + seqno
+ ", lastPacketInBlock=" + lastPacketInBlock + ", lastPacketInBlock=" + lastPacketInBlock
+ ", offsetInBlock=" + offsetInBlock + ", offsetInBlock=" + offsetInBlock
+ ", ackEnqueueNanoTime=" + ackEnqueueNanoTime
+ ")"; + ")";
} }
} }

View File

@ -73,8 +73,10 @@ public class DataNodeMetrics {
@Metric MutableRate replaceBlockOp; @Metric MutableRate replaceBlockOp;
@Metric MutableRate heartbeats; @Metric MutableRate heartbeats;
@Metric MutableRate blockReports; @Metric MutableRate blockReports;
@Metric MutableRate packetAckRoundTripTimeNanos;
@Metric MutableRate fsync; @Metric MutableRate flushNanos;
@Metric MutableRate fsyncNanos;
@Metric MutableRate sendDataPacketBlockedOnNetworkNanos; @Metric MutableRate sendDataPacketBlockedOnNetworkNanos;
@Metric MutableRate sendDataPacketTransferNanos; @Metric MutableRate sendDataPacketTransferNanos;
@ -162,8 +164,16 @@ public class DataNodeMetrics {
fsyncCount.incr(); fsyncCount.incr();
} }
public void addFsync(long latency) { public void addPacketAckRoundTripTimeNanos(long latencyNanos) {
fsync.add(latency); packetAckRoundTripTimeNanos.add(latencyNanos);
}
public void addFlushNanos(long latencyNanos) {
flushNanos.add(latencyNanos);
}
public void addFsyncNanos(long latencyNanos) {
fsyncNanos.add(latencyNanos);
} }
public void shutdown() { public void shutdown() {

View File

@ -129,6 +129,7 @@ enum Status {
message PipelineAckProto { message PipelineAckProto {
required sint64 seqno = 1; required sint64 seqno = 1;
repeated Status status = 2; repeated Status status = 2;
optional uint64 downstreamAckTimeNanos = 3 [default = 0];
} }
/** /**

View File

@ -18,26 +18,25 @@
package org.apache.hadoop.hdfs.server.datanode; package org.apache.hadoop.hdfs.server.datanode;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter; import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.assertGaugeGt; import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics; import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import java.util.List; import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.junit.Test; import org.junit.Test;
public class TestDataNodeMetrics { public class TestDataNodeMetrics {
MiniDFSCluster cluster = null;
FileSystem fs = null;
@Test @Test
public void testDataNodeMetrics() throws Exception { public void testDataNodeMetrics() throws Exception {
Configuration conf = new HdfsConfiguration(); Configuration conf = new HdfsConfiguration();
@ -82,4 +81,55 @@ public class TestDataNodeMetrics {
if (cluster != null) {cluster.shutdown();} if (cluster != null) {cluster.shutdown();}
} }
} }
@Test
public void testFlushMetric() throws Exception {
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
try {
cluster.waitActive();
DistributedFileSystem fs = (DistributedFileSystem) cluster.getFileSystem();
Path testFile = new Path("/testFlushNanosMetric.txt");
DFSTestUtil.createFile(fs, testFile, 1, (short)1, new Random().nextLong());
List<DataNode> datanodes = cluster.getDataNodes();
DataNode datanode = datanodes.get(0);
MetricsRecordBuilder dnMetrics = getMetrics(datanode.getMetrics().name());
// Expect 2 flushes, 1 for the flush that occurs after writing, 1 that occurs
// on closing the data and metadata files.
assertCounter("FlushNanosNumOps", 2L, dnMetrics);
} finally {
if (cluster != null) {cluster.shutdown();}
}
}
@Test
public void testRoundTripAckMetric() throws Exception {
final int DATANODE_COUNT = 2;
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DATANODE_COUNT).build();
try {
cluster.waitActive();
DistributedFileSystem fs = (DistributedFileSystem) cluster.getFileSystem();
Path testFile = new Path("/testRoundTripAckMetric.txt");
DFSTestUtil.createFile(fs, testFile, 1, (short)DATANODE_COUNT,
new Random().nextLong());
boolean foundNonzeroPacketAckNumOps = false;
for (DataNode datanode : cluster.getDataNodes()) {
MetricsRecordBuilder dnMetrics = getMetrics(datanode.getMetrics().name());
if (getLongCounter("PacketAckRoundTripTimeNanosNumOps", dnMetrics) > 0) {
foundNonzeroPacketAckNumOps = true;
}
}
assertTrue(
"Expected at least one datanode to have reported PacketAckRoundTripTimeNanos metric",
foundNonzeroPacketAckNumOps);
} finally {
if (cluster != null) {cluster.shutdown();}
}
}
} }