HDFS-6880. Adding tracing to DataNode data transfer protocol (iwasakims via cmccabe)

(cherry picked from commit 56119fec96)
Colin Patrick McCabe 2014-09-16 13:58:40 -07:00
parent 03fdbd7899
commit 9be338911b
8 changed files with 331 additions and 62 deletions

CHANGES.txt

@@ -203,6 +203,9 @@ Release 2.6.0 - UNRELEASED
     HDFS-7059. HAadmin transtionToActive with forceActive option can show
     confusing message.
 
+    HDFS-6880. Adding tracing to DataNode data transfer protocol. (iwasakims
+    via cmccabe)
+
   OPTIMIZATIONS
 
     HDFS-6690. Deduplicate xattr names in memory. (wang)

DFSOutputStream.java

@@ -88,6 +88,10 @@ import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.Time;
 
+import org.htrace.Span;
+import org.htrace.Trace;
+import org.htrace.TraceScope;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
@@ -355,12 +359,22 @@ public class DFSOutputStream extends FSOutputSummer
     /** Append on an existing block? */
     private final boolean isAppend;
 
+    private final Span traceSpan;
+
     /**
      * Default construction for file create
      */
     private DataStreamer() {
+      this(null);
+    }
+
+    /**
+     * construction with tracing info
+     */
+    private DataStreamer(Span span) {
       isAppend = false;
       stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
+      traceSpan = span;
     }
 
     /**
@@ -371,9 +385,10 @@ public class DFSOutputStream extends FSOutputSummer
      * @throws IOException if error occurs
      */
     private DataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat,
-        int bytesPerChecksum) throws IOException {
+        int bytesPerChecksum, Span span) throws IOException {
       isAppend = true;
       stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
+      traceSpan = span;
       block = lastBlock.getBlock();
       bytesSent = block.getNumBytes();
       accessToken = lastBlock.getBlockToken();
@@ -463,6 +478,10 @@ public class DFSOutputStream extends FSOutputSummer
     @Override
     public void run() {
       long lastPacket = Time.now();
+      TraceScope traceScope = null;
+      if (traceSpan != null) {
+        traceScope = Trace.continueSpan(traceSpan);
+      }
       while (!streamerClosed && dfsClient.clientRunning) {
 
         // if the Responder encountered an error, shutdown Responder
@@ -636,6 +655,9 @@ public class DFSOutputStream extends FSOutputSummer
           }
         }
       }
+      if (traceScope != null) {
+        traceScope.close();
+      }
       closeInternal();
     }
 
@@ -1611,7 +1633,11 @@ public class DFSOutputStream extends FSOutputSummer
     computePacketChunkSize(dfsClient.getConf().writePacketSize,
         checksum.getBytesPerChecksum());
 
-    streamer = new DataStreamer();
+    Span traceSpan = null;
+    if (Trace.isTracing()) {
+      traceSpan = Trace.startSpan(this.getClass().getSimpleName()).detach();
+    }
+    streamer = new DataStreamer(traceSpan);
     if (favoredNodes != null && favoredNodes.length != 0) {
       streamer.setFavoredNodes(favoredNodes);
     }
@@ -1652,15 +1678,21 @@ public class DFSOutputStream extends FSOutputSummer
     this(dfsClient, src, progress, stat, checksum);
     initialFileSize = stat.getLen(); // length of file when opened
 
+    Span traceSpan = null;
+    if (Trace.isTracing()) {
+      traceSpan = Trace.startSpan(this.getClass().getSimpleName()).detach();
+    }
+
     // The last partial block of the file has to be filled.
     if (lastBlock != null) {
       // indicate that we are appending to an existing block
       bytesCurBlock = lastBlock.getBlockSize();
-      streamer = new DataStreamer(lastBlock, stat, checksum.getBytesPerChecksum());
+      streamer = new DataStreamer(lastBlock, stat,
+          checksum.getBytesPerChecksum(), traceSpan);
     } else {
       computePacketChunkSize(dfsClient.getConf().writePacketSize,
           checksum.getBytesPerChecksum());
-      streamer = new DataStreamer();
+      streamer = new DataStreamer(traceSpan);
     }
     this.fileEncryptionInfo = stat.getFileEncryptionInfo();
   }
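
The client-side pattern above is the HTrace detach/continue idiom: DFSOutputStream starts a child span only when Trace.isTracing() reports an active trace, detaches it, and hands the detached Span to the DataStreamer, whose run() method re-attaches it with Trace.continueSpan() and closes the scope on exit. A minimal, self-contained sketch of that hand-off; the class and span names are illustrative and not part of the patch:

import org.htrace.Sampler;
import org.htrace.Span;
import org.htrace.Trace;
import org.htrace.TraceScope;

public class DetachedSpanHandoff {
  public static void main(String[] args) throws InterruptedException {
    // Caller thread: open a sampled span, then create and detach a child
    // span for the background worker (mirrors the DFSOutputStream constructor).
    TraceScope parent = Trace.startSpan("caller", Sampler.ALWAYS);
    final Span detached = Trace.startSpan("worker").detach();

    Thread worker = new Thread(new Runnable() {
      @Override
      public void run() {
        // Worker thread: re-attach the detached span (mirrors DataStreamer.run()).
        TraceScope scope = Trace.continueSpan(detached);
        try {
          // ... work attributed to the caller's trace ...
        } finally {
          scope.close();
        }
      }
    });
    worker.start();
    worker.join();
    parent.close();
  }
}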

DataTransferProtoUtil.java

@@ -25,12 +25,16 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ChecksumTypeProto;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
+import org.htrace.Span;
+import org.htrace.Trace;
+import org.htrace.TraceInfo;
+import org.htrace.TraceScope;
 
 /**
  * Static utilities for dealing with the protocol buffers used by the
@@ -78,9 +82,41 @@ public abstract class DataTransferProtoUtil {
 
   static BaseHeaderProto buildBaseHeader(ExtendedBlock blk,
       Token<BlockTokenIdentifier> blockToken) {
-    return BaseHeaderProto.newBuilder()
-      .setBlock(PBHelper.convert(blk))
-      .setToken(PBHelper.convert(blockToken))
-      .build();
+    BaseHeaderProto.Builder builder = BaseHeaderProto.newBuilder()
+      .setBlock(PBHelper.convert(blk))
+      .setToken(PBHelper.convert(blockToken));
+    if (Trace.isTracing()) {
+      Span s = Trace.currentSpan();
+      builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder()
+          .setTraceId(s.getTraceId())
+          .setParentId(s.getSpanId()));
+    }
+    return builder.build();
+  }
+
+  public static TraceInfo fromProto(DataTransferTraceInfoProto proto) {
+    if (proto == null) return null;
+    if (!proto.hasTraceId()) return null;
+    return new TraceInfo(proto.getTraceId(), proto.getParentId());
+  }
+
+  public static TraceScope continueTraceSpan(ClientOperationHeaderProto header,
+      String description) {
+    return continueTraceSpan(header.getBaseHeader(), description);
+  }
+
+  public static TraceScope continueTraceSpan(BaseHeaderProto header,
+      String description) {
+    return continueTraceSpan(header.getTraceInfo(), description);
+  }
+
+  public static TraceScope continueTraceSpan(DataTransferTraceInfoProto proto,
+      String description) {
+    TraceScope scope = null;
+    TraceInfo info = fromProto(proto);
+    if (info != null) {
+      scope = Trace.startSpan(description, info);
+    }
+    return scope;
   }
 }
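
The helpers above are the two halves of trace propagation for the data transfer protocol: buildBaseHeader() copies the current span's trace id and span id into the new DataTransferTraceInfoProto field on the client, and continueTraceSpan() rebuilds a TraceInfo from that field and opens a child span on the DataNode. A rough sketch of the same round trip using only the public continueTraceSpan(DataTransferTraceInfoProto, String) overload; the block and token parts of a real header are omitted and the span names are illustrative:

import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto;
import org.htrace.Sampler;
import org.htrace.Span;
import org.htrace.Trace;
import org.htrace.TraceScope;

public class TraceInfoRoundTrip {
  public static void main(String[] args) {
    // "Client" side: capture the current span ids the same way buildBaseHeader() does.
    TraceScope clientScope = Trace.startSpan("clientSide", Sampler.ALWAYS);
    DataTransferTraceInfoProto wireInfo;
    try {
      Span s = Trace.currentSpan();
      wireInfo = DataTransferTraceInfoProto.newBuilder()
          .setTraceId(s.getTraceId())
          .setParentId(s.getSpanId())
          .build();
    } finally {
      clientScope.close();
    }

    // "DataNode" side: rebuild the parent linkage and open a child span,
    // closing it in a finally block the way Receiver does.
    TraceScope serverScope =
        DataTransferProtoUtil.continueTraceSpan(wireInfo, "OpReadBlockProto");
    try {
      // ... handle the request ...
    } finally {
      if (serverScope != null) serverScope.close();
    }
  }
}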

Receiver.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.protocol.datatransfer;
 
 import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.fromProto;
+import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.continueTraceSpan;
 import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
 
 import java.io.DataInputStream;
@@ -39,6 +40,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmR
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
+import org.htrace.TraceScope;
 
 /** Receiver */
 @InterfaceAudience.Private
@@ -108,7 +110,10 @@ public abstract class Receiver implements DataTransferProtocol {
   /** Receive OP_READ_BLOCK */
   private void opReadBlock() throws IOException {
     OpReadBlockProto proto = OpReadBlockProto.parseFrom(vintPrefixed(in));
-    readBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
+    TraceScope traceScope = continueTraceSpan(proto.getHeader(),
+        proto.getClass().getSimpleName());
+    try {
+      readBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
         PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
         proto.getHeader().getClientName(),
         proto.getOffset(),
@@ -117,27 +122,36 @@ public abstract class Receiver implements DataTransferProtocol {
         (proto.hasCachingStrategy() ?
             getCachingStrategy(proto.getCachingStrategy()) :
             CachingStrategy.newDefaultStrategy()));
+    } finally {
+      if (traceScope != null) traceScope.close();
+    }
   }
 
   /** Receive OP_WRITE_BLOCK */
   private void opWriteBlock(DataInputStream in) throws IOException {
     final OpWriteBlockProto proto = OpWriteBlockProto.parseFrom(vintPrefixed(in));
     final DatanodeInfo[] targets = PBHelper.convert(proto.getTargetsList());
-    writeBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
-        PBHelper.convertStorageType(proto.getStorageType()),
-        PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
-        proto.getHeader().getClientName(),
-        targets,
-        PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length),
-        PBHelper.convert(proto.getSource()),
-        fromProto(proto.getStage()),
-        proto.getPipelineSize(),
-        proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
-        proto.getLatestGenerationStamp(),
-        fromProto(proto.getRequestedChecksum()),
-        (proto.hasCachingStrategy() ?
-            getCachingStrategy(proto.getCachingStrategy()) :
-            CachingStrategy.newDefaultStrategy()));
+    TraceScope traceScope = continueTraceSpan(proto.getHeader(),
+        proto.getClass().getSimpleName());
+    try {
+      writeBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
+          PBHelper.convertStorageType(proto.getStorageType()),
+          PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
+          proto.getHeader().getClientName(),
+          targets,
+          PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length),
+          PBHelper.convert(proto.getSource()),
+          fromProto(proto.getStage()),
+          proto.getPipelineSize(),
+          proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
+          proto.getLatestGenerationStamp(),
+          fromProto(proto.getRequestedChecksum()),
+          (proto.hasCachingStrategy() ?
+              getCachingStrategy(proto.getCachingStrategy()) :
+              CachingStrategy.newDefaultStrategy()));
+    } finally {
+      if (traceScope != null) traceScope.close();
+    }
   }
 
   /** Receive {@link Op#TRANSFER_BLOCK} */
@@ -145,11 +159,17 @@ public abstract class Receiver implements DataTransferProtocol {
     final OpTransferBlockProto proto =
       OpTransferBlockProto.parseFrom(vintPrefixed(in));
     final DatanodeInfo[] targets = PBHelper.convert(proto.getTargetsList());
-    transferBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
-        PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
-        proto.getHeader().getClientName(),
-        targets,
-        PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length));
+    TraceScope traceScope = continueTraceSpan(proto.getHeader(),
+        proto.getClass().getSimpleName());
+    try {
+      transferBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
+          PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
+          proto.getHeader().getClientName(),
+          targets,
+          PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length));
+    } finally {
+      if (traceScope != null) traceScope.close();
+    }
   }
 
   /** Receive {@link Op#REQUEST_SHORT_CIRCUIT_FDS} */
@@ -158,9 +178,15 @@ public abstract class Receiver implements DataTransferProtocol {
       OpRequestShortCircuitAccessProto.parseFrom(vintPrefixed(in));
     SlotId slotId = (proto.hasSlotId()) ?
         PBHelper.convert(proto.getSlotId()) : null;
-    requestShortCircuitFds(PBHelper.convert(proto.getHeader().getBlock()),
-        PBHelper.convert(proto.getHeader().getToken()),
-        slotId, proto.getMaxVersion());
+    TraceScope traceScope = continueTraceSpan(proto.getHeader(),
+        proto.getClass().getSimpleName());
+    try {
+      requestShortCircuitFds(PBHelper.convert(proto.getHeader().getBlock()),
+          PBHelper.convert(proto.getHeader().getToken()),
+          slotId, proto.getMaxVersion());
+    } finally {
+      if (traceScope != null) traceScope.close();
+    }
   }
 
   /** Receive {@link Op#RELEASE_SHORT_CIRCUIT_FDS} */
@@ -168,38 +194,67 @@ public abstract class Receiver implements DataTransferProtocol {
       throws IOException {
     final ReleaseShortCircuitAccessRequestProto proto =
       ReleaseShortCircuitAccessRequestProto.parseFrom(vintPrefixed(in));
-    releaseShortCircuitFds(PBHelper.convert(proto.getSlotId()));
+    TraceScope traceScope = continueTraceSpan(proto.getTraceInfo(),
+        proto.getClass().getSimpleName());
+    try {
+      releaseShortCircuitFds(PBHelper.convert(proto.getSlotId()));
+    } finally {
+      if (traceScope != null) traceScope.close();
+    }
   }
 
   /** Receive {@link Op#REQUEST_SHORT_CIRCUIT_SHM} */
   private void opRequestShortCircuitShm(DataInputStream in) throws IOException {
     final ShortCircuitShmRequestProto proto =
       ShortCircuitShmRequestProto.parseFrom(vintPrefixed(in));
-    requestShortCircuitShm(proto.getClientName());
+    TraceScope traceScope = continueTraceSpan(proto.getTraceInfo(),
+        proto.getClass().getSimpleName());
+    try {
+      requestShortCircuitShm(proto.getClientName());
+    } finally {
+      if (traceScope != null) traceScope.close();
+    }
   }
 
   /** Receive OP_REPLACE_BLOCK */
   private void opReplaceBlock(DataInputStream in) throws IOException {
     OpReplaceBlockProto proto = OpReplaceBlockProto.parseFrom(vintPrefixed(in));
-    replaceBlock(PBHelper.convert(proto.getHeader().getBlock()),
-        PBHelper.convertStorageType(proto.getStorageType()),
-        PBHelper.convert(proto.getHeader().getToken()),
-        proto.getDelHint(),
-        PBHelper.convert(proto.getSource()));
+    TraceScope traceScope = continueTraceSpan(proto.getHeader(),
+        proto.getClass().getSimpleName());
+    try {
+      replaceBlock(PBHelper.convert(proto.getHeader().getBlock()),
+          PBHelper.convertStorageType(proto.getStorageType()),
+          PBHelper.convert(proto.getHeader().getToken()),
+          proto.getDelHint(),
+          PBHelper.convert(proto.getSource()));
+    } finally {
+      if (traceScope != null) traceScope.close();
+    }
   }
 
   /** Receive OP_COPY_BLOCK */
   private void opCopyBlock(DataInputStream in) throws IOException {
     OpCopyBlockProto proto = OpCopyBlockProto.parseFrom(vintPrefixed(in));
-    copyBlock(PBHelper.convert(proto.getHeader().getBlock()),
-        PBHelper.convert(proto.getHeader().getToken()));
+    TraceScope traceScope = continueTraceSpan(proto.getHeader(),
+        proto.getClass().getSimpleName());
+    try {
+      copyBlock(PBHelper.convert(proto.getHeader().getBlock()),
+          PBHelper.convert(proto.getHeader().getToken()));
+    } finally {
+      if (traceScope != null) traceScope.close();
+    }
   }
 
   /** Receive OP_BLOCK_CHECKSUM */
   private void opBlockChecksum(DataInputStream in) throws IOException {
     OpBlockChecksumProto proto = OpBlockChecksumProto.parseFrom(vintPrefixed(in));
+    TraceScope traceScope = continueTraceSpan(proto.getHeader(),
+        proto.getClass().getSimpleName());
+    try {
     blockChecksum(PBHelper.convert(proto.getHeader().getBlock()),
         PBHelper.convert(proto.getHeader().getToken()));
+    } finally {
+      if (traceScope != null) traceScope.close();
+    }
   }
 }

Sender.java

@@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
@@ -47,6 +48,9 @@ import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
+
+import org.htrace.Trace;
+import org.htrace.Span;
 
 import com.google.protobuf.Message;
 
 /** Sender */
@@ -185,19 +189,29 @@ public class Sender implements DataTransferProtocol {
 
   @Override
   public void releaseShortCircuitFds(SlotId slotId) throws IOException {
-    ReleaseShortCircuitAccessRequestProto proto =
+    ReleaseShortCircuitAccessRequestProto.Builder builder =
       ReleaseShortCircuitAccessRequestProto.newBuilder().
-      setSlotId(PBHelper.convert(slotId)).
-      build();
+      setSlotId(PBHelper.convert(slotId));
+    if (Trace.isTracing()) {
+      Span s = Trace.currentSpan();
+      builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder()
+          .setTraceId(s.getTraceId()).setParentId(s.getSpanId()));
+    }
+    ReleaseShortCircuitAccessRequestProto proto = builder.build();
     send(out, Op.RELEASE_SHORT_CIRCUIT_FDS, proto);
   }
 
   @Override
   public void requestShortCircuitShm(String clientName) throws IOException {
-    ShortCircuitShmRequestProto proto =
+    ShortCircuitShmRequestProto.Builder builder =
       ShortCircuitShmRequestProto.newBuilder().
-      setClientName(clientName).
-      build();
+      setClientName(clientName);
+    if (Trace.isTracing()) {
+      Span s = Trace.currentSpan();
+      builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder()
+          .setTraceId(s.getTraceId()).setParentId(s.getSpanId()));
+    }
+    ShortCircuitShmRequestProto proto = builder.build();
     send(out, Op.REQUEST_SHORT_CIRCUIT_SHM, proto);
   }

datatransfer.proto

@@ -47,6 +47,12 @@ message DataTransferEncryptorMessageProto {
 message BaseHeaderProto {
   required ExtendedBlockProto block = 1;
   optional hadoop.common.TokenProto token = 2;
+  optional DataTransferTraceInfoProto traceInfo = 3;
+}
+
+message DataTransferTraceInfoProto {
+  required uint64 traceId = 1;
+  required uint64 parentId = 2;
 }
 
 message ClientOperationHeaderProto {
@@ -166,6 +172,7 @@ message OpRequestShortCircuitAccessProto {
 
 message ReleaseShortCircuitAccessRequestProto {
   required ShortCircuitShmSlotProto slotId = 1;
+  optional DataTransferTraceInfoProto traceInfo = 2;
 }
 
 message ReleaseShortCircuitAccessResponseProto {
@@ -177,6 +184,7 @@ message ShortCircuitShmRequestProto {
   // The name of the client requesting the shared memory segment.  This is
   // purely for logging / debugging purposes.
   required string clientName = 1;
+  optional DataTransferTraceInfoProto traceInfo = 2;
 }
 
 message ShortCircuitShmResponseProto {

TestTracing.java

@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.htrace.HTraceConfiguration;
 import org.htrace.Sampler;
 import org.htrace.Span;
@@ -39,11 +40,13 @@ import org.junit.Test;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeoutException;
+
+import com.google.common.base.Supplier;
 
 public class TestTracing {
@@ -81,7 +84,12 @@ public class TestTracing {
       "org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol.BlockingInterface.create",
       "org.apache.hadoop.hdfs.protocol.ClientProtocol.fsync",
       "org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol.BlockingInterface.fsync",
-      "org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol.BlockingInterface.complete"
+      "org.apache.hadoop.hdfs.protocol.ClientProtocol.complete",
+      "org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol.BlockingInterface.complete",
+      "DFSOutputStream",
+      "OpWriteBlockProto",
+      "org.apache.hadoop.hdfs.protocol.ClientProtocol.addBlock",
+      "org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol.BlockingInterface.addBlock"
     };
 
     assertSpanNamesFound(expectedSpanNames);
@@ -96,7 +104,7 @@ public class TestTracing {
 
     // There should only be one trace id as it should all be homed in the
     // top trace.
-    for (Span span : SetSpanReceiver.SetHolder.spans) {
+    for (Span span : SetSpanReceiver.SetHolder.spans.values()) {
       Assert.assertEquals(ts.getSpan().getTraceId(), span.getTraceId());
     }
   }
@@ -152,7 +160,8 @@ public class TestTracing {
     String[] expectedSpanNames = {
       "testReadTraceHooks",
       "org.apache.hadoop.hdfs.protocol.ClientProtocol.getBlockLocations",
-      "org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol.BlockingInterface.getBlockLocations"
+      "org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol.BlockingInterface.getBlockLocations",
+      "OpReadBlockProto"
     };
 
     assertSpanNamesFound(expectedSpanNames);
@@ -168,7 +177,7 @@ public class TestTracing {
 
     // There should only be one trace id as it should all be homed in the
    // top trace.
-    for (Span span : SetSpanReceiver.SetHolder.spans) {
+    for (Span span : SetSpanReceiver.SetHolder.spans.values()) {
       Assert.assertEquals(ts.getSpan().getTraceId(), span.getTraceId());
     }
   }
@@ -228,10 +237,24 @@ public class TestTracing {
     cluster.shutdown();
   }
 
-  private void assertSpanNamesFound(String[] expectedSpanNames) {
-    Map<String, List<Span>> map = SetSpanReceiver.SetHolder.getMap();
-    for (String spanName : expectedSpanNames) {
-      Assert.assertTrue("Should find a span with name " + spanName, map.get(spanName) != null);
+  static void assertSpanNamesFound(final String[] expectedSpanNames) {
+    try {
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          Map<String, List<Span>> map = SetSpanReceiver.SetHolder.getMap();
+          for (String spanName : expectedSpanNames) {
+            if (!map.containsKey(spanName)) {
+              return false;
+            }
+          }
+          return true;
+        }
+      }, 100, 1000);
+    } catch (TimeoutException e) {
+      Assert.fail("timed out to get expected spans: " + e.getMessage());
+    } catch (InterruptedException e) {
+      Assert.fail("interrupted while waiting spans: " + e.getMessage());
     }
   }
 
@@ -249,15 +272,16 @@ public class TestTracing {
     }
 
     public void receiveSpan(Span span) {
-      SetHolder.spans.add(span);
+      SetHolder.spans.put(span.getSpanId(), span);
     }
 
     public void close() {
    }
 
    public static class SetHolder {
-      public static Set<Span> spans = new HashSet<Span>();
+      public static ConcurrentHashMap<Long, Span> spans =
+          new ConcurrentHashMap<Long, Span>();
 
       public static int size() {
         return spans.size();
       }
@@ -265,7 +289,7 @@ public class TestTracing {
 
       public static Map<String, List<Span>> getMap() {
         Map<String, List<Span>> map = new HashMap<String, List<Span>>();
-        for (Span s : spans) {
+        for (Span s : spans.values()) {
           List<Span> l = map.get(s.getDescription());
           if (l == null) {
             l = new LinkedList<Span>();

TestTracingShortCircuitLocalRead.java (new file)

@@ -0,0 +1,97 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tracing;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
import org.htrace.Sampler;
import org.htrace.Span;
import org.htrace.Trace;
import org.htrace.TraceScope;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
public class TestTracingShortCircuitLocalRead {
  private static Configuration conf;
  private static MiniDFSCluster cluster;
  private static DistributedFileSystem dfs;
  private static SpanReceiverHost spanReceiverHost;
  private static TemporarySocketDirectory sockDir;
  static final Path TEST_PATH = new Path("testShortCircuitTraceHooks");
  static final int TEST_LENGTH = 1234;

  @BeforeClass
  public static void init() {
    sockDir = new TemporarySocketDirectory();
    DomainSocket.disableBindPathValidation();
  }

  @AfterClass
  public static void shutdown() throws IOException {
    sockDir.close();
  }

  @Test
  public void testShortCircuitTraceHooks() throws IOException {
    conf = new Configuration();
    conf.set(SpanReceiverHost.SPAN_RECEIVERS_CONF_KEY,
        TestTracing.SetSpanReceiver.class.getName());
    conf.setLong("dfs.blocksize", 100 * 1024);
    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
    conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
        "testShortCircuitTraceHooks._PORT");
    conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");
    cluster = new MiniDFSCluster.Builder(conf)
        .numDataNodes(1)
        .build();
    dfs = cluster.getFileSystem();

    try {
      spanReceiverHost = SpanReceiverHost.getInstance(conf);
      DFSTestUtil.createFile(dfs, TEST_PATH, TEST_LENGTH, (short)1, 5678L);

      TraceScope ts = Trace.startSpan("testShortCircuitTraceHooks", Sampler.ALWAYS);
      FSDataInputStream stream = dfs.open(TEST_PATH);
      byte buf[] = new byte[TEST_LENGTH];
      IOUtils.readFully(stream, buf, 0, TEST_LENGTH);
      stream.close();
      ts.close();

      String[] expectedSpanNames = {
        "OpRequestShortCircuitAccessProto",
        "ShortCircuitShmRequestProto"
      };
      TestTracing.assertSpanNamesFound(expectedSpanNames);
    } finally {
      dfs.close();
      cluster.shutdown();
    }
  }
}
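
Taken together, these changes mean a caller only has to register a SpanReceiver (the tests use the in-memory SetSpanReceiver via SpanReceiverHost.SPAN_RECEIVERS_CONF_KEY) and run its filesystem calls inside a sampled span; DataNode-side spans such as OpWriteBlockProto and OpReadBlockProto then show up under the caller's trace. A hedged end-to-end sketch along the lines of the tests above; the file path, span name, and reuse of the test's receiver class are illustrative assumptions, not part of the patch:

package org.apache.hadoop.tracing;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.htrace.Sampler;
import org.htrace.Trace;
import org.htrace.TraceScope;

public class TracedWriteExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Route finished spans to the in-memory receiver used by the tests; a real
    // deployment would configure its own SpanReceiver implementation here.
    conf.set(SpanReceiverHost.SPAN_RECEIVERS_CONF_KEY,
        TestTracing.SetSpanReceiver.class.getName());
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    DistributedFileSystem dfs = cluster.getFileSystem();
    SpanReceiverHost.getInstance(conf);

    TraceScope scope = Trace.startSpan("tracedWrite", Sampler.ALWAYS);
    try {
      FSDataOutputStream out = dfs.create(new Path("/traced-file"));
      out.write(new byte[4096]);  // DFSOutputStream detaches a span for its DataStreamer
      out.close();                // the DataNode records an "OpWriteBlockProto" span
    } finally {
      scope.close();
      dfs.close();
      cluster.shutdown();
    }
  }
}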