diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java index 442dc7de9a3..748c5528058 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java @@ -286,4 +286,7 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic { public static final String NFS_EXPORTS_ALLOWED_HOSTS_SEPARATOR = ";"; public static final String NFS_EXPORTS_ALLOWED_HOSTS_KEY = "nfs.exports.allowed.hosts"; public static final String NFS_EXPORTS_ALLOWED_HOSTS_KEY_DEFAULT = "* rw"; + + public static final String HADOOP_TRACE_SAMPLER = "hadoop.htrace.sampler"; + public static final String HADOOP_TRACE_SAMPLER_DEFAULT = "NeverSampler"; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/tracing/SpanReceiverHost.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/tracing/SpanReceiverHost.java index d9125045eb1..82f099e4e11 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/tracing/SpanReceiverHost.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/tracing/SpanReceiverHost.java @@ -17,24 +17,30 @@ */ package org.apache.hadoop.tracing; +import java.io.BufferedReader; +import java.io.DataInputStream; +import java.io.File; +import java.io.FileInputStream; import java.io.IOException; +import java.io.InputStreamReader; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.TreeMap; +import java.util.UUID; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.tracing.SpanReceiverInfo.ConfigurationPair; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ShutdownHookManager; import org.htrace.HTraceConfiguration; import org.htrace.SpanReceiver; import org.htrace.Trace; - /** * This class provides functions for reading the names of SpanReceivers from @@ -45,7 +51,7 @@ import org.htrace.Trace; @InterfaceAudience.Private public class SpanReceiverHost implements TraceAdminProtocol { public static final String SPAN_RECEIVERS_CONF_KEY = - "hadoop.trace.spanreceiver.classes"; + "hadoop.htrace.spanreceiver.classes"; private static final Log LOG = LogFactory.getLog(SpanReceiverHost.class); private final TreeMap receivers = new TreeMap(); @@ -53,6 +59,9 @@ public class SpanReceiverHost implements TraceAdminProtocol { private boolean closed = false; private long highestId = 1; + private final static String LOCAL_FILE_SPAN_RECEIVER_PATH = + "hadoop.htrace.local-file-span-receiver.path"; + private static enum SingletonHolder { INSTANCE; Object lock = new Object(); @@ -81,9 +90,32 @@ public class SpanReceiverHost implements TraceAdminProtocol { private static List EMPTY = Collections.emptyList(); + private static String getUniqueLocalTraceFileName() { + String tmp = System.getProperty("java.io.tmpdir", "/tmp"); + String nonce = null; + BufferedReader reader = null; + try { + // On Linux we can get a unique local file name by reading the process id + // out of /proc/self/stat. 
(There isn't any portable way to get the + // process ID from Java.) + reader = new BufferedReader( + new InputStreamReader(new FileInputStream("/proc/self/stat"))); + String line = reader.readLine(); + nonce = line.split(" ")[0]; + } catch (IOException e) { + } finally { + IOUtils.cleanup(LOG, reader); + } + if (nonce == null) { + // If we can't use the process ID, use a random nonce. + nonce = UUID.randomUUID().toString(); + } + return new File(tmp, nonce).getAbsolutePath(); + } + /** * Reads the names of classes specified in the - * "hadoop.trace.spanreceiver.classes" property and instantiates and registers + * "hadoop.htrace.spanreceiver.classes" property and instantiates and registers * them with the Tracer as SpanReceiver's. * * The nullary constructor is called during construction, but if the classes @@ -98,8 +130,17 @@ public class SpanReceiverHost implements TraceAdminProtocol { if (receiverNames == null || receiverNames.length == 0) { return; } + // It's convenient to have each daemon log to a random trace file when + // testing. + if (config.get(LOCAL_FILE_SPAN_RECEIVER_PATH) == null) { + config.set(LOCAL_FILE_SPAN_RECEIVER_PATH, + getUniqueLocalTraceFileName()); + } for (String className : receiverNames) { className = className.trim(); + if (!className.contains(".")) { + className = "org.htrace.impl." + className; + } try { SpanReceiver rcvr = loadInstance(className, EMPTY); Trace.addReceiver(rcvr); @@ -145,7 +186,7 @@ public class SpanReceiverHost implements TraceAdminProtocol { extraMap.put(pair.getKey(), pair.getValue()); } return new HTraceConfiguration() { - public static final String HTRACE_CONF_PREFIX = "hadoop."; + public static final String HTRACE_CONF_PREFIX = "hadoop.htrace."; @Override public String get(String key) { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/tracing/TraceSamplerFactory.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/tracing/TraceSamplerFactory.java new file mode 100644 index 00000000000..0de7d3e6df9 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/tracing/TraceSamplerFactory.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.tracing; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.htrace.Sampler; +import org.htrace.impl.ProbabilitySampler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@InterfaceAudience.Private +public class TraceSamplerFactory { + private static final Logger LOG = + LoggerFactory.getLogger(TraceSamplerFactory.class); + + public static Sampler createSampler(Configuration conf) { + String samplerStr = conf.get(CommonConfigurationKeys.HADOOP_TRACE_SAMPLER, + CommonConfigurationKeys.HADOOP_TRACE_SAMPLER_DEFAULT); + if (samplerStr.equals("NeverSampler")) { + LOG.debug("HTrace is OFF for all spans."); + return Sampler.NEVER; + } else if (samplerStr.equals("AlwaysSampler")) { + LOG.info("HTrace is ON for all spans."); + return Sampler.ALWAYS; + } else if (samplerStr.equals("ProbabilitySampler")) { + double percentage = + conf.getDouble("htrace.probability.sampler.percentage", 0.01d); + LOG.info("HTrace is ON for " + percentage + "% of top-level spans."); + return new ProbabilitySampler(percentage / 100.0d); + } else { + throw new RuntimeException("Can't create sampler " + samplerStr + + ". Available samplers are NeverSampler, AlwaysSampler, " + + "and ProbabilitySampler."); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index c9e2bd08e92..6308779ea1a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -365,6 +365,7 @@ Release 2.7.0 - UNRELEASED NEW FEATURES IMPROVEMENTS + HDFS-7055. Add tracing to DFSInputStream (cmccabe) OPTIMIZATIONS diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java index cd75e53b273..3954755952c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java @@ -36,6 +36,9 @@ import org.apache.hadoop.util.DataChecksum; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import org.htrace.Sampler; +import org.htrace.Trace; +import org.htrace.TraceScope; /** * BlockReaderLocal enables local short circuited reads. 
If the DFS client is on @@ -304,48 +307,54 @@ class BlockReaderLocal implements BlockReader { */ private synchronized int fillBuffer(ByteBuffer buf, boolean canSkipChecksum) throws IOException { - int total = 0; - long startDataPos = dataPos; - int startBufPos = buf.position(); - while (buf.hasRemaining()) { - int nRead = dataIn.read(buf, dataPos); - if (nRead < 0) { - break; - } - dataPos += nRead; - total += nRead; - } - if (canSkipChecksum) { - freeChecksumBufIfExists(); - return total; - } - if (total > 0) { - try { - buf.limit(buf.position()); - buf.position(startBufPos); - createChecksumBufIfNeeded(); - int checksumsNeeded = (total + bytesPerChecksum - 1) / bytesPerChecksum; - checksumBuf.clear(); - checksumBuf.limit(checksumsNeeded * checksumSize); - long checksumPos = - 7 + ((startDataPos / bytesPerChecksum) * checksumSize); - while (checksumBuf.hasRemaining()) { - int nRead = checksumIn.read(checksumBuf, checksumPos); - if (nRead < 0) { - throw new IOException("Got unexpected checksum file EOF at " + - checksumPos + ", block file position " + startDataPos + " for " + - "block " + block + " of file " + filename); - } - checksumPos += nRead; + TraceScope scope = Trace.startSpan("BlockReaderLocal#fillBuffer(" + + block.getBlockId() + ")", Sampler.NEVER); + try { + int total = 0; + long startDataPos = dataPos; + int startBufPos = buf.position(); + while (buf.hasRemaining()) { + int nRead = dataIn.read(buf, dataPos); + if (nRead < 0) { + break; } - checksumBuf.flip(); - - checksum.verifyChunkedSums(buf, checksumBuf, filename, startDataPos); - } finally { - buf.position(buf.limit()); + dataPos += nRead; + total += nRead; } + if (canSkipChecksum) { + freeChecksumBufIfExists(); + return total; + } + if (total > 0) { + try { + buf.limit(buf.position()); + buf.position(startBufPos); + createChecksumBufIfNeeded(); + int checksumsNeeded = (total + bytesPerChecksum - 1) / bytesPerChecksum; + checksumBuf.clear(); + checksumBuf.limit(checksumsNeeded * checksumSize); + long checksumPos = + 7 + ((startDataPos / bytesPerChecksum) * checksumSize); + while (checksumBuf.hasRemaining()) { + int nRead = checksumIn.read(checksumBuf, checksumPos); + if (nRead < 0) { + throw new IOException("Got unexpected checksum file EOF at " + + checksumPos + ", block file position " + startDataPos + " for " + + "block " + block + " of file " + filename); + } + checksumPos += nRead; + } + checksumBuf.flip(); + + checksum.verifyChunkedSums(buf, checksumBuf, filename, startDataPos); + } finally { + buf.position(buf.limit()); + } + } + return total; + } finally { + scope.close(); } - return total; } private boolean createNoChecksumContext() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java index 47455754d72..d42b860a0aa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java @@ -46,6 +46,9 @@ import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.DataChecksum; +import org.htrace.Sampler; +import org.htrace.Trace; +import org.htrace.TraceScope; /** * BlockReaderLocalLegacy enables local short circuited reads. 
If the DFS client is on @@ -169,6 +172,7 @@ class BlockReaderLocalLegacy implements BlockReader { /** offset in block where reader wants to actually read */ private long startOffset; private final String filename; + private long blockId; /** * The only way this object can be instantiated. @@ -320,6 +324,7 @@ class BlockReaderLocalLegacy implements BlockReader { this.checksum = checksum; this.verifyChecksum = verifyChecksum; this.startOffset = Math.max(startOffset, 0); + this.blockId = block.getBlockId(); bytesPerChecksum = this.checksum.getBytesPerChecksum(); checksumSize = this.checksum.getChecksumSize(); @@ -357,20 +362,26 @@ class BlockReaderLocalLegacy implements BlockReader { */ private int fillBuffer(FileInputStream stream, ByteBuffer buf) throws IOException { - int bytesRead = stream.getChannel().read(buf); - if (bytesRead < 0) { - //EOF - return bytesRead; - } - while (buf.remaining() > 0) { - int n = stream.getChannel().read(buf); - if (n < 0) { + TraceScope scope = Trace.startSpan("BlockReaderLocalLegacy#fillBuffer(" + + blockId + ")", Sampler.NEVER); + try { + int bytesRead = stream.getChannel().read(buf); + if (bytesRead < 0) { //EOF return bytesRead; } - bytesRead += n; + while (buf.remaining() > 0) { + int n = stream.getChannel().read(buf); + if (n < 0) { + //EOF + return bytesRead; + } + bytesRead += n; + } + return bytesRead; + } finally { + scope.close(); } - return bytesRead; } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index c975ad5a9f9..86faf1853d1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -72,12 +72,14 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.lang.reflect.Proxy; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketAddress; import java.net.URI; import java.net.UnknownHostException; +import java.nio.charset.Charset; import java.security.GeneralSecurityException; import java.util.ArrayList; import java.util.Collections; @@ -200,6 +202,7 @@ import org.apache.hadoop.io.retry.LossyRetryInvocationHandler; import org.apache.hadoop.ipc.Client; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.ipc.RpcInvocationHandler; import org.apache.hadoop.net.DNS; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.AccessControlException; @@ -207,6 +210,8 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenRenewer; +import org.apache.hadoop.tracing.SpanReceiverHost; +import org.apache.hadoop.tracing.TraceSamplerFactory; import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum.Type; @@ -218,6 +223,11 @@ import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.net.InetAddresses; +import org.htrace.Sampler; +import org.htrace.Span; +import org.htrace.Trace; +import org.htrace.TraceScope; +import org.htrace.impl.ProbabilitySampler; 
/******************************************************** * DFSClient can connect to a Hadoop Filesystem and @@ -266,6 +276,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL; @VisibleForTesting KeyProvider provider; + private final SpanReceiverHost spanReceiverHost; + private final Sampler traceSampler; + /** * DFSClient configuration */ @@ -582,6 +595,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, Configuration conf, FileSystem.Statistics stats) throws IOException { + spanReceiverHost = SpanReceiverHost.getInstance(conf); + traceSampler = TraceSamplerFactory.createSampler(conf); // Copy only the required DFSClient configuration this.dfsClientConf = new Conf(conf); if (this.dfsClientConf.useLegacyBlockReaderLocal) { @@ -3158,4 +3173,19 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public SaslDataTransferClient getSaslDataTransferClient() { return saslClient; } + + private static final byte[] PATH = + new String("path").getBytes(Charset.forName("UTF-8")); + + TraceScope getPathTraceScope(String description, String path) { + TraceScope scope = Trace.startSpan(description, traceSampler); + Span span = scope.getSpan(); + if (span != null) { + if (path != null) { + span.addKVAnnotation(PATH, + path.getBytes(Charset.forName("UTF-8"))); + } + } + return scope; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index af1ba14714a..e8bcfccfc14 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -74,6 +74,9 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.IdentityHashStore; import com.google.common.annotations.VisibleForTesting; +import org.htrace.Span; +import org.htrace.Trace; +import org.htrace.TraceScope; /**************************************************************** * DFSInputStream provides bytes from a named file. 
It handles @@ -840,15 +843,25 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, @Override public synchronized int read(final byte buf[], int off, int len) throws IOException { ReaderStrategy byteArrayReader = new ByteArrayStrategy(buf); - - return readWithStrategy(byteArrayReader, off, len); + TraceScope scope = + dfsClient.getPathTraceScope("DFSInputStream#byteArrayRead", src); + try { + return readWithStrategy(byteArrayReader, off, len); + } finally { + scope.close(); + } } @Override public synchronized int read(final ByteBuffer buf) throws IOException { ReaderStrategy byteBufferReader = new ByteBufferStrategy(buf); - - return readWithStrategy(byteBufferReader, 0, buf.remaining()); + TraceScope scope = + dfsClient.getPathTraceScope("DFSInputStream#byteBufferRead", src); + try { + return readWithStrategy(byteBufferReader, 0, buf.remaining()); + } finally { + scope.close(); + } } @@ -984,15 +997,23 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, private Callable getFromOneDataNode(final DNAddrPair datanode, final LocatedBlock block, final long start, final long end, final ByteBuffer bb, - final Map> corruptedBlockMap) { + final Map> corruptedBlockMap, + final int hedgedReadId) { + final Span parentSpan = Trace.currentSpan(); return new Callable() { @Override public ByteBuffer call() throws Exception { byte[] buf = bb.array(); int offset = bb.position(); - actualGetFromOneDataNode(datanode, block, start, end, buf, offset, - corruptedBlockMap); - return bb; + TraceScope scope = + Trace.startSpan("hedgedRead" + hedgedReadId, parentSpan); + try { + actualGetFromOneDataNode(datanode, block, start, end, buf, offset, + corruptedBlockMap); + return bb; + } finally { + scope.close(); + } } }; } @@ -1108,6 +1129,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, ArrayList ignored = new ArrayList(); ByteBuffer bb = null; int len = (int) (end - start + 1); + int hedgedReadId = 0; block = getBlockAt(block.getStartOffset(), false); while (true) { // see HDFS-6591, this metric is used to verify/catch unnecessary loops @@ -1120,7 +1142,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, chosenNode = chooseDataNode(block, ignored); bb = ByteBuffer.wrap(buf, offset, len); Callable getFromDataNodeCallable = getFromOneDataNode( - chosenNode, block, start, end, bb, corruptedBlockMap); + chosenNode, block, start, end, bb, corruptedBlockMap, + hedgedReadId++); Future firstRequest = hedgedService .submit(getFromDataNodeCallable); futures.add(firstRequest); @@ -1157,7 +1180,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, } bb = ByteBuffer.allocate(len); Callable getFromDataNodeCallable = getFromOneDataNode( - chosenNode, block, start, end, bb, corruptedBlockMap); + chosenNode, block, start, end, bb, corruptedBlockMap, + hedgedReadId++); Future oneMoreRequest = hedgedService .submit(getFromDataNodeCallable); futures.add(oneMoreRequest); @@ -1272,7 +1296,18 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, */ @Override public int read(long position, byte[] buffer, int offset, int length) - throws IOException { + throws IOException { + TraceScope scope = + dfsClient.getPathTraceScope("DFSInputStream#byteArrayPread", src); + try { + return pread(position, buffer, offset, length); + } finally { + scope.close(); + } + } + + private int pread(long position, byte[] buffer, int offset, int length) + throws IOException { // sanity checks dfsClient.checkOpen(); if (closed) { diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java index 25a72870c7d..f2d3395dc8c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java @@ -46,6 +46,9 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.DataChecksum; +import org.htrace.Sampler; +import org.htrace.Trace; +import org.htrace.TraceScope; /** @@ -69,6 +72,8 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { /** offset in block where reader wants to actually read */ private long startOffset; + private final long blockId; + /** offset in block of of first chunk - may be less than startOffset if startOffset is not chunk-aligned */ private final long firstChunkOffset; @@ -208,6 +213,19 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { protected synchronized int readChunk(long pos, byte[] buf, int offset, int len, byte[] checksumBuf) throws IOException { + TraceScope scope = + Trace.startSpan("RemoteBlockReader#readChunk(" + blockId + ")", + Sampler.NEVER); + try { + return readChunkImpl(pos, buf, offset, len, checksumBuf); + } finally { + scope.close(); + } + } + + private synchronized int readChunkImpl(long pos, byte[] buf, int offset, + int len, byte[] checksumBuf) + throws IOException { // Read one chunk. if (eos) { // Already hit EOF @@ -347,6 +365,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { this.in = in; this.checksum = checksum; this.startOffset = Math.max( startOffset, 0 ); + this.blockId = blockId; // The total number of bytes that we need to transfer from the DN is // the amount that the user wants (bytesToRead), plus the padding at @@ -367,7 +386,6 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { * Create a new BlockReader specifically to satisfy a read. * This method also sends the OP_READ_BLOCK request. * - * @param sock An established Socket to the DN. 
The BlockReader will not close it normally * @param file File location * @param block The block object * @param blockToken The block token for security diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java index 2361f0a1a62..bc0db568bf0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java @@ -53,6 +53,9 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.DataChecksum; import com.google.common.annotations.VisibleForTesting; +import org.htrace.Sampler; +import org.htrace.Trace; +import org.htrace.TraceScope; /** * This is a wrapper around connection to datanode @@ -88,6 +91,7 @@ public class RemoteBlockReader2 implements BlockReader { final private Peer peer; final private DatanodeID datanodeID; final private PeerCache peerCache; + final private long blockId; private final ReadableByteChannel in; private DataChecksum checksum; @@ -143,7 +147,13 @@ public class RemoteBlockReader2 implements BlockReader { } if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) { - readNextPacket(); + TraceScope scope = Trace.startSpan( + "RemoteBlockReader2#readNextPacket(" + blockId + ")", Sampler.NEVER); + try { + readNextPacket(); + } finally { + scope.close(); + } } if (LOG.isTraceEnabled()) { @@ -165,7 +175,13 @@ public class RemoteBlockReader2 implements BlockReader { @Override public int read(ByteBuffer buf) throws IOException { if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) { - readNextPacket(); + TraceScope scope = Trace.startSpan( + "RemoteBlockReader2#readNextPacket(" + blockId + ")", Sampler.NEVER); + try { + readNextPacket(); + } finally { + scope.close(); + } } if (curDataSlice.remaining() == 0) { // we're at EOF now @@ -289,6 +305,7 @@ public class RemoteBlockReader2 implements BlockReader { this.startOffset = Math.max( startOffset, 0 ); this.filename = file; this.peerCache = peerCache; + this.blockId = blockId; // The total number of bytes that we need to transfer from the DN is // the amount that the user wants (bytesToRead), plus the padding at @@ -372,8 +389,6 @@ public class RemoteBlockReader2 implements BlockReader { * Create a new BlockReader specifically to satisfy a read. * This method also sends the OP_READ_BLOCK request. * - * @param sock An established Socket to the DN. The BlockReader will not close it normally. - * This socket must have an associated Channel. 
* @param file File location * @param block The block object * @param blockToken The block token for security diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java index febf2de0c3a..0082fcd2a6b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java @@ -47,6 +47,9 @@ import org.apache.hadoop.util.DataChecksum; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import org.htrace.Sampler; +import org.htrace.Trace; +import org.htrace.TraceScope; /** * Reads a block from the disk and sends it to a recipient. @@ -668,6 +671,17 @@ class BlockSender implements java.io.Closeable { */ long sendBlock(DataOutputStream out, OutputStream baseStream, DataTransferThrottler throttler) throws IOException { + TraceScope scope = + Trace.startSpan("sendBlock_" + block.getBlockId(), Sampler.NEVER); + try { + return doSendBlock(out, baseStream, throttler); + } finally { + scope.close(); + } + } + + private long doSendBlock(DataOutputStream out, OutputStream baseStream, + DataTransferThrottler throttler) throws IOException { if (out == null) { throw new IOException( "out stream is null" ); }
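
Reviewer note (illustrative, not part of the patch): the sketch below shows one way a client application could exercise the tracing added here. It assumes htrace-core 3.x on the classpath and uses placeholder values for the NameNode URI, the input file, and the span output file; the configuration keys are the ones introduced or renamed by this change (hadoop.htrace.sampler, hadoop.htrace.spanreceiver.classes, hadoop.htrace.local-file-span-receiver.path).

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.htrace.Sampler;
import org.htrace.Trace;
import org.htrace.TraceScope;

public class TracedRead {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Sample every top-level span and write spans to a local file.
    // "LocalFileSpanReceiver" is expanded to org.htrace.impl.LocalFileSpanReceiver
    // by the class-name prefixing added to SpanReceiverHost in this patch.
    conf.set("hadoop.htrace.sampler", "AlwaysSampler");
    conf.set("hadoop.htrace.spanreceiver.classes", "LocalFileSpanReceiver");
    conf.set("hadoop.htrace.local-file-span-receiver.path", "/tmp/htrace.out");

    // Creating the DFSClient registers the configured span receiver
    // (SpanReceiverHost.getInstance) and builds the trace sampler
    // (TraceSamplerFactory.createSampler). URI and path are placeholders.
    FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:8020"), conf);

    // Wrap the read in an application-level span; the DFSInputStream scopes
    // added by this patch ("DFSInputStream#byteArrayRead", etc.) are created
    // while this scope is active.
    TraceScope scope = Trace.startSpan("exampleRead", Sampler.ALWAYS);
    try (FSDataInputStream in = fs.open(new Path("/tmp/example.txt"))) {
      byte[] buf = new byte[4096];
      while (in.read(buf) > 0) {
        // Each read() above passes through the traced read path.
      }
    } finally {
      scope.close();
    }
  }
}

With the default hadoop.htrace.sampler=NeverSampler and no enclosing span, the scopes added by this patch are effectively no-ops, so tracing stays off unless a client opts in as above.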