HDFS-7055. Add tracing to DFSInputStream (cmccabe)

This commit is contained in:
Colin Patrick Mccabe 2014-10-03 13:28:24 -07:00
parent 34cdcaad71
commit 7f6ed7fe36
11 changed files with 299 additions and 69 deletions

View File

@ -286,4 +286,7 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
public static final String NFS_EXPORTS_ALLOWED_HOSTS_SEPARATOR = ";"; public static final String NFS_EXPORTS_ALLOWED_HOSTS_SEPARATOR = ";";
public static final String NFS_EXPORTS_ALLOWED_HOSTS_KEY = "nfs.exports.allowed.hosts"; public static final String NFS_EXPORTS_ALLOWED_HOSTS_KEY = "nfs.exports.allowed.hosts";
public static final String NFS_EXPORTS_ALLOWED_HOSTS_KEY_DEFAULT = "* rw"; public static final String NFS_EXPORTS_ALLOWED_HOSTS_KEY_DEFAULT = "* rw";
public static final String HADOOP_TRACE_SAMPLER = "hadoop.htrace.sampler";
public static final String HADOOP_TRACE_SAMPLER_DEFAULT = "NeverSampler";
} }

View File

@ -17,17 +17,24 @@
*/ */
package org.apache.hadoop.tracing; package org.apache.hadoop.tracing;
import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.UUID;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.tracing.SpanReceiverInfo.ConfigurationPair; import org.apache.hadoop.tracing.SpanReceiverInfo.ConfigurationPair;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.util.ShutdownHookManager;
@ -35,7 +42,6 @@ import org.htrace.HTraceConfiguration;
import org.htrace.SpanReceiver; import org.htrace.SpanReceiver;
import org.htrace.Trace; import org.htrace.Trace;
/** /**
* This class provides functions for reading the names of SpanReceivers from * This class provides functions for reading the names of SpanReceivers from
* the Hadoop configuration, adding those SpanReceivers to the Tracer, * the Hadoop configuration, adding those SpanReceivers to the Tracer,
@ -45,7 +51,7 @@ import org.htrace.Trace;
@InterfaceAudience.Private @InterfaceAudience.Private
public class SpanReceiverHost implements TraceAdminProtocol { public class SpanReceiverHost implements TraceAdminProtocol {
public static final String SPAN_RECEIVERS_CONF_KEY = public static final String SPAN_RECEIVERS_CONF_KEY =
"hadoop.trace.spanreceiver.classes"; "hadoop.htrace.spanreceiver.classes";
private static final Log LOG = LogFactory.getLog(SpanReceiverHost.class); private static final Log LOG = LogFactory.getLog(SpanReceiverHost.class);
private final TreeMap<Long, SpanReceiver> receivers = private final TreeMap<Long, SpanReceiver> receivers =
new TreeMap<Long, SpanReceiver>(); new TreeMap<Long, SpanReceiver>();
@ -53,6 +59,9 @@ public class SpanReceiverHost implements TraceAdminProtocol {
private boolean closed = false; private boolean closed = false;
private long highestId = 1; private long highestId = 1;
private final static String LOCAL_FILE_SPAN_RECEIVER_PATH =
"hadoop.htrace.local-file-span-receiver.path";
private static enum SingletonHolder { private static enum SingletonHolder {
INSTANCE; INSTANCE;
Object lock = new Object(); Object lock = new Object();
@ -81,9 +90,32 @@ public class SpanReceiverHost implements TraceAdminProtocol {
private static List<ConfigurationPair> EMPTY = Collections.emptyList(); private static List<ConfigurationPair> EMPTY = Collections.emptyList();
/**
 * Chooses a file path inside the JVM temporary directory that can serve as a
 * per-process local trace file.
 *
 * The file name is the first space-delimited field of /proc/self/stat when
 * that file can be read and the field is non-empty; otherwise a random UUID
 * is used. The directory comes from the "java.io.tmpdir" system property,
 * defaulting to "/tmp".
 *
 * @return the absolute path of the chosen trace file; the file itself is not
 *         created by this method.
 */
