diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketOutputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketOutputStream.java
index 953808ec177..917d5f43f18 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketOutputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketOutputStream.java
@@ -31,6 +31,8 @@ import java.nio.channels.WritableByteChannel;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.metrics2.lib.MutableRate;
+import org.apache.hadoop.util.Progressable;
/**
* This implements an output stream that can have a timeout while writing.
@@ -166,7 +168,10 @@ public class SocketOutputStream extends OutputStream
/**
* Transfers data from FileChannel using
- * {@link FileChannel#transferTo(long, long, WritableByteChannel)}.
+ * {@link FileChannel#transferTo(long, long, WritableByteChannel)}.
+ * Updates waitForWritableTime and transferToTime
+ * with the time spent blocked on the network and the time spent transferring
+ * data from disk to network respectively.
*
* Similar to readFully(), this waits till requested amount of
* data is transfered.
@@ -174,6 +179,9 @@ public class SocketOutputStream extends OutputStream
* @param fileCh FileChannel to transfer data from.
* @param position position within the channel where the transfer begins
* @param count number of bytes to transfer.
+ * @param waitForWritableTime updated by the nanoseconds spent waiting for
+ * the socket to become writable
+ * @param transferToTime updated by the nanoseconds spent transferring data
*
* @throws EOFException
* If end of input file is reached before requested number of
@@ -186,9 +194,11 @@ public class SocketOutputStream extends OutputStream
* @throws IOException Includes any exception thrown by
* {@link FileChannel#transferTo(long, long, WritableByteChannel)}.
*/
- public void transferToFully(FileChannel fileCh, long position, int count)
- throws IOException {
-
+ public void transferToFully(FileChannel fileCh, long position, int count,
+ MutableRate waitForWritableTime,
+ MutableRate transferToTime) throws IOException {
+ long waitTime = 0;
+ long transferTime = 0;
while (count > 0) {
/*
* Ideally we should wait after transferTo returns 0. But because of
@@ -200,7 +210,10 @@ public class SocketOutputStream extends OutputStream
*
* Once we move to JAVA SE 7, wait should be moved to correct place.
*/
+ long start = System.nanoTime();
waitForWritable();
+ long wait = System.nanoTime();
+
int nTransfered = (int) fileCh.transferTo(position, count, getChannel());
if (nTransfered == 0) {
@@ -219,6 +232,26 @@ public class SocketOutputStream extends OutputStream
position += nTransfered;
count -= nTransfered;
}
+ long transfer = System.nanoTime();
+ waitTime += wait - start;
+ transferTime += transfer - wait;
}
- }
+
+ if (waitForWritableTime != null) {
+ waitForWritableTime.add(waitTime);
+ }
+ if (transferToTime != null) {
+ transferToTime.add(transferTime);
+ }
+ }
+
+ /**
+ * Call
+ * {@link #transferToFully(FileChannel, long, int, MutableRate, MutableRate)}
+ * with null waitForWritableTime and transferToTime.
+ */
+ public void transferToFully(FileChannel fileCh, long position, int count)
+ throws IOException {
+ transferToFully(fileCh, position, count, null, null);
+ }
}
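
For reference, the timing split added to transferToFully() can be exercised outside Hadoop. The sketch below is illustrative rather than part of the patch: a temp file stands in for the block replica and a java.nio Pipe sink for the client socket, and the loop accumulates the two phases the same way the new overload does.

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.nio.channels.FileChannel;
import java.nio.channels.Pipe;

public class TransferTimingSketch {
  public static void main(String[] args) throws Exception {
    // A small temp file stands in for the block replica on disk.
    File blockFile = File.createTempFile("block", ".dat");
    blockFile.deleteOnExit();
    FileOutputStream fos = new FileOutputStream(blockFile);
    fos.write(new byte[8192]);
    fos.close();

    // A pipe sink stands in for the DataNode's socket channel.
    Pipe pipe = Pipe.open();
    FileChannel fileCh = new FileInputStream(blockFile).getChannel();
    long waitNanos = 0;
    long transferNanos = 0;
    long position = 0;
    long count = fileCh.size();
    while (count > 0) {
      long start = System.nanoTime();
      // The patch calls waitForWritable() here; a pipe sink with free
      // buffer space is always writable, so there is nothing to wait on.
      long wait = System.nanoTime();
      long n = fileCh.transferTo(position, count, pipe.sink());
      long transfer = System.nanoTime();
      if (n <= 0) {
        break; // defensive: don't spin if the sink stops accepting data
      }
      position += n;
      count -= n;
      waitNanos += wait - start;        // time blocked waiting to write
      transferNanos += transfer - wait; // time copying disk -> channel
    }
    fileCh.close();
    System.out.println("wait=" + waitNanos + "ns, transfer="
        + transferNanos + "ns");
  }
}

On a real socket, waitNanos captures time blocked on a slow or congested reader; here it stays near zero because the pipe never fills.
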
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 321b137fcd8..f436f83b811 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -266,6 +266,8 @@ Branch-2 ( Unreleased changes )
HDFS-3475. Make the replication monitor multipliers configurable.
(harsh via eli)
+ HDFS-3343. Improve metrics for DN read latency (Andrew Wang via todd)
+
OPTIMIZATIONS
HDFS-2982. Startup performance suffers when there are many edit log
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index 12ee56ece7e..62b97dca237 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
+import java.io.EOFException;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
@@ -36,6 +37,7 @@ import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.ReadaheadPool;
@@ -142,6 +144,7 @@ class BlockSender implements java.io.Closeable {
/** Format used to print client trace log messages */
private final String clientTraceFmt;
private volatile ChunkChecksum lastChunkChecksum = null;
+ private DataNode datanode;
/** The file descriptor of the block being sent */
private FileDescriptor blockInFd;
@@ -184,6 +187,7 @@ class BlockSender implements java.io.Closeable {
this.clientTraceFmt = clientTraceFmt;
this.readaheadLength = datanode.getDnConf().readaheadLength;
this.shouldDropCacheBehindRead = datanode.getDnConf().dropCacheBehindReads;
+ this.datanode = datanode;
final Replica replica;
final long replicaVisibleLength;
@@ -478,9 +482,11 @@ class BlockSender implements java.io.Closeable {
SocketOutputStream sockOut = (SocketOutputStream)out;
sockOut.write(buf, 0, dataOff); // First write checksum
- // no need to flush. since we know out is not a buffered stream.
- sockOut.transferToFully(((FileInputStream)blockIn).getChannel(),
- blockInPosition, dataLen);
+ // no need to flush since we know out is not a buffered stream
+ FileChannel fileCh = ((FileInputStream)blockIn).getChannel();
+ sockOut.transferToFully(fileCh, blockInPosition, dataLen,
+ datanode.metrics.getSendDataPacketBlockedOnNetworkNanos(),
+ datanode.metrics.getSendDataPacketTransferNanos());
blockInPosition += dataLen;
} else {
// normal transfer
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
index a849cdad2cf..4623679ce89 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
@@ -75,6 +75,9 @@ public class DataNodeMetrics {
@Metric MutableRate blockReports;
@Metric MutableRate fsync;
+
+ @Metric MutableRate sendDataPacketBlockedOnNetworkNanos;
+ @Metric MutableRate sendDataPacketTransferNanos;
final MetricsRegistry registry = new MetricsRegistry("datanode");
final String name;
@@ -183,4 +186,12 @@ public class DataNodeMetrics {
public void incrBlocksGetLocalPathInfo() {
blocksGetLocalPathInfo.incr();
}
+
+ public MutableRate getSendDataPacketBlockedOnNetworkNanos() {
+ return sendDataPacketBlockedOnNetworkNanos;
+ }
+
+ public MutableRate getSendDataPacketTransferNanos() {
+ return sendDataPacketTransferNanos;
+ }
}
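
Each of the two new fields is a MutableRate, so every add() in SocketOutputStream contributes one sample: the op count goes up by one and the nanosecond value folds into a running average, published under the capitalized field name as <Name>NumOps and <Name>AvgTime. A minimal sketch of that behavior (the explicit registry call is illustrative; the @Metric annotation above wires this up automatically):

import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableRate;

public class MutableRateSketch {
  public static void main(String[] args) {
    MetricsRegistry registry = new MetricsRegistry("datanode");
    MutableRate rate = registry.newRate("SendDataPacketTransferNanos");
    rate.add(120000L); // one sendPacket() that spent 120us in transferTo
    rate.add(80000L);  // another that spent 80us
    // A metrics snapshot now reports:
    //   SendDataPacketTransferNanosNumOps  = 2
    //   SendDataPacketTransferNanosAvgTime = 100000.0
  }
}

This is also why the test below asserts on the NumOps counters rather than on the timing values, which vary from run to run.
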
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
index bfff3ff19e3..c18d29b4244 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
@@ -17,21 +17,28 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.assertGaugeGt;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+import static org.junit.Assert.*;
+
import java.util.List;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
-import static org.apache.hadoop.test.MetricsAsserts.*;
+import org.junit.Test;
-import junit.framework.TestCase;
-
-public class TestDataNodeMetrics extends TestCase {
+public class TestDataNodeMetrics {
+ MiniDFSCluster cluster = null;
+ FileSystem fs = null;
+
+ @Test
public void testDataNodeMetrics() throws Exception {
Configuration conf = new HdfsConfiguration();
SimulatedFSDataset.setFactory(conf);
@@ -50,4 +57,29 @@ public class TestDataNodeMetrics extends TestCase {
if (cluster != null) {cluster.shutdown();}
}
}
+
+ @Test
+ public void testSendDataPacket() throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+ try {
+ FileSystem fs = cluster.getFileSystem();
+ // Create and read a 1 byte file
+ Path tmpfile = new Path("/tmp.txt");
+ DFSTestUtil.createFile(fs, tmpfile,
+ (long)1, (short)1, 1L);
+ DFSTestUtil.readFile(fs, tmpfile);
+ List<DataNode> datanodes = cluster.getDataNodes();
+ assertEquals(1, datanodes.size());
+ DataNode datanode = datanodes.get(0);
+ MetricsRecordBuilder rb = getMetrics(datanode.getMetrics().name());
+
+ // Expect 2 packets, 1 for the 1 byte read, 1 for the empty packet
+ // signaling the end of the block
+ assertCounter("SendDataPacketTransferNanosNumOps", (long)2, rb);
+ assertCounter("SendDataPacketBlockedOnNetworkNanosNumOps", (long)2, rb);
+ } finally {
+ if (cluster != null) {cluster.shutdown();}
+ }
+ }
}
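
The exact-count assertions encode the current packet framing: one packet carrying the single byte, plus the empty packet that marks the end of the block. If that coupling ever proves brittle, a looser check can read the raw counter instead. The helper below is hypothetical (it is not in the patch, and assumes it sits in org.apache.hadoop.hdfs.server.datanode next to the test):

import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertTrue;

import org.apache.hadoop.metrics2.MetricsRecordBuilder;

class SendDataPacketMetricsCheck {
  // Passes as long as at least one data packet was timed, regardless of
  // how many packets the read happened to be split into.
  static void assertPacketsSent(DataNode datanode) {
    MetricsRecordBuilder rb = getMetrics(datanode.getMetrics().name());
    long numOps = getLongCounter("SendDataPacketTransferNanosNumOps", rb);
    assertTrue("expected at least one data packet", numOps >= 1);
  }
}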