HDFS-3343. Improve metrics for DN read latency. Contributed by Andrew Wang.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1356928 13f79535-47bb-0310-9956-ffa450edef68
Author: Todd Lipcon
Date: 2012-07-03 20:45:21 +00:00
parent 3c2101ae4a
commit b8389e4c73
5 changed files with 97 additions and 13 deletions

File: org/apache/hadoop/net/SocketOutputStream.java

@@ -31,6 +31,8 @@ import java.nio.channels.WritableByteChannel;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.metrics2.lib.MutableRate;
+import org.apache.hadoop.util.Progressable;
 
 /**
  * This implements an output stream that can have a timeout while writing.
@@ -167,6 +169,9 @@ public class SocketOutputStream extends OutputStream
   /**
    * Transfers data from FileChannel using
    * {@link FileChannel#transferTo(long, long, WritableByteChannel)}.
+   * Updates <code>waitForWritableTime</code> and <code>transferToTime</code>
+   * with the time spent blocked on the network and the time spent transferring
+   * data from disk to network respectively.
    *
    * Similar to readFully(), this waits till requested amount of
    * data is transfered.
@@ -174,6 +179,9 @@ public class SocketOutputStream extends OutputStream
    * @param fileCh FileChannel to transfer data from.
    * @param position position within the channel where the transfer begins
    * @param count number of bytes to transfer.
+   * @param waitForWritableTime updated by the nanoseconds spent waiting for
+   *        the socket to become writable
+   * @param transferToTime updated by the nanoseconds spent transferring data
    *
    * @throws EOFException
    *         If end of input file is reached before requested number of
@@ -186,9 +194,11 @@ public class SocketOutputStream extends OutputStream
    * @throws IOException Includes any exception thrown by
    *         {@link FileChannel#transferTo(long, long, WritableByteChannel)}.
    */
-  public void transferToFully(FileChannel fileCh, long position, int count)
-      throws IOException {
+  public void transferToFully(FileChannel fileCh, long position, int count,
+      MutableRate waitForWritableTime,
+      MutableRate transferToTime) throws IOException {
+    long waitTime = 0;
+    long transferTime = 0;
     while (count > 0) {
       /*
        * Ideally we should wait after transferTo returns 0. But because of
@@ -200,7 +210,10 @@ public class SocketOutputStream extends OutputStream
        *
        * Once we move to JAVA SE 7, wait should be moved to correct place.
        */
+      long start = System.nanoTime();
       waitForWritable();
+      long wait = System.nanoTime();
 
       int nTransfered = (int) fileCh.transferTo(position, count, getChannel());
       if (nTransfered == 0) {
@@ -219,6 +232,26 @@ public class SocketOutputStream extends OutputStream
         position += nTransfered;
         count -= nTransfered;
       }
+      long transfer = System.nanoTime();
+      waitTime += wait - start;
+      transferTime += transfer - wait;
     }
+
+    if (waitForWritableTime != null) {
+      waitForWritableTime.add(waitTime);
+    }
+    if (transferToTime != null) {
+      transferToTime.add(transferTime);
+    }
+  }
+
+  /**
+   * Call
+   * {@link #transferToFully(FileChannel, long, int, MutableRate, MutableRate)}
+   * with null <code>waitForWritableTime</code> and <code>transferToTime</code>
+   */
+  public void transferToFully(FileChannel fileCh, long position, int count)
+      throws IOException {
+    transferToFully(fileCh, position, count, null, null);
   }
 }
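The new overload threads two optional MutableRate metrics through the transfer loop: waitForWritableTime accumulates nanoseconds spent blocked in waitForWritable() (network back-pressure), while transferToTime accumulates nanoseconds spent inside FileChannel.transferTo() (the disk-to-socket copy). A minimal caller sketch; the standalone registry and metric names here are invented for illustration, only transferToFully itself comes from this patch:

    import java.io.FileInputStream;
    import java.nio.channels.FileChannel;

    import org.apache.hadoop.metrics2.lib.MetricsRegistry;
    import org.apache.hadoop.metrics2.lib.MutableRate;
    import org.apache.hadoop.net.SocketOutputStream;

    public class TimedTransferSketch {
      // Hypothetical helper: sends [position, position + count) of a file
      // over the socket while attributing time to the two phases.
      static void send(SocketOutputStream sockOut, FileInputStream blockIn,
          long position, int count) throws Exception {
        MetricsRegistry registry = new MetricsRegistry("example");
        MutableRate waitNanos = registry.newRate("waitForWritableNanos");
        MutableRate transferNanos = registry.newRate("transferToNanos");

        FileChannel fileCh = blockIn.getChannel();
        // Adds one sample to each rate: the summed wait time and the
        // summed transferTo() time for this whole transfer.
        sockOut.transferToFully(fileCh, position, count, waitNanos,
            transferNanos);
      }
    }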

File: CHANGES.txt (hadoop-hdfs)

@@ -266,6 +266,8 @@ Branch-2 ( Unreleased changes )
     HDFS-3475. Make the replication monitor multipliers configurable.
     (harsh via eli)
 
+    HDFS-3343. Improve metrics for DN read latency (Andrew Wang via todd)
+
   OPTIMIZATIONS
 
     HDFS-2982. Startup performance suffers when there are many edit log

File: org/apache/hadoop/hdfs/server/datanode/BlockSender.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode;
 import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.EOFException;
 import java.io.FileDescriptor;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
@@ -36,6 +37,7 @@ import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.ReadaheadPool;
@@ -142,6 +144,7 @@ class BlockSender implements java.io.Closeable {
   /** Format used to print client trace log messages */
   private final String clientTraceFmt;
   private volatile ChunkChecksum lastChunkChecksum = null;
+  private DataNode datanode;
 
   /** The file descriptor of the block being sent */
   private FileDescriptor blockInFd;
@@ -184,6 +187,7 @@ class BlockSender implements java.io.Closeable {
     this.clientTraceFmt = clientTraceFmt;
     this.readaheadLength = datanode.getDnConf().readaheadLength;
     this.shouldDropCacheBehindRead = datanode.getDnConf().dropCacheBehindReads;
+    this.datanode = datanode;
 
     final Replica replica;
     final long replicaVisibleLength;
@@ -478,9 +482,11 @@ class BlockSender implements java.io.Closeable {
         SocketOutputStream sockOut = (SocketOutputStream)out;
         sockOut.write(buf, 0, dataOff); // First write checksum
 
-        // no need to flush. since we know out is not a buffered stream.
-        sockOut.transferToFully(((FileInputStream)blockIn).getChannel(),
-            blockInPosition, dataLen);
+        // no need to flush since we know out is not a buffered stream
+        FileChannel fileCh = ((FileInputStream)blockIn).getChannel();
+        sockOut.transferToFully(fileCh, blockInPosition, dataLen,
+            datanode.metrics.getSendDataPacketBlockedOnNetworkNanos(),
+            datanode.metrics.getSendDataPacketTransferNanos());
         blockInPosition += dataLen;
       } else {
         // normal transfer

File: org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java

@@ -76,6 +76,9 @@ public class DataNodeMetrics {
   @Metric MutableRate fsync;
 
+  @Metric MutableRate sendDataPacketBlockedOnNetworkNanos;
+  @Metric MutableRate sendDataPacketTransferNanos;
+
   final MetricsRegistry registry = new MetricsRegistry("datanode");
   final String name;
@@ -183,4 +186,12 @@ public class DataNodeMetrics {
   public void incrBlocksGetLocalPathInfo() {
     blocksGetLocalPathInfo.incr();
   }
+
+  public MutableRate getSendDataPacketBlockedOnNetworkNanos() {
+    return sendDataPacketBlockedOnNetworkNanos;
+  }
+
+  public MutableRate getSendDataPacketTransferNanos() {
+    return sendDataPacketTransferNanos;
+  }
 }
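For context: a MutableRate field annotated with @Metric is picked up automatically by the metrics system, and each add(value) call records one sample; the metric is then published as a <Name>NumOps counter and a <Name>AvgTime gauge, which is what the new test below asserts against. A self-contained sketch, with a standalone registry and sample values chosen purely for illustration:

    import org.apache.hadoop.metrics2.lib.MetricsRegistry;
    import org.apache.hadoop.metrics2.lib.MutableRate;

    public class MutableRateSketch {
      public static void main(String[] args) {
        // Standalone registry; inside the DataNode this is the "datanode"
        // registry owned by DataNodeMetrics.
        MetricsRegistry registry = new MetricsRegistry("datanode");
        MutableRate rate = registry.newRate("sendDataPacketTransferNanos");

        // Each add() records one sample. The metric surfaces as
        // SendDataPacketTransferNanosNumOps (sample count) and
        // SendDataPacketTransferNanosAvgTime (mean of the added values).
        rate.add(120000L); // nanoseconds spent on one packet
        rate.add(80000L);
      }
    }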

File: org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java

@@ -17,21 +17,28 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.assertGaugeGt;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+import static org.junit.Assert.*;
+
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
-import static org.apache.hadoop.test.MetricsAsserts.*;
-import junit.framework.TestCase;
+import org.junit.Test;
 
-public class TestDataNodeMetrics extends TestCase {
+public class TestDataNodeMetrics {
+  MiniDFSCluster cluster = null;
+  FileSystem fs = null;
+
+  @Test
   public void testDataNodeMetrics() throws Exception {
     Configuration conf = new HdfsConfiguration();
     SimulatedFSDataset.setFactory(conf);
@@ -50,4 +57,29 @@ public class TestDataNodeMetrics extends TestCase {
       if (cluster != null) {cluster.shutdown();}
     }
   }
+
+  @Test
+  public void testSendDataPacket() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    try {
+      FileSystem fs = cluster.getFileSystem();
+      // Create and read a 1 byte file
+      Path tmpfile = new Path("/tmp.txt");
+      DFSTestUtil.createFile(fs, tmpfile, (long)1, (short)1, 1L);
+      DFSTestUtil.readFile(fs, tmpfile);
+
+      List<DataNode> datanodes = cluster.getDataNodes();
+      assertEquals(datanodes.size(), 1);
+      DataNode datanode = datanodes.get(0);
+      MetricsRecordBuilder rb = getMetrics(datanode.getMetrics().name());
+
+      // Expect 2 packets, 1 for the 1 byte read, 1 for the empty packet
+      // signaling the end of the block
+      assertCounter("SendDataPacketTransferNanosNumOps", (long)2, rb);
+      assertCounter("SendDataPacketBlockedOnNetworkNanosNumOps", (long)2, rb);
+    } finally {
+      if (cluster != null) {cluster.shutdown();}
+    }
+  }
 }
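Since each MutableRate also publishes an AvgTime gauge, a possible follow-on assertion (not part of this patch) would check that non-zero time was actually recorded, reusing the rb builder from testSendDataPacket:

    // Hypothetical extra checks: the AvgTime gauges emitted by the two
    // MutableRate metrics should be positive once a packet has been timed.
    assertGaugeGt("SendDataPacketTransferNanosAvgTime", 0.0, rb);
    assertGaugeGt("SendDataPacketBlockedOnNetworkNanosAvgTime", 0.0, rb);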