private static String getUniqueLocalTraceFileName() {
  String tmp = System.getProperty("java.io.tmpdir", "/tmp");
  String nonce = null;
  BufferedReader reader = null;
  try {
    // On Linux we can get a unique local file name by reading the process id
    // out of /proc/self/stat.  (There isn't any portable way to get the
    // process ID from Java.)
    reader = new BufferedReader(
        new InputStreamReader(new FileInputStream("/proc/self/stat")));
    String line = reader.readLine();
    // readLine() returns null on an empty stream; guard against that instead
    // of letting a NullPointerException escape.
    if (line != null) {
      // Take everything before the first space.  Unlike split(" ")[0] this
      // cannot throw when the line consists solely of separators.
      int firstSpace = line.indexOf(' ');
      String firstField =
          (firstSpace < 0) ? line : line.substring(0, firstSpace);
      // An empty name would make the result the temp directory itself.
      if (!firstField.isEmpty()) {
        nonce = firstField;
      }
    }
  } catch (IOException e) {
    // Deliberately ignored: the file is missing or unreadable, so we fall
    // through to the random nonce below.
  } finally {
    IOUtils.cleanup(LOG, reader);
  }
  if (nonce == null) {
    // If we can't use the process ID, use a random nonce.
    nonce = UUID.randomUUID().toString();
  }
  return new File(tmp, nonce).getAbsolutePath();
}
/** /**
* Reads the names of classes specified in the * Reads the names of classes specified in the
* "hadoop.trace.spanreceiver.classes" property and instantiates and registers * "hadoop.htrace.spanreceiver.classes" property and instantiates and registers
* them with the Tracer as SpanReceiver's. * them with the Tracer as SpanReceiver's.
* *
* The nullary constructor is called during construction, but if the classes * The nullary constructor is called during construction, but if the classes
@ -98,8 +130,17 @@ public class SpanReceiverHost implements TraceAdminProtocol {
if (receiverNames == null || receiverNames.length == 0) { if (receiverNames == null || receiverNames.length == 0) {
return; return;
} }
// It's convenient to have each daemon log to a random trace file when
// testing.
if (config.get(LOCAL_FILE_SPAN_RECEIVER_PATH) == null) {
config.set(LOCAL_FILE_SPAN_RECEIVER_PATH,
getUniqueLocalTraceFileName());
}
for (String className : receiverNames) { for (String className : receiverNames) {
className = className.trim(); className = className.trim();
if (!className.contains(".")) {
className = "org.htrace.impl." + className;
}
try { try {
SpanReceiver rcvr = loadInstance(className, EMPTY); SpanReceiver rcvr = loadInstance(className, EMPTY);
Trace.addReceiver(rcvr); Trace.addReceiver(rcvr);
@ -145,7 +186,7 @@ public class SpanReceiverHost implements TraceAdminProtocol {
extraMap.put(pair.getKey(), pair.getValue()); extraMap.put(pair.getKey(), pair.getValue());
} }
return new HTraceConfiguration() { return new HTraceConfiguration() {
public static final String HTRACE_CONF_PREFIX = "hadoop."; public static final String HTRACE_CONF_PREFIX = "hadoop.htrace.";
@Override @Override
public String get(String key) { public String get(String key) {

View File

@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tracing;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.htrace.Sampler;
import org.htrace.impl.ProbabilitySampler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Creates the HTrace {@link Sampler} named by the Hadoop configuration.
 */
@InterfaceAudience.Private
public class TraceSamplerFactory {
  private static final Logger LOG =
      LoggerFactory.getLogger(TraceSamplerFactory.class);

