From 9be338911bcffb7ac07306e237a03036c04be37a Mon Sep 17 00:00:00 2001
From: Colin Patrick Mccabe
Date: Tue, 16 Sep 2014 13:58:40 -0700
Subject: [PATCH] HDFS-6880. Adding tracing to DataNode data transfer protocol
 (iwasakims via cmccabe)

(cherry picked from commit 56119fec96abbcc44c5dd82fdb694d2c3b53feb3)
---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt   |   3 +
 .../apache/hadoop/hdfs/DFSOutputStream.java   |  40 +++++-
 .../datatransfer/DataTransferProtoUtil.java   |  44 ++++++-
 .../hdfs/protocol/datatransfer/Receiver.java  | 123 +++++++++++++-----
 .../hdfs/protocol/datatransfer/Sender.java    |  26 +++-
 .../src/main/proto/datatransfer.proto         |   8 ++
 .../apache/hadoop/tracing/TestTracing.java    |  52 ++++++--
 .../TestTracingShortCircuitLocalRead.java     |  97 ++++++++++++++
 8 files changed, 331 insertions(+), 62 deletions(-)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracingShortCircuitLocalRead.java

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index d945a25982a..6c19fce8460 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -203,6 +203,9 @@ Release 2.6.0 - UNRELEASED
     HDFS-7059. HAadmin transtionToActive with forceActive option can show
     confusing message.
 
+    HDFS-6880. Adding tracing to DataNode data transfer protocol. (iwasakims
+    via cmccabe)
+
   OPTIMIZATIONS
 
     HDFS-6690. Deduplicate xattr names in memory. (wang)
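The changes below thread an HTrace span from the HDFS client through the DataNode's data transfer protocol, so a single trace covers both halves of a read or write. As a rough sketch of how a caller ends up exercising this path (illustrative only; the span name and the write logic are placeholders, and only the org.htrace calls that appear elsewhere in this patch are assumed):

    import org.htrace.Sampler;
    import org.htrace.Trace;
    import org.htrace.TraceScope;

    // Open a top-level span. While this scope is active, Trace.isTracing()
    // returns true, so the client code added below attaches trace ids to
    // the requests it sends to DataNodes.
    TraceScope scope = Trace.startSpan("writeToHdfs", Sampler.ALWAYS);
    try {
      // ... create a file and write through a DFSOutputStream here ...
    } finally {
      scope.close();  // closing the scope completes the span
    }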
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 5526cb1af63..db3aecfc7aa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -88,6 +88,10 @@ import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.Time;
 
+import org.htrace.Span;
+import org.htrace.Trace;
+import org.htrace.TraceScope;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
@@ -355,12 +359,22 @@ public class DFSOutputStream extends FSOutputSummer
     /** Append on an existing block? */
     private final boolean isAppend;
 
+    private final Span traceSpan;
+
     /**
      * Default construction for file create
      */
     private DataStreamer() {
+      this(null);
+    }
+
+    /**
+     * Construction with tracing info.
+     */
+    private DataStreamer(Span span) {
       isAppend = false;
       stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
+      traceSpan = span;
     }
 
     /**
@@ -371,9 +385,10 @@ public class DFSOutputStream extends FSOutputSummer
      * @throws IOException if error occurs
      */
     private DataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat,
-        int bytesPerChecksum) throws IOException {
+        int bytesPerChecksum, Span span) throws IOException {
       isAppend = true;
       stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
+      traceSpan = span;
       block = lastBlock.getBlock();
       bytesSent = block.getNumBytes();
       accessToken = lastBlock.getBlockToken();
@@ -463,6 +478,10 @@ public class DFSOutputStream extends FSOutputSummer
     @Override
     public void run() {
       long lastPacket = Time.now();
+      TraceScope traceScope = null;
+      if (traceSpan != null) {
+        traceScope = Trace.continueSpan(traceSpan);
+      }
       while (!streamerClosed && dfsClient.clientRunning) {
 
         // if the Responder encountered an error, shutdown Responder
@@ -636,6 +655,9 @@ public class DFSOutputStream extends FSOutputSummer
           }
         }
       }
+      if (traceScope != null) {
+        traceScope.close();
+      }
       closeInternal();
     }
 
@@ -1611,7 +1633,11 @@ public class DFSOutputStream extends FSOutputSummer
     computePacketChunkSize(dfsClient.getConf().writePacketSize,
         checksum.getBytesPerChecksum());
 
-    streamer = new DataStreamer();
+    Span traceSpan = null;
+    if (Trace.isTracing()) {
+      traceSpan = Trace.startSpan(this.getClass().getSimpleName()).detach();
+    }
+    streamer = new DataStreamer(traceSpan);
     if (favoredNodes != null && favoredNodes.length != 0) {
       streamer.setFavoredNodes(favoredNodes);
     }
@@ -1652,15 +1678,21 @@ public class DFSOutputStream extends FSOutputSummer
     this(dfsClient, src, progress, stat, checksum);
     initialFileSize = stat.getLen(); // length of file when opened
 
+    Span traceSpan = null;
+    if (Trace.isTracing()) {
+      traceSpan = Trace.startSpan(this.getClass().getSimpleName()).detach();
+    }
+
     // The last partial block of the file has to be filled.
     if (lastBlock != null) {
       // indicate that we are appending to an existing block
      bytesCurBlock = lastBlock.getBlockSize();
-      streamer = new DataStreamer(lastBlock, stat, checksum.getBytesPerChecksum());
+      streamer = new DataStreamer(lastBlock, stat,
+          checksum.getBytesPerChecksum(), traceSpan);
     } else {
       computePacketChunkSize(dfsClient.getConf().writePacketSize,
           checksum.getBytesPerChecksum());
-      streamer = new DataStreamer();
+      streamer = new DataStreamer(traceSpan);
     }
     this.fileEncryptionInfo = stat.getFileEncryptionInfo();
   }
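DataStreamer runs on its own thread, so the span created in the constructor cannot simply stay attached to the caller's thread-local trace context; the patch detaches it and re-attaches it inside run(). A condensed illustration of that hand-off (the thread and the work inside it are hypothetical; only detach() and continueSpan() come from the patch):

    import org.htrace.Span;
    import org.htrace.Trace;
    import org.htrace.TraceScope;

    // Thread A: start a span, then detach it from this thread's context.
    final Span span = Trace.isTracing()
        ? Trace.startSpan("DataStreamer").detach() : null;

    Thread streamer = new Thread(new Runnable() {
      @Override
      public void run() {
        // Thread B: re-attach the detached span so this thread's work is
        // recorded under it, and close the scope when the thread is done.
        TraceScope scope = (span == null) ? null : Trace.continueSpan(span);
        try {
          // ... send packets down the pipeline ...
        } finally {
          if (scope != null) {
            scope.close();
          }
        }
      }
    });
    streamer.start();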
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java
index 6be3810c918..b91e17a3b65 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java
@@ -25,12 +25,16 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ChecksumTypeProto;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
-
+import org.htrace.Span;
+import org.htrace.Trace;
+import org.htrace.TraceInfo;
+import org.htrace.TraceScope;
 
 /**
  * Static utilities for dealing with the protocol buffers used by the
@@ -78,9 +82,41 @@ public abstract class DataTransferProtoUtil {
 
   static BaseHeaderProto buildBaseHeader(ExtendedBlock blk,
       Token<BlockTokenIdentifier> blockToken) {
-    return BaseHeaderProto.newBuilder()
+    BaseHeaderProto.Builder builder = BaseHeaderProto.newBuilder()
       .setBlock(PBHelper.convert(blk))
-      .setToken(PBHelper.convert(blockToken))
-      .build();
+      .setToken(PBHelper.convert(blockToken));
+    if (Trace.isTracing()) {
+      Span s = Trace.currentSpan();
+      builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder()
+        .setTraceId(s.getTraceId())
+        .setParentId(s.getSpanId()));
+    }
+    return builder.build();
+  }
+
+  public static TraceInfo fromProto(DataTransferTraceInfoProto proto) {
+    if (proto == null) return null;
+    if (!proto.hasTraceId()) return null;
+    return new TraceInfo(proto.getTraceId(), proto.getParentId());
+  }
+
+  public static TraceScope continueTraceSpan(ClientOperationHeaderProto header,
+      String description) {
+    return continueTraceSpan(header.getBaseHeader(), description);
+  }
+
+  public static TraceScope continueTraceSpan(BaseHeaderProto header,
+      String description) {
+    return continueTraceSpan(header.getTraceInfo(), description);
+  }
+
+  public static TraceScope continueTraceSpan(DataTransferTraceInfoProto proto,
+      String description) {
+    TraceScope scope = null;
+    TraceInfo info = fromProto(proto);
+    if (info != null) {
+      scope = Trace.startSpan(description, info);
+    }
+    return scope;
   }
 }
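On the wire, a trace context is just two 64-bit identifiers carried in the new DataTransferTraceInfoProto: the trace id shared by every span in the trace, and the sender's span id, which becomes the parent of the span the DataNode opens. A round trip through the helpers above looks like this (the numeric ids are placeholder values):

    import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto;
    import org.htrace.TraceInfo;

    // Sender side: serialize the current span's identity into the header.
    DataTransferTraceInfoProto proto = DataTransferTraceInfoProto.newBuilder()
        .setTraceId(12345L)   // id shared by all spans in this trace
        .setParentId(67890L)  // the sender's span id
        .build();

    // Receiver side: rebuild the htrace TraceInfo. fromProto() returns
    // null when the ids are absent, so untraced requests stay cheap.
    TraceInfo info = DataTransferProtoUtil.fromProto(proto);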
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
index a09437c0b0a..daae9b71129 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.protocol.datatransfer;
 
 import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.fromProto;
+import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.continueTraceSpan;
 import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
 
 import java.io.DataInputStream;
@@ -39,6 +40,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmR
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
+import org.htrace.TraceScope;
 
 /** Receiver */
 @InterfaceAudience.Private
@@ -108,7 +110,10 @@ public abstract class Receiver implements DataTransferProtocol {
   /** Receive OP_READ_BLOCK */
   private void opReadBlock() throws IOException {
     OpReadBlockProto proto = OpReadBlockProto.parseFrom(vintPrefixed(in));
-    readBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
+    TraceScope traceScope = continueTraceSpan(proto.getHeader(),
+        proto.getClass().getSimpleName());
+    try {
+      readBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
         PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
         proto.getHeader().getClientName(),
         proto.getOffset(),
@@ -117,27 +122,36 @@ public abstract class Receiver implements DataTransferProtocol {
         (proto.hasCachingStrategy() ?
             getCachingStrategy(proto.getCachingStrategy()) :
             CachingStrategy.newDefaultStrategy()));
+    } finally {
+      if (traceScope != null) traceScope.close();
+    }
   }
 
   /** Receive OP_WRITE_BLOCK */
   private void opWriteBlock(DataInputStream in) throws IOException {
     final OpWriteBlockProto proto = OpWriteBlockProto.parseFrom(vintPrefixed(in));
     final DatanodeInfo[] targets = PBHelper.convert(proto.getTargetsList());
-    writeBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
-        PBHelper.convertStorageType(proto.getStorageType()),
-        PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
-        proto.getHeader().getClientName(),
-        targets,
-        PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length),
-        PBHelper.convert(proto.getSource()),
-        fromProto(proto.getStage()),
-        proto.getPipelineSize(),
-        proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
-        proto.getLatestGenerationStamp(),
-        fromProto(proto.getRequestedChecksum()),
-        (proto.hasCachingStrategy() ?
-            getCachingStrategy(proto.getCachingStrategy()) :
-            CachingStrategy.newDefaultStrategy()));
+    TraceScope traceScope = continueTraceSpan(proto.getHeader(),
+        proto.getClass().getSimpleName());
+    try {
+      writeBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
+          PBHelper.convertStorageType(proto.getStorageType()),
+          PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
+          proto.getHeader().getClientName(),
+          targets,
+          PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length),
+          PBHelper.convert(proto.getSource()),
+          fromProto(proto.getStage()),
+          proto.getPipelineSize(),
+          proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
+          proto.getLatestGenerationStamp(),
+          fromProto(proto.getRequestedChecksum()),
+          (proto.hasCachingStrategy() ?
+              getCachingStrategy(proto.getCachingStrategy()) :
+              CachingStrategy.newDefaultStrategy()));
+    } finally {
+      if (traceScope != null) traceScope.close();
+    }
   }
 
   /** Receive {@link Op#TRANSFER_BLOCK} */
@@ -145,11 +159,17 @@
     final OpTransferBlockProto proto =
       OpTransferBlockProto.parseFrom(vintPrefixed(in));
     final DatanodeInfo[] targets = PBHelper.convert(proto.getTargetsList());
-    transferBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
-        PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
-        proto.getHeader().getClientName(),
-        targets,
-        PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length));
+    TraceScope traceScope = continueTraceSpan(proto.getHeader(),
+        proto.getClass().getSimpleName());
+    try {
+      transferBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
+          PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
+          proto.getHeader().getClientName(),
+          targets,
+          PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length));
+    } finally {
+      if (traceScope != null) traceScope.close();
+    }
   }
 
   /** Receive {@link Op#REQUEST_SHORT_CIRCUIT_FDS} */
@@ -158,9 +178,15 @@
       OpRequestShortCircuitAccessProto.parseFrom(vintPrefixed(in));
     SlotId slotId = (proto.hasSlotId()) ?
         PBHelper.convert(proto.getSlotId()) : null;
-    requestShortCircuitFds(PBHelper.convert(proto.getHeader().getBlock()),
-        PBHelper.convert(proto.getHeader().getToken()),
-        slotId, proto.getMaxVersion());
+    TraceScope traceScope = continueTraceSpan(proto.getHeader(),
+        proto.getClass().getSimpleName());
+    try {
+      requestShortCircuitFds(PBHelper.convert(proto.getHeader().getBlock()),
+          PBHelper.convert(proto.getHeader().getToken()),
+          slotId, proto.getMaxVersion());
+    } finally {
+      if (traceScope != null) traceScope.close();
+    }
   }
 
   /** Receive {@link Op#RELEASE_SHORT_CIRCUIT_FDS} */
@@ -168,38 +194,67 @@
       throws IOException {
     final ReleaseShortCircuitAccessRequestProto proto =
       ReleaseShortCircuitAccessRequestProto.parseFrom(vintPrefixed(in));
-    releaseShortCircuitFds(PBHelper.convert(proto.getSlotId()));
+    TraceScope traceScope = continueTraceSpan(proto.getTraceInfo(),
+        proto.getClass().getSimpleName());
+    try {
+      releaseShortCircuitFds(PBHelper.convert(proto.getSlotId()));
+    } finally {
+      if (traceScope != null) traceScope.close();
+    }
   }
 
   /** Receive {@link Op#REQUEST_SHORT_CIRCUIT_SHM} */
   private void opRequestShortCircuitShm(DataInputStream in) throws IOException {
     final ShortCircuitShmRequestProto proto =
         ShortCircuitShmRequestProto.parseFrom(vintPrefixed(in));
-    requestShortCircuitShm(proto.getClientName());
+    TraceScope traceScope = continueTraceSpan(proto.getTraceInfo(),
+        proto.getClass().getSimpleName());
+    try {
+      requestShortCircuitShm(proto.getClientName());
+    } finally {
+      if (traceScope != null) traceScope.close();
+    }
   }
 
   /** Receive OP_REPLACE_BLOCK */
   private void opReplaceBlock(DataInputStream in) throws IOException {
     OpReplaceBlockProto proto = OpReplaceBlockProto.parseFrom(vintPrefixed(in));
-    replaceBlock(PBHelper.convert(proto.getHeader().getBlock()),
-        PBHelper.convertStorageType(proto.getStorageType()),
-        PBHelper.convert(proto.getHeader().getToken()),
-        proto.getDelHint(),
-        PBHelper.convert(proto.getSource()));
+    TraceScope traceScope = continueTraceSpan(proto.getHeader(),
+        proto.getClass().getSimpleName());
+    try {
+      replaceBlock(PBHelper.convert(proto.getHeader().getBlock()),
+          PBHelper.convertStorageType(proto.getStorageType()),
+          PBHelper.convert(proto.getHeader().getToken()),
+          proto.getDelHint(),
+          PBHelper.convert(proto.getSource()));
+    } finally {
+      if (traceScope != null) traceScope.close();
+    }
   }
 
   /** Receive OP_COPY_BLOCK */
   private void opCopyBlock(DataInputStream in) throws IOException {
     OpCopyBlockProto proto = OpCopyBlockProto.parseFrom(vintPrefixed(in));
-    copyBlock(PBHelper.convert(proto.getHeader().getBlock()),
-        PBHelper.convert(proto.getHeader().getToken()));
+    TraceScope traceScope = continueTraceSpan(proto.getHeader(),
+        proto.getClass().getSimpleName());
+    try {
+      copyBlock(PBHelper.convert(proto.getHeader().getBlock()),
+          PBHelper.convert(proto.getHeader().getToken()));
+    } finally {
+      if (traceScope != null) traceScope.close();
+    }
   }
 
   /** Receive OP_BLOCK_CHECKSUM */
   private void opBlockChecksum(DataInputStream in) throws IOException {
     OpBlockChecksumProto proto = OpBlockChecksumProto.parseFrom(vintPrefixed(in));
-    
+    TraceScope traceScope = continueTraceSpan(proto.getHeader(),
+        proto.getClass().getSimpleName());
+    try {
     blockChecksum(PBHelper.convert(proto.getHeader().getBlock()),
         PBHelper.convert(proto.getHeader().getToken()));
+    } finally {
+      if (traceScope != null) traceScope.close();
+    }
   }
 }
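Every op handler above follows the same shape: continueTraceSpan() yields either a server-side child span or null, the operation runs inside try, and the finally block closes the scope whether or not the op threw. Reduced to its skeleton (opProto and doOp are placeholders standing in for any of the handlers):

    // Null-guarded scope pattern used by each opXxx() handler. Because
    // continueTraceSpan() returns null for untraced requests, only the
    // close() call needs a guard, not the operation itself.
    TraceScope traceScope = continueTraceSpan(opProto.getHeader(),
        opProto.getClass().getSimpleName());
    try {
      doOp(opProto);  // the actual readBlock/writeBlock/... dispatch
    } finally {
      if (traceScope != null) traceScope.close();
    }

Note that the span description is the protobuf class's simple name, so DataNode-side spans surface with names like "OpReadBlockProto" and "OpWriteBlockProto", exactly the strings the tests below wait for.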
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
index 68da52399c8..fb6cf2cc3fd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
@@ -47,6 +48,9 @@ import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 
+import org.htrace.Trace;
+import org.htrace.Span;
+
 import com.google.protobuf.Message;
 
 /** Sender */
@@ -185,19 +189,29 @@ public class Sender implements DataTransferProtocol {
 
   @Override
   public void releaseShortCircuitFds(SlotId slotId) throws IOException {
-    ReleaseShortCircuitAccessRequestProto proto =
+    ReleaseShortCircuitAccessRequestProto.Builder builder =
         ReleaseShortCircuitAccessRequestProto.newBuilder().
-        setSlotId(PBHelper.convert(slotId)).
-        build();
+        setSlotId(PBHelper.convert(slotId));
+    if (Trace.isTracing()) {
+      Span s = Trace.currentSpan();
+      builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder()
+          .setTraceId(s.getTraceId()).setParentId(s.getSpanId()));
+    }
+    ReleaseShortCircuitAccessRequestProto proto = builder.build();
     send(out, Op.RELEASE_SHORT_CIRCUIT_FDS, proto);
   }
 
   @Override
   public void requestShortCircuitShm(String clientName) throws IOException {
-    ShortCircuitShmRequestProto proto =
+    ShortCircuitShmRequestProto.Builder builder =
         ShortCircuitShmRequestProto.newBuilder().
-        setClientName(clientName).
-        build();
+        setClientName(clientName);
+    if (Trace.isTracing()) {
+      Span s = Trace.currentSpan();
+      builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder()
+          .setTraceId(s.getTraceId()).setParentId(s.getSpanId()));
+    }
+    ShortCircuitShmRequestProto proto = builder.build();
     send(out, Op.REQUEST_SHORT_CIRCUIT_SHM, proto);
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
index 6283b569dda..098d10ab9a4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
@@ -47,6 +47,12 @@ message DataTransferEncryptorMessageProto {
 message BaseHeaderProto {
   required ExtendedBlockProto block = 1;
   optional hadoop.common.TokenProto token = 2;
+  optional DataTransferTraceInfoProto traceInfo = 3;
+}
+
+message DataTransferTraceInfoProto {
+  required uint64 traceId = 1;
+  required uint64 parentId = 2;
 }
 
 message ClientOperationHeaderProto {
@@ -166,6 +172,7 @@ message OpRequestShortCircuitAccessProto {
 
 message ReleaseShortCircuitAccessRequestProto {
   required ShortCircuitShmSlotProto slotId = 1;
+  optional DataTransferTraceInfoProto traceInfo = 2;
 }
 
 message ReleaseShortCircuitAccessResponseProto {
@@ -177,6 +184,7 @@ message ShortCircuitShmRequestProto {
   // The name of the client requesting the shared memory segment.  This is
   // purely for logging / debugging purposes.
   required string clientName = 1;
+  optional DataTransferTraceInfoProto traceInfo = 2;
 }
 
 message ShortCircuitShmResponseProto {
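Making traceInfo optional in all three messages keeps the wire format backward compatible: an old client simply never sets it, and a new DataNode sees a header without trace ids. The required fields inside DataTransferTraceInfoProto only bind once that message is actually present. The untraced path can be seen in miniature (a hypothetical standalone check, relying only on standard protobuf generated-code behavior):

    import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
    import org.htrace.TraceInfo;

    // A header with no traceInfo set: the generated getter returns the
    // default DataTransferTraceInfoProto, whose hasTraceId() is false,
    // so fromProto() maps it to null and no server-side span is started.
    BaseHeaderProto header = BaseHeaderProto.getDefaultInstance();
    TraceInfo info = DataTransferProtoUtil.fromProto(header.getTraceInfo());
    assert info == null;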
"org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol.BlockingInterface.addBlock" }; assertSpanNamesFound(expectedSpanNames); @@ -96,7 +104,7 @@ public class TestTracing { // There should only be one trace id as it should all be homed in the // top trace. - for (Span span : SetSpanReceiver.SetHolder.spans) { + for (Span span : SetSpanReceiver.SetHolder.spans.values()) { Assert.assertEquals(ts.getSpan().getTraceId(), span.getTraceId()); } } @@ -152,7 +160,8 @@ public class TestTracing { String[] expectedSpanNames = { "testReadTraceHooks", "org.apache.hadoop.hdfs.protocol.ClientProtocol.getBlockLocations", - "org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol.BlockingInterface.getBlockLocations" + "org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol.BlockingInterface.getBlockLocations", + "OpReadBlockProto" }; assertSpanNamesFound(expectedSpanNames); @@ -168,7 +177,7 @@ public class TestTracing { // There should only be one trace id as it should all be homed in the // top trace. - for (Span span : SetSpanReceiver.SetHolder.spans) { + for (Span span : SetSpanReceiver.SetHolder.spans.values()) { Assert.assertEquals(ts.getSpan().getTraceId(), span.getTraceId()); } } @@ -228,10 +237,24 @@ public class TestTracing { cluster.shutdown(); } - private void assertSpanNamesFound(String[] expectedSpanNames) { - Map> map = SetSpanReceiver.SetHolder.getMap(); - for (String spanName : expectedSpanNames) { - Assert.assertTrue("Should find a span with name " + spanName, map.get(spanName) != null); + static void assertSpanNamesFound(final String[] expectedSpanNames) { + try { + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + Map> map = SetSpanReceiver.SetHolder.getMap(); + for (String spanName : expectedSpanNames) { + if (!map.containsKey(spanName)) { + return false; + } + } + return true; + } + }, 100, 1000); + } catch (TimeoutException e) { + Assert.fail("timed out to get expected spans: " + e.getMessage()); + } catch (InterruptedException e) { + Assert.fail("interrupted while waiting spans: " + e.getMessage()); } } @@ -249,15 +272,16 @@ public class TestTracing { } public void receiveSpan(Span span) { - SetHolder.spans.add(span); + SetHolder.spans.put(span.getSpanId(), span); } public void close() { } public static class SetHolder { - public static Set spans = new HashSet(); - + public static ConcurrentHashMap spans = + new ConcurrentHashMap(); + public static int size() { return spans.size(); } @@ -265,7 +289,7 @@ public class TestTracing { public static Map> getMap() { Map> map = new HashMap>(); - for (Span s : spans) { + for (Span s : spans.values()) { List l = map.get(s.getDescription()); if (l == null) { l = new LinkedList(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracingShortCircuitLocalRead.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracingShortCircuitLocalRead.java new file mode 100644 index 00000000000..7fe8a1eab1e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracingShortCircuitLocalRead.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracingShortCircuitLocalRead.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracingShortCircuitLocalRead.java
new file mode 100644
index 00000000000..7fe8a1eab1e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracingShortCircuitLocalRead.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tracing;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.htrace.Sampler;
+import org.htrace.Span;
+import org.htrace.Trace;
+import org.htrace.TraceScope;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import java.io.IOException;
+
+public class TestTracingShortCircuitLocalRead {
+  private static Configuration conf;
+  private static MiniDFSCluster cluster;
+  private static DistributedFileSystem dfs;
+  private static SpanReceiverHost spanReceiverHost;
+  private static TemporarySocketDirectory sockDir;
+  static final Path TEST_PATH = new Path("testShortCircuitTraceHooks");
+  static final int TEST_LENGTH = 1234;
+
+  @BeforeClass
+  public static void init() {
+    sockDir = new TemporarySocketDirectory();
+    DomainSocket.disableBindPathValidation();
+  }
+
+  @AfterClass
+  public static void shutdown() throws IOException {
+    sockDir.close();
+  }
+
+  @Test
+  public void testShortCircuitTraceHooks() throws IOException {
+    conf = new Configuration();
+    conf.set(SpanReceiverHost.SPAN_RECEIVERS_CONF_KEY,
+        TestTracing.SetSpanReceiver.class.getName());
+    conf.setLong("dfs.blocksize", 100 * 1024);
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
+    conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
+        "testShortCircuitTraceHooks._PORT");
+    conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");
+    cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(1)
+        .build();
+    dfs = cluster.getFileSystem();
+
+    try {
+      spanReceiverHost = SpanReceiverHost.getInstance(conf);
+      DFSTestUtil.createFile(dfs, TEST_PATH, TEST_LENGTH, (short)1, 5678L);
+
+      TraceScope ts = Trace.startSpan("testShortCircuitTraceHooks", Sampler.ALWAYS);
+      FSDataInputStream stream = dfs.open(TEST_PATH);
+      byte buf[] = new byte[TEST_LENGTH];
+      IOUtils.readFully(stream, buf, 0, TEST_LENGTH);
+      stream.close();
+      ts.close();
+
+      String[] expectedSpanNames = {
+        "OpRequestShortCircuitAccessProto",
+        "ShortCircuitShmRequestProto"
+      };
+      TestTracing.assertSpanNamesFound(expectedSpanNames);
+    } finally {
+      dfs.close();
+      cluster.shutdown();
+    }
+  }
+}
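The short-circuit read test above exercises the two request messages that carry trace info outside BaseHeaderProto. To collect spans from a real cluster rather than a test receiver, the same configuration hook applies; this is a minimal sketch, and the receiver class named here is an assumption, since any implementation of the htrace SpanReceiver interface can be plugged in:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.tracing.SpanReceiverHost;

    // Same key the test sets; point it at a production span receiver
    // instead of TestTracing.SetSpanReceiver.
    Configuration conf = new Configuration();
    conf.set(SpanReceiverHost.SPAN_RECEIVERS_CONF_KEY,
        "org.htrace.impl.LocalFileSpanReceiver");
    SpanReceiverHost.getInstance(conf);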