HDFS-3170. Add more useful metrics for write latency. Contributed by Matthew Jacobs.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1357970 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2012-07-05 22:18:30 +00:00
parent 3d5a44f919
commit e0ef844280
6 changed files with 153 additions and 24 deletions

View File

@ -271,6 +271,9 @@ Branch-2 ( Unreleased changes )
HDFS-3343. Improve metrics for DN read latency (Andrew Wang via todd)
HDFS-3170. Add more useful metrics for write latency (Matthew Jacobs via
todd)
OPTIMIZATIONS
HDFS-2982. Startup performance suffers when there are many edit log

View File

@ -42,14 +42,25 @@ public class PipelineAck {
}
/**
* Constructor
* Constructor assuming no next DN in pipeline
* @param seqno sequence number
* @param replies an array of replies
*/
public PipelineAck(long seqno, Status[] replies) {
this(seqno, replies, 0L);
}
/**
* Constructor
* @param seqno sequence number
* @param replies an array of replies
* @param downstreamAckTimeNanos ack RTT in nanoseconds, 0 if no next DN in pipeline
*/
public PipelineAck(long seqno, Status[] replies, long downstreamAckTimeNanos) {
proto = PipelineAckProto.newBuilder()
.setSeqno(seqno)
.addAllStatus(Arrays.asList(replies))
.setDownstreamAckTimeNanos(downstreamAckTimeNanos)
.build();
}
@ -76,7 +87,15 @@ public class PipelineAck {
public Status getReply(int i) {
return proto.getStatus(i);
}
/**
* Get the time elapsed for downstream ack RTT in nanoseconds
* @return time elapsed for downstream ack in nanoseconds, 0 if no next DN in pipeline
*/
public long getDownstreamAckTimeNanos() {
return proto.getDownstreamAckTimeNanos();
}
/**
* Check if this ack contains error status
* @return true if all statuses are SUCCESS

View File

@ -42,7 +42,6 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -252,15 +251,21 @@ class BlockReceiver implements Closeable {
if (syncOnClose && (out != null || checksumOut != null)) {
datanode.metrics.incrFsyncCount();
}
long flushTotalNanos = 0;
boolean measuredFlushTime = false;
// close checksum file
try {
if (checksumOut != null) {
long flushStartNanos = System.nanoTime();
checksumOut.flush();
long flushEndNanos = System.nanoTime();
if (syncOnClose && (cout instanceof FileOutputStream)) {
long start = Util.now();
long fsyncStartNanos = flushEndNanos;
((FileOutputStream)cout).getChannel().force(true);
datanode.metrics.addFsync(Util.now() - start);
datanode.metrics.addFsyncNanos(System.nanoTime() - fsyncStartNanos);
}
flushTotalNanos += flushEndNanos - flushStartNanos;
measuredFlushTime = true;
checksumOut.close();
checksumOut = null;
}
@ -273,12 +278,16 @@ class BlockReceiver implements Closeable {
// close block file
try {
if (out != null) {
long flushStartNanos = System.nanoTime();
out.flush();
long flushEndNanos = System.nanoTime();
if (syncOnClose && (out instanceof FileOutputStream)) {
long start = Util.now();
long fsyncStartNanos = flushEndNanos;
((FileOutputStream)out).getChannel().force(true);
datanode.metrics.addFsync(Util.now() - start);
datanode.metrics.addFsyncNanos(System.nanoTime() - fsyncStartNanos);
}
flushTotalNanos += flushEndNanos - flushStartNanos;
measuredFlushTime = true;
out.close();
out = null;
}
@ -288,6 +297,9 @@ class BlockReceiver implements Closeable {
finally{
IOUtils.closeStream(out);
}
if (measuredFlushTime) {
datanode.metrics.addFlushNanos(flushTotalNanos);
}
// disk check
if(ioe != null) {
datanode.checkDiskError(ioe);
@ -303,21 +315,31 @@ class BlockReceiver implements Closeable {
if (isSync && (out != null || checksumOut != null)) {
datanode.metrics.incrFsyncCount();
}
long flushTotalNanos = 0;
if (checksumOut != null) {
long flushStartNanos = System.nanoTime();
checksumOut.flush();
long flushEndNanos = System.nanoTime();
if (isSync && (cout instanceof FileOutputStream)) {
long start = Util.now();
long fsyncStartNanos = flushEndNanos;
((FileOutputStream)cout).getChannel().force(true);
datanode.metrics.addFsync(Util.now() - start);
datanode.metrics.addFsyncNanos(System.nanoTime() - fsyncStartNanos);
}
flushTotalNanos += flushEndNanos - flushStartNanos;
}
if (out != null) {
long flushStartNanos = System.nanoTime();
out.flush();
long flushEndNanos = System.nanoTime();
if (isSync && (out instanceof FileOutputStream)) {
long start = Util.now();
long fsyncStartNanos = flushEndNanos;
((FileOutputStream)out).getChannel().force(true);
datanode.metrics.addFsync(Util.now() - start);
datanode.metrics.addFsyncNanos(System.nanoTime() - fsyncStartNanos);
}
flushTotalNanos += flushEndNanos - flushStartNanos;
}
if (checksumOut != null || out != null) {
datanode.metrics.addFlushNanos(flushTotalNanos);
}
}
@ -446,7 +468,7 @@ class BlockReceiver implements Closeable {
*/
private void readNextPacket() throws IOException {
/* This dances around buf a little bit, mainly to read
* full packet with single read and to accept arbitarary size
* full packet with single read and to accept arbitrary size
* for next packet at the same time.
*/
if (buf == null) {
@ -715,7 +737,7 @@ class BlockReceiver implements Closeable {
replicaInfo.setLastChecksumAndDataLen(
offsetInBlock, lastChunkChecksum
);
datanode.metrics.incrBytesWritten(len);
dropOsCacheBehindWriter(offsetInBlock);
@ -976,7 +998,8 @@ class BlockReceiver implements Closeable {
synchronized void enqueue(final long seqno,
final boolean lastPacketInBlock, final long offsetInBlock) {
if (running) {
final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock);
final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock,
System.nanoTime());
if(LOG.isDebugEnabled()) {
LOG.debug(myString + ": enqueue " + p);
}
@ -1013,17 +1036,20 @@ class BlockReceiver implements Closeable {
final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
while (running && datanode.shouldRun && !lastPacketInBlock) {
long totalAckTimeNanos = 0;
boolean isInterrupted = false;
try {
Packet pkt = null;
long expected = -2;
PipelineAck ack = new PipelineAck();
long seqno = PipelineAck.UNKOWN_SEQNO;
long ackRecvNanoTime = 0;
try {
if (type != PacketResponderType.LAST_IN_PIPELINE
&& !mirrorError) {
// read an ack from downstream datanode
ack.readFields(downstreamIn);
ackRecvNanoTime = System.nanoTime();
if (LOG.isDebugEnabled()) {
LOG.debug(myString + " got " + ack);
}
@ -1049,6 +1075,22 @@ class BlockReceiver implements Closeable {
throw new IOException(myString + "seqno: expected="
+ expected + ", received=" + seqno);
}
if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE) {
// The total ack time includes the ack times of downstream nodes.
// The value is 0 if this responder doesn't have a downstream
// DN in the pipeline.
totalAckTimeNanos = ackRecvNanoTime - pkt.ackEnqueueNanoTime;
// Report the elapsed time from ack send to ack receive minus
// the downstream ack time.
long ackTimeNanos = totalAckTimeNanos - ack.getDownstreamAckTimeNanos();
if (ackTimeNanos < 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("Calculated invalid ack time: " + ackTimeNanos + "ns.");
}
} else {
datanode.metrics.addPacketAckRoundTripTimeNanos(ackTimeNanos);
}
}
lastPacketInBlock = pkt.lastPacketInBlock;
}
}
@ -1116,7 +1158,7 @@ class BlockReceiver implements Closeable {
replies[i+1] = ack.getReply(i);
}
}
PipelineAck replyAck = new PipelineAck(expected, replies);
PipelineAck replyAck = new PipelineAck(expected, replies, totalAckTimeNanos);
if (replyAck.isSuccess() &&
pkt.offsetInBlock > replicaInfo.getBytesAcked())
@ -1176,11 +1218,14 @@ class BlockReceiver implements Closeable {
final long seqno;
final boolean lastPacketInBlock;
final long offsetInBlock;
final long ackEnqueueNanoTime;
Packet(long seqno, boolean lastPacketInBlock, long offsetInBlock) {
Packet(long seqno, boolean lastPacketInBlock, long offsetInBlock,
long ackEnqueueNanoTime) {
this.seqno = seqno;
this.lastPacketInBlock = lastPacketInBlock;
this.offsetInBlock = offsetInBlock;
this.ackEnqueueNanoTime = ackEnqueueNanoTime;
}
@Override
@ -1188,6 +1233,7 @@ class BlockReceiver implements Closeable {
return getClass().getSimpleName() + "(seqno=" + seqno
+ ", lastPacketInBlock=" + lastPacketInBlock
+ ", offsetInBlock=" + offsetInBlock
+ ", ackEnqueueNanoTime=" + ackEnqueueNanoTime
+ ")";
}
}

View File

@ -73,8 +73,10 @@ public class DataNodeMetrics {
@Metric MutableRate replaceBlockOp;
@Metric MutableRate heartbeats;
@Metric MutableRate blockReports;
@Metric MutableRate packetAckRoundTripTimeNanos;
@Metric MutableRate fsync;
@Metric MutableRate flushNanos;
@Metric MutableRate fsyncNanos;
@Metric MutableRate sendDataPacketBlockedOnNetworkNanos;
@Metric MutableRate sendDataPacketTransferNanos;
@ -162,8 +164,16 @@ public class DataNodeMetrics {
fsyncCount.incr();
}
public void addFsync(long latency) {
fsync.add(latency);
public void addPacketAckRoundTripTimeNanos(long latencyNanos) {
packetAckRoundTripTimeNanos.add(latencyNanos);
}
public void addFlushNanos(long latencyNanos) {
flushNanos.add(latencyNanos);
}
public void addFsyncNanos(long latencyNanos) {
fsyncNanos.add(latencyNanos);
}
public void shutdown() {

View File

@ -129,6 +129,7 @@ enum Status {
message PipelineAckProto {
required sint64 seqno = 1;
repeated Status status = 2;
optional uint64 downstreamAckTimeNanos = 3 [default = 0];
}
/**

View File

@ -18,26 +18,25 @@
package org.apache.hadoop.hdfs.server.datanode;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.assertGaugeGt;
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.*;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.junit.Test;
public class TestDataNodeMetrics {
MiniDFSCluster cluster = null;
FileSystem fs = null;
@Test
public void testDataNodeMetrics() throws Exception {
Configuration conf = new HdfsConfiguration();
@ -82,4 +81,55 @@ public class TestDataNodeMetrics {
if (cluster != null) {cluster.shutdown();}
}
}
@Test
public void testFlushMetric() throws Exception {
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
try {
cluster.waitActive();
DistributedFileSystem fs = (DistributedFileSystem) cluster.getFileSystem();
Path testFile = new Path("/testFlushNanosMetric.txt");
DFSTestUtil.createFile(fs, testFile, 1, (short)1, new Random().nextLong());
List<DataNode> datanodes = cluster.getDataNodes();
DataNode datanode = datanodes.get(0);
MetricsRecordBuilder dnMetrics = getMetrics(datanode.getMetrics().name());
// Expect 2 flushes, 1 for the flush that occurs after writing, 1 that occurs
// on closing the data and metadata files.
assertCounter("FlushNanosNumOps", 2L, dnMetrics);
} finally {
if (cluster != null) {cluster.shutdown();}
}
}
@Test
public void testRoundTripAckMetric() throws Exception {
final int DATANODE_COUNT = 2;
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DATANODE_COUNT).build();
try {
cluster.waitActive();
DistributedFileSystem fs = (DistributedFileSystem) cluster.getFileSystem();
Path testFile = new Path("/testRoundTripAckMetric.txt");
DFSTestUtil.createFile(fs, testFile, 1, (short)DATANODE_COUNT,
new Random().nextLong());
boolean foundNonzeroPacketAckNumOps = false;
for (DataNode datanode : cluster.getDataNodes()) {
MetricsRecordBuilder dnMetrics = getMetrics(datanode.getMetrics().name());
if (getLongCounter("PacketAckRoundTripTimeNanosNumOps", dnMetrics) > 0) {
foundNonzeroPacketAckNumOps = true;
}
}
assertTrue(
"Expected at least one datanode to have reported PacketAckRoundTripTimeNanos metric",
foundNonzeroPacketAckNumOps);
} finally {
if (cluster != null) {cluster.shutdown();}
}
}
}