  /**
   * Looks up {@link CommonConfigurationKeys#HADOOP_TRACE_SAMPLER} (falling
   * back to {@link CommonConfigurationKeys#HADOOP_TRACE_SAMPLER_DEFAULT}) and
   * returns the matching sampler.
   *
   * Recognised names are "NeverSampler", "AlwaysSampler" and
   * "ProbabilitySampler". For the latter, the value of
   * "htrace.probability.sampler.percentage" (default 0.01) is read as a
   * percentage and divided by 100 before being passed to
   * {@link ProbabilitySampler}.
   *
   * @param conf configuration to read the sampler settings from
   * @return the sampler selected by the configuration
   * @throws RuntimeException if the configured name is not one of the three
   *         recognised samplers
   */
  public static Sampler createSampler(Configuration conf) {
    final String samplerName = conf.get(
        CommonConfigurationKeys.HADOOP_TRACE_SAMPLER,
        CommonConfigurationKeys.HADOOP_TRACE_SAMPLER_DEFAULT);

    if (samplerName.equals("NeverSampler")) {
      LOG.debug("HTrace is OFF for all spans.");
      return Sampler.NEVER;
    }

    if (samplerName.equals("AlwaysSampler")) {
      LOG.info("HTrace is ON for all spans.");
      return Sampler.ALWAYS;
    }

    if (samplerName.equals("ProbabilitySampler")) {
      final double percent =
          conf.getDouble("htrace.probability.sampler.percentage", 0.01d);
      LOG.info("HTrace is ON for " + percent + "% of top-level spans.");
      // The configured value is a percentage; the sampler takes a fraction.
      return new ProbabilitySampler(percent / 100.0d);
    }

    throw new RuntimeException("Can't create sampler " + samplerName +
        ". Available samplers are NeverSampler, AlwaysSampler, " +
        "and ProbabilitySampler.");
  }
}

View File

@ -365,6 +365,7 @@ Release 2.7.0 - UNRELEASED
NEW FEATURES NEW FEATURES
IMPROVEMENTS IMPROVEMENTS
HDFS-7055. Add tracing to DFSInputStream (cmccabe)
OPTIMIZATIONS OPTIMIZATIONS

View File

