From b8389e4c73b70e454525b8adcd5b0a52ae5d1db1 Mon Sep 17 00:00:00 2001
From: Todd Lipcon
Date: Tue, 3 Jul 2012 20:45:21 +0000
Subject: [PATCH] HDFS-3343. Improve metrics for DN read latency. Contributed by Andrew Wang.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1356928 13f79535-47bb-0310-9956-ffa450edef68
---
 .../apache/hadoop/net/SocketOutputStream.java      | 43 ++++++++++++++++---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt        |  2 +
 .../hdfs/server/datanode/BlockSender.java          | 12 ++++--
 .../datanode/metrics/DataNodeMetrics.java          | 11 +++++
 .../server/datanode/TestDataNodeMetrics.java       | 42 +++++++++++++++---
 5 files changed, 97 insertions(+), 13 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketOutputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketOutputStream.java
index 953808ec177..917d5f43f18 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketOutputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketOutputStream.java
@@ -31,6 +31,8 @@ import java.nio.channels.WritableByteChannel;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.metrics2.lib.MutableRate;
+import org.apache.hadoop.util.Progressable;
 
 /**
  * This implements an output stream that can have a timeout while writing.
@@ -166,7 +168,10 @@ public class SocketOutputStream extends OutputStream
 
   /**
    * Transfers data from FileChannel using
-   * {@link FileChannel#transferTo(long, long, WritableByteChannel)}.
+   * {@link FileChannel#transferTo(long, long, WritableByteChannel)}.
+   * Updates waitForWritableTime and transferToTime
+   * with the time spent blocked on the network and the time spent transferring
+   * data from disk to network respectively.
    * 
    * Similar to readFully(), this waits till requested amount of 
    * data is transfered.
@@ -174,6 +179,9 @@ public class SocketOutputStream extends OutputStream
    * @param fileCh FileChannel to transfer data from.
    * @param position position within the channel where the transfer begins
    * @param count number of bytes to transfer.
+   * @param waitForWritableTime updated by the nanoseconds spent waiting for
+   * the socket to become writable
+   * @param transferTime updated by the nanoseconds spent transferring data
    * 
    * @throws EOFException 
    *         If end of input file is reached before requested number of 
@@ -186,9 +194,11 @@ public class SocketOutputStream extends OutputStream
    * @throws IOException Includes any exception thrown by 
    *         {@link FileChannel#transferTo(long, long, WritableByteChannel)}. 
    */
-  public void transferToFully(FileChannel fileCh, long position, int count)
-      throws IOException {
-    
+  public void transferToFully(FileChannel fileCh, long position, int count,
+      MutableRate waitForWritableTime,
+      MutableRate transferToTime) throws IOException {
+    long waitTime = 0;
+    long transferTime = 0;
     while (count > 0) {
       /* 
        * Ideally we should wait after transferTo returns 0. But because of
@@ -200,7 +210,10 @@ public class SocketOutputStream extends OutputStream
        * 
        * Once we move to JAVA SE 7, wait should be moved to correct place.
        */
+      long start = System.nanoTime();
       waitForWritable();
+      long wait = System.nanoTime();
+      
       int nTransfered = (int) fileCh.transferTo(position, count, getChannel());
       
       if (nTransfered == 0) {
@@ -219,6 +232,26 @@ public class SocketOutputStream extends OutputStream
         position += nTransfered;
         count -= nTransfered;
       }
+      long transfer = System.nanoTime();
+      waitTime += wait - start;
+      transferTime += transfer - wait;
     }
-  }
+
+    if (waitForWritableTime != null) {
+      waitForWritableTime.add(waitTime);
+    }
+    if (transferToTime != null) {
+      transferToTime.add(transferTime);
+    }
+  }
+
+  /**
+   * Call
+   * {@link #transferToFully(FileChannel, long, int, MutableRate, MutableRate)}
+   * with null waitForWritableTime and transferToTime
+   */
+  public void transferToFully(FileChannel fileCh, long position, int count)
+      throws IOException {
+    transferToFully(fileCh, position, count, null, null);
+  }
 }
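
Note: the hunks above are the core of the change. Each pass of the transfer
loop brackets the blocking waitForWritable() call and the transferTo() call
with System.nanoTime(), so time spent blocked on a slow reader is accounted
separately from time spent moving data off disk. A minimal, self-contained
sketch of that pattern (the sleeps are illustrative stand-ins, not part of
the patch):

    public class TimingSketch {
      public static void main(String[] args) throws InterruptedException {
        long waitNanos = 0;      // time "blocked on network"
        long transferNanos = 0;  // time "transferring data"
        for (int i = 0; i < 3; i++) {
          long start = System.nanoTime();
          Thread.sleep(1);       // stands in for waitForWritable()
          long wait = System.nanoTime();
          Thread.sleep(2);       // stands in for fileCh.transferTo(...)
          long transfer = System.nanoTime();
          waitNanos += wait - start;
          transferNanos += transfer - wait;
        }
        System.out.println("waited (ns): " + waitNanos);
        System.out.println("transferred (ns): " + transferNanos);
      }
    }

Since the add() calls happen once after the loop, each MutableRate receives
one sample per transferToFully() call, i.e. per packet sent, not one sample
per chunk moved inside the loop.
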
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 321b137fcd8..f436f83b811 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -266,6 +266,8 @@ Branch-2 ( Unreleased changes )
     HDFS-3475. Make the replication monitor multipliers configurable.
     (harsh via eli)
 
+    HDFS-3343. Improve metrics for DN read latency (Andrew Wang via todd)
+
   OPTIMIZATIONS
 
     HDFS-2982. Startup performance suffers when there are many edit log
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index 12ee56ece7e..62b97dca237 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode;
 import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.EOFException;
 import java.io.FileDescriptor;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
@@ -36,6 +37,7 @@ import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.ReadaheadPool;
@@ -142,6 +144,7 @@ class BlockSender implements java.io.Closeable {
   /** Format used to print client trace log messages */
   private final String clientTraceFmt;
   private volatile ChunkChecksum lastChunkChecksum = null;
+  private DataNode datanode;
 
   /** The file descriptor of the block being sent */
   private FileDescriptor blockInFd;
@@ -184,6 +187,7 @@ class BlockSender implements java.io.Closeable {
     this.clientTraceFmt = clientTraceFmt;
     this.readaheadLength = datanode.getDnConf().readaheadLength;
     this.shouldDropCacheBehindRead = datanode.getDnConf().dropCacheBehindReads;
+    this.datanode = datanode;
 
     final Replica replica;
     final long replicaVisibleLength;
@@ -478,9 +482,11 @@ class BlockSender implements java.io.Closeable {
         SocketOutputStream sockOut = (SocketOutputStream)out;
         sockOut.write(buf, 0, dataOff); // First write checksum
 
-        // no need to flush. since we know out is not a buffered stream. 
-        sockOut.transferToFully(((FileInputStream)blockIn).getChannel(),
-            blockInPosition, dataLen);
+        // no need to flush since we know out is not a buffered stream
+        FileChannel fileCh = ((FileInputStream)blockIn).getChannel();
+        sockOut.transferToFully(fileCh, blockInPosition, dataLen,
+            datanode.metrics.getSendDataPacketBlockedOnNetworkNanos(),
+            datanode.metrics.getSendDataPacketTransferNanos());
         blockInPosition += dataLen;
       } else {
         // normal transfer
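
Note: BlockSender now threads the DataNode's two new rates into the
channel-transfer path, while every other caller of transferToFully() is
untouched because the old three-argument signature survives as a forwarding
overload that passes null for both rates. A hedged sketch of the two call
styles (the wrapper methods and their parameters are illustrative, not part
of the patch):

    import java.io.IOException;
    import java.nio.channels.FileChannel;
    import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
    import org.apache.hadoop.net.SocketOutputStream;

    class SendSketch {
      // Instrumented path, as BlockSender uses it after this patch.
      static void sendWithMetrics(SocketOutputStream sockOut,
          FileChannel fileCh, long pos, int len, DataNodeMetrics metrics)
          throws IOException {
        sockOut.transferToFully(fileCh, pos, len,
            metrics.getSendDataPacketBlockedOnNetworkNanos(),
            metrics.getSendDataPacketTransferNanos());
      }

      // Legacy path: same transfer, nothing recorded (nulls passed through).
      static void sendPlain(SocketOutputStream sockOut, FileChannel fileCh,
          long pos, int len) throws IOException {
        sockOut.transferToFully(fileCh, pos, len);
      }
    }
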
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
index a849cdad2cf..4623679ce89 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
@@ -75,6 +75,9 @@ public class DataNodeMetrics {
   @Metric MutableRate blockReports;
   @Metric MutableRate fsync;
+
+  @Metric MutableRate sendDataPacketBlockedOnNetworkNanos;
+  @Metric MutableRate sendDataPacketTransferNanos;
 
   final MetricsRegistry registry = new MetricsRegistry("datanode");
   final String name;
 
@@ -183,4 +186,12 @@ public class DataNodeMetrics {
   public void incrBlocksGetLocalPathInfo() {
     blocksGetLocalPathInfo.incr();
   }
+
+  public MutableRate getSendDataPacketBlockedOnNetworkNanos() {
+    return sendDataPacketBlockedOnNetworkNanos;
+  }
+
+  public MutableRate getSendDataPacketTransferNanos() {
+    return sendDataPacketTransferNanos;
+  }
 }
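
Note: the two @Metric MutableRate fields aggregate every add(nanos) sample
into a <Name>NumOps count and a <Name>AvgTime average, which is where the
metric names asserted in the test diff below come from. A small sketch of
that behavior against a standalone registry (the "example" record name is
arbitrary, and the published values assume exactly these two samples):

    import org.apache.hadoop.metrics2.lib.MetricsRegistry;
    import org.apache.hadoop.metrics2.lib.MutableRate;

    public class RateSketch {
      public static void main(String[] args) {
        MetricsRegistry registry = new MetricsRegistry("example");
        MutableRate rate = registry.newRate("SendDataPacketTransferNanos");
        rate.add(1200L);  // one sample; this patch's samples are nanoseconds
        rate.add(800L);
        // The next metrics snapshot should publish:
        //   SendDataPacketTransferNanosNumOps = 2
        //   SendDataPacketTransferNanosAvgTime = 1000.0
      }
    }
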
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
index bfff3ff19e3..c18d29b4244 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
@@ -17,21 +17,28 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.assertGaugeGt;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+import static org.junit.Assert.*;
+
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
-import static org.apache.hadoop.test.MetricsAsserts.*;
+import org.junit.Test;
 
-import junit.framework.TestCase;
-
-public class TestDataNodeMetrics extends TestCase {
+public class TestDataNodeMetrics {
+  MiniDFSCluster cluster = null;
+  FileSystem fs = null;
+
+  @Test
   public void testDataNodeMetrics() throws Exception {
     Configuration conf = new HdfsConfiguration();
     SimulatedFSDataset.setFactory(conf);
@@ -50,4 +57,29 @@ public class TestDataNodeMetrics {
       if (cluster != null) {cluster.shutdown();}
     }
   }
+
+  @Test
+  public void testSendDataPacket() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    try {
+      FileSystem fs = cluster.getFileSystem();
+      // Create and read a 1 byte file
+      Path tmpfile = new Path("/tmp.txt");
+      DFSTestUtil.createFile(fs, tmpfile,
+          (long)1, (short)1, 1L);
+      DFSTestUtil.readFile(fs, tmpfile);
+      List<DataNode> datanodes = cluster.getDataNodes();
+      assertEquals(datanodes.size(), 1);
+      DataNode datanode = datanodes.get(0);
+      MetricsRecordBuilder rb = getMetrics(datanode.getMetrics().name());
+
+      // Expect 2 packets, 1 for the 1 byte read, 1 for the empty packet
+      // signaling the end of the block
+      assertCounter("SendDataPacketTransferNanosNumOps", (long)2, rb);
+      assertCounter("SendDataPacketBlockedOnNetworkNanosNumOps", (long)2, rb);
+    } finally {
+      if (cluster != null) {cluster.shutdown();}
+    }
+  }
 }
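
Note: besides the NumOps counters asserted above, a MutableRate also
publishes an AvgTime gauge. A hypothetical extension of testSendDataPacket()
(not part of this commit) could check that nonzero time was actually
attributed to each rate; rb is the MetricsRecordBuilder fetched in the test:

    // Hypothetical extra assertions, placed after the assertCounter calls.
    assertGaugeGt("SendDataPacketBlockedOnNetworkNanosAvgTime", 0.0, rb);
    assertGaugeGt("SendDataPacketTransferNanosAvgTime", 0.0, rb);
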