From e0ef844280b98dc699ed3f9d948b83828bb8d297 Mon Sep 17 00:00:00 2001 From: Todd Lipcon Date: Thu, 5 Jul 2012 22:18:30 +0000 Subject: [PATCH] HDFS-3170. Add more useful metrics for write latency. Contributed by Matthew Jacobs. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1357970 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../protocol/datatransfer/PipelineAck.java | 23 +++++- .../hdfs/server/datanode/BlockReceiver.java | 74 +++++++++++++++---- .../datanode/metrics/DataNodeMetrics.java | 16 +++- .../src/main/proto/datatransfer.proto | 1 + .../server/datanode/TestDataNodeMetrics.java | 60 +++++++++++++-- 6 files changed, 153 insertions(+), 24 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 10752512d3b..5422f5132fa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -271,6 +271,9 @@ Branch-2 ( Unreleased changes ) HDFS-3343. Improve metrics for DN read latency (Andrew Wang via todd) + HDFS-3170. Add more useful metrics for write latency (Matthew Jacobs via + todd) + OPTIMIZATIONS HDFS-2982. Startup performance suffers when there are many edit log diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java index 6498a5619b6..c6ac057aa7f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java @@ -42,14 +42,25 @@ public class PipelineAck { } /** - * Constructor + * Constructor assuming no next DN in pipeline * @param seqno sequence number * @param replies an array of replies */ public PipelineAck(long seqno, Status[] replies) { + this(seqno, replies, 0L); + } + + /** + * Constructor + * @param seqno sequence number + * @param replies an array of replies + * @param downstreamAckTimeNanos ack RTT in nanoseconds, 0 if no next DN in pipeline + */ + public PipelineAck(long seqno, Status[] replies, long downstreamAckTimeNanos) { proto = PipelineAckProto.newBuilder() .setSeqno(seqno) .addAllStatus(Arrays.asList(replies)) + .setDownstreamAckTimeNanos(downstreamAckTimeNanos) .build(); } @@ -76,7 +87,15 @@ public class PipelineAck { public Status getReply(int i) { return proto.getStatus(i); } - + + /** + * Get the time elapsed for downstream ack RTT in nanoseconds + * @return time elapsed for downstream ack in nanoseconds, 0 if no next DN in pipeline + */ + public long getDownstreamAckTimeNanos() { + return proto.getDownstreamAckTimeNanos(); + } + /** * Check if this ack contains error status * @return true if all statuses are SUCCESS diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index f0f7c780a71..d379927be35 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -42,7 +42,6 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; -import org.apache.hadoop.hdfs.server.common.Util; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -252,15 +251,21 @@ class BlockReceiver implements Closeable { if (syncOnClose && (out != null || checksumOut != null)) { datanode.metrics.incrFsyncCount(); } + long flushTotalNanos = 0; + boolean measuredFlushTime = false; // close checksum file try { if (checksumOut != null) { + long flushStartNanos = System.nanoTime(); checksumOut.flush(); + long flushEndNanos = System.nanoTime(); if (syncOnClose && (cout instanceof FileOutputStream)) { - long start = Util.now(); + long fsyncStartNanos = flushEndNanos; ((FileOutputStream)cout).getChannel().force(true); - datanode.metrics.addFsync(Util.now() - start); + datanode.metrics.addFsyncNanos(System.nanoTime() - fsyncStartNanos); } + flushTotalNanos += flushEndNanos - flushStartNanos; + measuredFlushTime = true; checksumOut.close(); checksumOut = null; } @@ -273,12 +278,16 @@ class BlockReceiver implements Closeable { // close block file try { if (out != null) { + long flushStartNanos = System.nanoTime(); out.flush(); + long flushEndNanos = System.nanoTime(); if (syncOnClose && (out instanceof FileOutputStream)) { - long start = Util.now(); + long fsyncStartNanos = flushEndNanos; ((FileOutputStream)out).getChannel().force(true); - datanode.metrics.addFsync(Util.now() - start); + datanode.metrics.addFsyncNanos(System.nanoTime() - fsyncStartNanos); } + flushTotalNanos += flushEndNanos - flushStartNanos; + measuredFlushTime = true; out.close(); out = null; } @@ -288,6 +297,9 @@ class BlockReceiver implements Closeable { finally{ IOUtils.closeStream(out); } + if (measuredFlushTime) { + datanode.metrics.addFlushNanos(flushTotalNanos); + } // disk check if(ioe != null) { datanode.checkDiskError(ioe); @@ -303,21 +315,31 @@ class BlockReceiver implements Closeable { if (isSync && (out != null || checksumOut != null)) { datanode.metrics.incrFsyncCount(); } + long flushTotalNanos = 0; if (checksumOut != null) { + long flushStartNanos = System.nanoTime(); checksumOut.flush(); + long flushEndNanos = System.nanoTime(); if (isSync && (cout instanceof FileOutputStream)) { - long start = Util.now(); + long fsyncStartNanos = flushEndNanos; ((FileOutputStream)cout).getChannel().force(true); - datanode.metrics.addFsync(Util.now() - start); + datanode.metrics.addFsyncNanos(System.nanoTime() - fsyncStartNanos); } + flushTotalNanos += flushEndNanos - flushStartNanos; } if (out != null) { + long flushStartNanos = System.nanoTime(); out.flush(); + long flushEndNanos = System.nanoTime(); if (isSync && (out instanceof FileOutputStream)) { - long start = Util.now(); + long fsyncStartNanos = flushEndNanos; ((FileOutputStream)out).getChannel().force(true); - datanode.metrics.addFsync(Util.now() - start); + datanode.metrics.addFsyncNanos(System.nanoTime() - fsyncStartNanos); } + flushTotalNanos += flushEndNanos - flushStartNanos; + } + if (checksumOut != null || out != null) { + datanode.metrics.addFlushNanos(flushTotalNanos); } } @@ -446,7 +468,7 @@ class BlockReceiver implements Closeable { */ private void readNextPacket() throws IOException { /* This dances around buf a little bit, mainly to read - * full packet with single read and to accept arbitarary size + * full packet with single read and to accept arbitrary size * for next packet at the same time. */ if (buf == null) { @@ -715,7 +737,7 @@ class BlockReceiver implements Closeable { replicaInfo.setLastChecksumAndDataLen( offsetInBlock, lastChunkChecksum ); - + datanode.metrics.incrBytesWritten(len); dropOsCacheBehindWriter(offsetInBlock); @@ -976,7 +998,8 @@ class BlockReceiver implements Closeable { synchronized void enqueue(final long seqno, final boolean lastPacketInBlock, final long offsetInBlock) { if (running) { - final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock); + final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock, + System.nanoTime()); if(LOG.isDebugEnabled()) { LOG.debug(myString + ": enqueue " + p); } @@ -1013,17 +1036,20 @@ class BlockReceiver implements Closeable { final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0; while (running && datanode.shouldRun && !lastPacketInBlock) { + long totalAckTimeNanos = 0; boolean isInterrupted = false; try { Packet pkt = null; long expected = -2; PipelineAck ack = new PipelineAck(); long seqno = PipelineAck.UNKOWN_SEQNO; + long ackRecvNanoTime = 0; try { if (type != PacketResponderType.LAST_IN_PIPELINE && !mirrorError) { // read an ack from downstream datanode ack.readFields(downstreamIn); + ackRecvNanoTime = System.nanoTime(); if (LOG.isDebugEnabled()) { LOG.debug(myString + " got " + ack); } @@ -1049,6 +1075,22 @@ class BlockReceiver implements Closeable { throw new IOException(myString + "seqno: expected=" + expected + ", received=" + seqno); } + if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE) { + // The total ack time includes the ack times of downstream nodes. + // The value is 0 if this responder doesn't have a downstream + // DN in the pipeline. + totalAckTimeNanos = ackRecvNanoTime - pkt.ackEnqueueNanoTime; + // Report the elapsed time from ack send to ack receive minus + // the downstream ack time. + long ackTimeNanos = totalAckTimeNanos - ack.getDownstreamAckTimeNanos(); + if (ackTimeNanos < 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("Calculated invalid ack time: " + ackTimeNanos + "ns."); + } + } else { + datanode.metrics.addPacketAckRoundTripTimeNanos(ackTimeNanos); + } + } lastPacketInBlock = pkt.lastPacketInBlock; } } @@ -1116,7 +1158,7 @@ class BlockReceiver implements Closeable { replies[i+1] = ack.getReply(i); } } - PipelineAck replyAck = new PipelineAck(expected, replies); + PipelineAck replyAck = new PipelineAck(expected, replies, totalAckTimeNanos); if (replyAck.isSuccess() && pkt.offsetInBlock > replicaInfo.getBytesAcked()) @@ -1176,11 +1218,14 @@ class BlockReceiver implements Closeable { final long seqno; final boolean lastPacketInBlock; final long offsetInBlock; + final long ackEnqueueNanoTime; - Packet(long seqno, boolean lastPacketInBlock, long offsetInBlock) { + Packet(long seqno, boolean lastPacketInBlock, long offsetInBlock, + long ackEnqueueNanoTime) { this.seqno = seqno; this.lastPacketInBlock = lastPacketInBlock; this.offsetInBlock = offsetInBlock; + this.ackEnqueueNanoTime = ackEnqueueNanoTime; } @Override @@ -1188,6 +1233,7 @@ class BlockReceiver implements Closeable { return getClass().getSimpleName() + "(seqno=" + seqno + ", lastPacketInBlock=" + lastPacketInBlock + ", offsetInBlock=" + offsetInBlock + + ", ackEnqueueNanoTime=" + ackEnqueueNanoTime + ")"; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java index 4623679ce89..70b49d2bb1e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java @@ -73,8 +73,10 @@ public class DataNodeMetrics { @Metric MutableRate replaceBlockOp; @Metric MutableRate heartbeats; @Metric MutableRate blockReports; + @Metric MutableRate packetAckRoundTripTimeNanos; - @Metric MutableRate fsync; + @Metric MutableRate flushNanos; + @Metric MutableRate fsyncNanos; @Metric MutableRate sendDataPacketBlockedOnNetworkNanos; @Metric MutableRate sendDataPacketTransferNanos; @@ -162,8 +164,16 @@ public class DataNodeMetrics { fsyncCount.incr(); } - public void addFsync(long latency) { - fsync.add(latency); + public void addPacketAckRoundTripTimeNanos(long latencyNanos) { + packetAckRoundTripTimeNanos.add(latencyNanos); + } + + public void addFlushNanos(long latencyNanos) { + flushNanos.add(latencyNanos); + } + + public void addFsyncNanos(long latencyNanos) { + fsyncNanos.add(latencyNanos); } public void shutdown() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto index d64f78051ee..ef67cbd10d9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto @@ -129,6 +129,7 @@ enum Status { message PipelineAckProto { required sint64 seqno = 1; repeated Status status = 2; + optional uint64 downstreamAckTimeNanos = 3 [default = 0]; } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java index c18d29b4244..35e83fa1b24 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java @@ -18,26 +18,25 @@ package org.apache.hadoop.hdfs.server.datanode; import static org.apache.hadoop.test.MetricsAsserts.assertCounter; -import static org.apache.hadoop.test.MetricsAsserts.assertGaugeGt; +import static org.apache.hadoop.test.MetricsAsserts.getLongCounter; import static org.apache.hadoop.test.MetricsAsserts.getMetrics; import static org.junit.Assert.*; import java.util.List; +import java.util.Random; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.junit.Test; public class TestDataNodeMetrics { - - MiniDFSCluster cluster = null; - FileSystem fs = null; - + @Test public void testDataNodeMetrics() throws Exception { Configuration conf = new HdfsConfiguration(); @@ -82,4 +81,55 @@ public class TestDataNodeMetrics { if (cluster != null) {cluster.shutdown();} } } + + @Test + public void testFlushMetric() throws Exception { + Configuration conf = new HdfsConfiguration(); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + try { + cluster.waitActive(); + DistributedFileSystem fs = (DistributedFileSystem) cluster.getFileSystem(); + + Path testFile = new Path("/testFlushNanosMetric.txt"); + DFSTestUtil.createFile(fs, testFile, 1, (short)1, new Random().nextLong()); + + List datanodes = cluster.getDataNodes(); + DataNode datanode = datanodes.get(0); + MetricsRecordBuilder dnMetrics = getMetrics(datanode.getMetrics().name()); + // Expect 2 flushes, 1 for the flush that occurs after writing, 1 that occurs + // on closing the data and metadata files. + assertCounter("FlushNanosNumOps", 2L, dnMetrics); + } finally { + if (cluster != null) {cluster.shutdown();} + } + } + + @Test + public void testRoundTripAckMetric() throws Exception { + final int DATANODE_COUNT = 2; + + Configuration conf = new HdfsConfiguration(); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DATANODE_COUNT).build(); + try { + cluster.waitActive(); + DistributedFileSystem fs = (DistributedFileSystem) cluster.getFileSystem(); + + Path testFile = new Path("/testRoundTripAckMetric.txt"); + DFSTestUtil.createFile(fs, testFile, 1, (short)DATANODE_COUNT, + new Random().nextLong()); + + boolean foundNonzeroPacketAckNumOps = false; + for (DataNode datanode : cluster.getDataNodes()) { + MetricsRecordBuilder dnMetrics = getMetrics(datanode.getMetrics().name()); + if (getLongCounter("PacketAckRoundTripTimeNanosNumOps", dnMetrics) > 0) { + foundNonzeroPacketAckNumOps = true; + } + } + assertTrue( + "Expected at least one datanode to have reported PacketAckRoundTripTimeNanos metric", + foundNonzeroPacketAckNumOps); + } finally { + if (cluster != null) {cluster.shutdown();} + } + } }