@ -36,6 +36,9 @@ import org.apache.hadoop.util.DataChecksum;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.htrace.Sampler;
import org.htrace.Trace;
import org.htrace.TraceScope;
/** /**
* BlockReaderLocal enables local short circuited reads. If the DFS client is on * BlockReaderLocal enables local short circuited reads. If the DFS client is on
@ -304,6 +307,9 @@ class BlockReaderLocal implements BlockReader {
*/ */
private synchronized int fillBuffer(ByteBuffer buf, boolean canSkipChecksum) private synchronized int fillBuffer(ByteBuffer buf, boolean canSkipChecksum)
throws IOException { throws IOException {
TraceScope scope = Trace.startSpan("BlockReaderLocal#fillBuffer(" +
block.getBlockId() + ")", Sampler.NEVER);
try {
int total = 0; int total = 0;
long startDataPos = dataPos; long startDataPos = dataPos;
int startBufPos = buf.position(); int startBufPos = buf.position();
@ -346,6 +352,9 @@ class BlockReaderLocal implements BlockReader {
} }
} }
return total; return total;
} finally {
scope.close();
}
} }
private boolean createNoChecksumContext() { private boolean createNoChecksumContext() {

View File

@ -46,6 +46,9 @@ import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
import org.htrace.Sampler;
import org.htrace.Trace;
import org.htrace.TraceScope;
/** /**
* BlockReaderLocalLegacy enables local short circuited reads. If the DFS client is on * BlockReaderLocalLegacy enables local short circuited reads. If the DFS client is on
@ -169,6 +172,7 @@ class BlockReaderLocalLegacy implements BlockReader {
/** offset in block where reader wants to actually read */ /** offset in block where reader wants to actually read */
private long startOffset; private long startOffset;
private final String filename; private final String filename;
private long blockId;
/** /**
* The only way this object can be instantiated. * The only way this object can be instantiated.
@ -320,6 +324,7 @@ class BlockReaderLocalLegacy implements BlockReader {
this.checksum = checksum; this.checksum = checksum;
this.verifyChecksum = verifyChecksum; this.verifyChecksum = verifyChecksum;
this.startOffset = Math.max(startOffset, 0); this.startOffset = Math.max(startOffset, 0);
this.blockId = block.getBlockId();
bytesPerChecksum = this.checksum.getBytesPerChecksum(); bytesPerChecksum = this.checksum.getBytesPerChecksum();
checksumSize = this.checksum.getChecksumSize(); checksumSize = this.checksum.getChecksumSize();
@ -357,6 +362,9 @@ class BlockReaderLocalLegacy implements BlockReader {
*/ */
private int fillBuffer(FileInputStream stream, ByteBuffer buf) private int fillBuffer(FileInputStream stream, ByteBuffer buf)
throws IOException { throws IOException {
TraceScope scope = Trace.startSpan("BlockReaderLocalLegacy#fillBuffer(" +
blockId + ")", Sampler.NEVER);
try {
int bytesRead = stream.getChannel().read(buf); int bytesRead = stream.getChannel().read(buf);
if (bytesRead < 0) { if (bytesRead < 0) {
//EOF //EOF
@ -371,6 +379,9 @@ class BlockReaderLocalLegacy implements BlockReader {
bytesRead += n; bytesRead += n;
} }
return bytesRead; return bytesRead;
} finally {
scope.close();
}
} }
/** /**

View File

@ -72,12 +72,14 @@ import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.lang.reflect.Proxy;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.Socket; import java.net.Socket;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.net.URI; import java.net.URI;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.security.GeneralSecurityException; import java.security.GeneralSecurityException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
@ -200,6 +202,7 @@ import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
import org.apache.hadoop.ipc.Client; import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RpcInvocationHandler;
import org.apache.hadoop.net.DNS; import org.apache.hadoop.net.DNS;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.AccessControlException;
@ -207,6 +210,8 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenRenewer; import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.tracing.SpanReceiverHost;
import org.apache.hadoop.tracing.TraceSamplerFactory;
import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DataChecksum.Type; import org.apache.hadoop.util.DataChecksum.Type;
@ -218,6 +223,11 @@ import com.google.common.base.Joiner;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.net.InetAddresses; import com.google.common.net.InetAddresses;
import org.htrace.Sampler;
import org.htrace.Span;
import org.htrace.Trace;
import org.htrace.TraceScope;
import org.htrace.impl.ProbabilitySampler;
/******************************************************** /********************************************************
* DFSClient can connect to a Hadoop Filesystem and * DFSClient can connect to a Hadoop Filesystem and
@ -266,6 +276,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL; private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL;
@VisibleForTesting @VisibleForTesting
KeyProvider provider; KeyProvider provider;
private final SpanReceiverHost spanReceiverHost;
private final Sampler traceSampler;
/** /**
* DFSClient configuration * DFSClient configuration
*/ */
@ -582,6 +595,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
Configuration conf, FileSystem.Statistics stats) Configuration conf, FileSystem.Statistics stats)
throws IOException { throws IOException {
spanReceiverHost = SpanReceiverHost.getInstance(conf);
traceSampler = TraceSamplerFactory.createSampler(conf);
// Copy only the required DFSClient configuration // Copy only the required DFSClient configuration
this.dfsClientConf = new Conf(conf); this.dfsClientConf = new Conf(conf);
if (this.dfsClientConf.useLegacyBlockReaderLocal) { if (this.dfsClientConf.useLegacyBlockReaderLocal) {
@ -3158,4 +3173,19 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
public SaslDataTransferClient getSaslDataTransferClient() { public SaslDataTransferClient getSaslDataTransferClient() {
return saslClient; return saslClient;
} }
/** UTF-8 bytes of the annotation key under which a file path is recorded. */
private static final byte[] PATH =
    "path".getBytes(Charset.forName("UTF-8"));

/**
 * Starts a trace span using this client's configured sampler and, when
 * possible, tags it with the given path.
 *
 * The annotation is added only if the returned scope has a span and
 * {@code path} is non-null; the path is encoded as UTF-8.
 *
 * @param description name given to the new span
 * @param path        file path to attach as a key/value annotation; may be
 *                    null, in which case no annotation is added
 * @return the scope returned by {@code Trace.startSpan}; callers in this
 *         change close it in a finally block
 */
TraceScope getPathTraceScope(String description, String path) {
  final TraceScope pathScope = Trace.startSpan(description, traceSampler);
  final Span activeSpan = pathScope.getSpan();
  // Nothing to annotate without both a span and a path.
  if (activeSpan == null || path == null) {
    return pathScope;
  }
  activeSpan.addKVAnnotation(PATH, path.getBytes(Charset.forName("UTF-8")));
  return pathScope;
}
} }

View File

@ -74,6 +74,9 @@ import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.IdentityHashStore; import org.apache.hadoop.util.IdentityHashStore;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.htrace.Span;
import org.htrace.Trace;
import org.htrace.TraceScope;
/**************************************************************** /****************************************************************
* DFSInputStream provides bytes from a named file. It handles * DFSInputStream provides bytes from a named file. It handles
@ -840,15 +843,25 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
@Override @Override
public synchronized int read(final byte buf[], int off, int len) throws IOException { public synchronized int read(final byte buf[], int off, int len) throws IOException {
ReaderStrategy byteArrayReader = new ByteArrayStrategy(buf); ReaderStrategy byteArrayReader = new ByteArrayStrategy(buf);
TraceScope scope =
dfsClient.getPathTraceScope("DFSInputStream#byteArrayRead", src);
try {
return readWithStrategy(byteArrayReader, off, len); return readWithStrategy(byteArrayReader, off, len);
} finally {
scope.close();
}
} }
@Override @Override
public synchronized int read(final ByteBuffer buf) throws IOException { public synchronized int read(final ByteBuffer buf) throws IOException {
ReaderStrategy byteBufferReader = new ByteBufferStrategy(buf); ReaderStrategy byteBufferReader = new ByteBufferStrategy(buf);
TraceScope scope =
dfsClient.getPathTraceScope("DFSInputStream#byteBufferRead", src);
try {
return readWithStrategy(byteBufferReader, 0, buf.remaining()); return readWithStrategy(byteBufferReader, 0, buf.remaining());
} finally {
scope.close();
}
} }
@ -984,15 +997,23 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
private Callable<ByteBuffer> getFromOneDataNode(final DNAddrPair datanode, private Callable<ByteBuffer> getFromOneDataNode(final DNAddrPair datanode,
final LocatedBlock block, final long start, final long end, final LocatedBlock block, final long start, final long end,
final ByteBuffer bb, final ByteBuffer bb,
final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) { final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
final int hedgedReadId) {
final Span parentSpan = Trace.currentSpan();
return new Callable<ByteBuffer>() { return new Callable<ByteBuffer>() {
@Override @Override
public ByteBuffer call() throws Exception { public ByteBuffer call() throws Exception {
byte[] buf = bb.array(); byte[] buf = bb.array();
int offset = bb.position(); int offset = bb.position();
TraceScope scope =
Trace.startSpan("hedgedRead" + hedgedReadId, parentSpan);
try {
actualGetFromOneDataNode(datanode, block, start, end, buf, offset, actualGetFromOneDataNode(datanode, block, start, end, buf, offset,
corruptedBlockMap); corruptedBlockMap);
return bb; return bb;
} finally {
scope.close();
}
} }
}; };
} }
@ -1108,6 +1129,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
ArrayList<DatanodeInfo> ignored = new ArrayList<DatanodeInfo>(); ArrayList<DatanodeInfo> ignored = new ArrayList<DatanodeInfo>();
ByteBuffer bb = null; ByteBuffer bb = null;
int len = (int) (end - start + 1); int len = (int) (end - start + 1);
int hedgedReadId = 0;
block = getBlockAt(block.getStartOffset(), false); block = getBlockAt(block.getStartOffset(), false);
while (true) { while (true) {
// see HDFS-6591, this metric is used to verify/catch unnecessary loops // see HDFS-6591, this metric is used to verify/catch unnecessary loops
@ -1120,7 +1142,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
chosenNode = chooseDataNode(block, ignored); chosenNode = chooseDataNode(block, ignored);
bb = ByteBuffer.wrap(buf, offset, len); bb = ByteBuffer.wrap(buf, offset, len);
Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode( Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
chosenNode, block, start, end, bb, corruptedBlockMap); chosenNode, block, start, end, bb, corruptedBlockMap,
hedgedReadId++);
Future<ByteBuffer> firstRequest = hedgedService Future<ByteBuffer> firstRequest = hedgedService
.submit(getFromDataNodeCallable); .submit(getFromDataNodeCallable);
futures.add(firstRequest); futures.add(firstRequest);
@ -1157,7 +1180,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
} }
bb = ByteBuffer.allocate(len); bb = ByteBuffer.allocate(len);
Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode( Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
chosenNode, block, start, end, bb, corruptedBlockMap); chosenNode, block, start, end, bb, corruptedBlockMap,
hedgedReadId++);
Future<ByteBuffer> oneMoreRequest = hedgedService Future<ByteBuffer> oneMoreRequest = hedgedService
.submit(getFromDataNodeCallable); .submit(getFromDataNodeCallable);
futures.add(oneMoreRequest); futures.add(oneMoreRequest);
@ -1273,6 +1297,17 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
@Override @Override
public int read(long position, byte[] buffer, int offset, int length) public int read(long position, byte[] buffer, int offset, int length)
throws IOException { throws IOException {
TraceScope scope =
dfsClient.getPathTraceScope("DFSInputStream#byteArrayPread", src);
try {
return pread(position, buffer, offset, length);
} finally {
scope.close();
}
}
private int pread(long position, byte[] buffer, int offset, int length)
throws IOException {
// sanity checks // sanity checks
dfsClient.checkOpen(); dfsClient.checkOpen();
if (closed) { if (closed) {

View File

@ -46,6 +46,9 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
import org.htrace.Sampler;
import org.htrace.Trace;
import org.htrace.TraceScope;
/** /**
@ -69,6 +72,8 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
/** offset in block where reader wants to actually read */ /** offset in block where reader wants to actually read */
private long startOffset; private long startOffset;
private final long blockId;
/** offset in block of of first chunk - may be less than startOffset /** offset in block of of first chunk - may be less than startOffset
if startOffset is not chunk-aligned */ if startOffset is not chunk-aligned */
private final long firstChunkOffset; private final long firstChunkOffset;
@ -208,6 +213,19 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
protected synchronized int readChunk(long pos, byte[] buf, int offset, protected synchronized int readChunk(long pos, byte[] buf, int offset,
int len, byte[] checksumBuf) int len, byte[] checksumBuf)
throws IOException { throws IOException {
TraceScope scope =
Trace.startSpan("RemoteBlockReader#readChunk(" + blockId + ")",
Sampler.NEVER);
try {
return readChunkImpl(pos, buf, offset, len, checksumBuf);
} finally {
scope.close();
}
}
private synchronized int readChunkImpl(long pos, byte[] buf, int offset,
int len, byte[] checksumBuf)
throws IOException {
// Read one chunk. // Read one chunk.
if (eos) { if (eos) {
// Already hit EOF // Already hit EOF
@ -347,6 +365,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
this.in = in; this.in = in;
this.checksum = checksum; this.checksum = checksum;
this.startOffset = Math.max( startOffset, 0 ); this.startOffset = Math.max( startOffset, 0 );
this.blockId = blockId;
// The total number of bytes that we need to transfer from the DN is // The total number of bytes that we need to transfer from the DN is
// the amount that the user wants (bytesToRead), plus the padding at // the amount that the user wants (bytesToRead), plus the padding at
@ -367,7 +386,6 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
* Create a new BlockReader specifically to satisfy a read. * Create a new BlockReader specifically to satisfy a read.
* This method also sends the OP_READ_BLOCK request. * This method also sends the OP_READ_BLOCK request.
* *
* @param sock An established Socket to the DN. The BlockReader will not close it normally
* @param file File location * @param file File location
* @param block The block object * @param block The block object
* @param blockToken The block token for security * @param blockToken The block token for security

View File

@ -53,6 +53,9 @@ import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.htrace.Sampler;
import org.htrace.Trace;
import org.htrace.TraceScope;
/** /**
* This is a wrapper around connection to datanode * This is a wrapper around connection to datanode
@ -88,6 +91,7 @@ public class RemoteBlockReader2 implements BlockReader {
final private Peer peer; final private Peer peer;
final private DatanodeID datanodeID; final private DatanodeID datanodeID;
final private PeerCache peerCache; final private PeerCache peerCache;
final private long blockId;
private final ReadableByteChannel in; private final ReadableByteChannel in;
private DataChecksum checksum; private DataChecksum checksum;
@ -143,7 +147,13 @@ public class RemoteBlockReader2 implements BlockReader {
} }
if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) { if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
TraceScope scope = Trace.startSpan(
"RemoteBlockReader2#readNextPacket(" + blockId + ")", Sampler.NEVER);
try {
readNextPacket(); readNextPacket();
} finally {
scope.close();
}
} }
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
@ -165,7 +175,13 @@ public class RemoteBlockReader2 implements BlockReader {
@Override @Override
public int read(ByteBuffer buf) throws IOException { public int read(ByteBuffer buf) throws IOException {
if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) { if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
TraceScope scope = Trace.startSpan(
"RemoteBlockReader2#readNextPacket(" + blockId + ")", Sampler.NEVER);
try {
readNextPacket(); readNextPacket();
} finally {
scope.close();
}
} }
if (curDataSlice.remaining() == 0) { if (curDataSlice.remaining() == 0) {
// we're at EOF now // we're at EOF now
@ -289,6 +305,7 @@ public class RemoteBlockReader2 implements BlockReader {
this.startOffset = Math.max( startOffset, 0 ); this.startOffset = Math.max( startOffset, 0 );
this.filename = file; this.filename = file;
this.peerCache = peerCache; this.peerCache = peerCache;
this.blockId = blockId;
// The total number of bytes that we need to transfer from the DN is // The total number of bytes that we need to transfer from the DN is
// the amount that the user wants (bytesToRead), plus the padding at // the amount that the user wants (bytesToRead), plus the padding at
@ -372,8 +389,6 @@ public class RemoteBlockReader2 implements BlockReader {
* Create a new BlockReader specifically to satisfy a read. * Create a new BlockReader specifically to satisfy a read.
* This method also sends the OP_READ_BLOCK request. * This method also sends the OP_READ_BLOCK request.
* *
* @param sock An established Socket to the DN. The BlockReader will not close it normally.
* This socket must have an associated Channel.
* @param file File location * @param file File location
* @param block The block object * @param block The block object
* @param blockToken The block token for security * @param blockToken The block token for security

View File

@ -47,6 +47,9 @@ import org.apache.hadoop.util.DataChecksum;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.htrace.Sampler;
import org.htrace.Trace;
import org.htrace.TraceScope;
/** /**
* Reads a block from the disk and sends it to a recipient. * Reads a block from the disk and sends it to a recipient.
@ -668,6 +671,17 @@ class BlockSender implements java.io.Closeable {
*/ */
long sendBlock(DataOutputStream out, OutputStream baseStream, long sendBlock(DataOutputStream out, OutputStream baseStream,
DataTransferThrottler throttler) throws IOException { DataTransferThrottler throttler) throws IOException {
TraceScope scope =
Trace.startSpan("sendBlock_" + block.getBlockId(), Sampler.NEVER);
try {
return doSendBlock(out, baseStream, throttler);
} finally {
scope.close();
}
}
private long doSendBlock(DataOutputStream out, OutputStream baseStream,
DataTransferThrottler throttler) throws IOException {
if (out == null) { if (out == null) {
throw new IOException( "out stream is null" ); throw new IOException( "out stream is null" );
} }