diff --git a/hadoop-common-project/hadoop-common/pom.xml b/hadoop-common-project/hadoop-common/pom.xml index 09f1c5a2d32..ae495be0e65 100644 --- a/hadoop-common-project/hadoop-common/pom.xml +++ b/hadoop-common-project/hadoop-common/pom.xml @@ -224,6 +224,10 @@ compile + + org.htrace + htrace-core + org.apache.zookeeper zookeeper diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 158445f8367..2f482c290ed 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -88,6 +88,7 @@ import org.apache.hadoop.util.ProtoUtil; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Time; +import org.htrace.Trace; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -694,6 +695,9 @@ public class Client { if (LOG.isDebugEnabled()) { LOG.debug("Connecting to "+server); } + if (Trace.isTracing()) { + Trace.addTimelineAnnotation("IPC client connecting to " + server); + } short numRetries = 0; Random rand = null; while (true) { @@ -758,6 +762,10 @@ public class Client { // update last activity time touch(); + if (Trace.isTracing()) { + Trace.addTimelineAnnotation("IPC client connected to " + server); + } + // start the receiver thread after the socket connection has been set // up start(); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java index 64615d22f85..0ccdb71d0ee 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java @@ -48,6 +48,9 @@ import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.ProtoUtil; import org.apache.hadoop.util.Time; +import org.htrace.Sampler; +import org.htrace.Trace; +import org.htrace.TraceScope; import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.BlockingService; @@ -191,6 +194,16 @@ public class ProtobufRpcEngine implements RpcEngine { + method.getName() + "]"); } + TraceScope traceScope = null; + // if Tracing is on then start a new span for this rpc. + // guard it in the if statement to make sure there isn't + // any extra string manipulation. + if (Trace.isTracing()) { + traceScope = Trace.startSpan( + method.getDeclaringClass().getCanonicalName() + + "." + method.getName()); + } + RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method); if (LOG.isTraceEnabled()) { @@ -212,8 +225,13 @@ public class ProtobufRpcEngine implements RpcEngine { remoteId + ": " + method.getName() + " {" + e + "}"); } - + if (Trace.isTracing()) { + traceScope.getSpan().addTimelineAnnotation( + "Call got exception: " + e.getMessage()); + } throw new ServiceException(e); + } finally { + if (traceScope != null) traceScope.close(); } if (LOG.isDebugEnabled()) { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 24dd0c21b82..021e03537b4 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -79,6 +79,7 @@ import org.apache.hadoop.conf.Configuration.IntegerRanges; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseMessageWrapper; @@ -115,6 +116,10 @@ import org.apache.hadoop.util.ProtoUtil; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Time; +import org.htrace.Span; +import org.htrace.Trace; +import org.htrace.TraceInfo; +import org.htrace.TraceScope; import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; @@ -506,6 +511,7 @@ public abstract class Server { private ByteBuffer rpcResponse; // the response for this call private final RPC.RpcKind rpcKind; private final byte[] clientId; + private final Span traceSpan; // the tracing span on the server side public Call(int id, int retryCount, Writable param, Connection connection) { @@ -515,6 +521,11 @@ public abstract class Server { public Call(int id, int retryCount, Writable param, Connection connection, RPC.RpcKind kind, byte[] clientId) { + this(id, retryCount, param, connection, kind, clientId, null); + } + + public Call(int id, int retryCount, Writable param, Connection connection, + RPC.RpcKind kind, byte[] clientId, Span span) { this.callId = id; this.retryCount = retryCount; this.rpcRequest = param; @@ -523,6 +534,7 @@ public abstract class Server { this.rpcResponse = null; this.rpcKind = kind; this.clientId = clientId; + this.traceSpan = span; } @Override @@ -1921,9 +1933,18 @@ public abstract class Server { RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST, err); } + Span traceSpan = null; + if (header.hasTraceInfo()) { + // If the incoming RPC included tracing info, always continue the trace + TraceInfo parentSpan = new TraceInfo(header.getTraceInfo().getTraceId(), + header.getTraceInfo().getParentId()); + traceSpan = Trace.startSpan(rpcRequest.toString(), parentSpan).detach(); + } + Call call = new Call(header.getCallId(), header.getRetryCount(), - rpcRequest, this, ProtoUtil.convert(header.getRpcKind()), header - .getClientId().toByteArray()); + rpcRequest, this, ProtoUtil.convert(header.getRpcKind()), + header.getClientId().toByteArray(), traceSpan); + callQueue.put(call); // queue the call; maybe blocked here incRpcCount(); // Increment the rpc count } @@ -2067,6 +2088,7 @@ public abstract class Server { ByteArrayOutputStream buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE); while (running) { + TraceScope traceScope = null; try { final Call call = callQueue.take(); // pop the queue; maybe blocked here if (LOG.isDebugEnabled()) { @@ -2083,6 +2105,10 @@ public abstract class Server { Writable value = null; CurCall.set(call); + if (call.traceSpan != null) { + traceScope = Trace.continueSpan(call.traceSpan); + } + try { // Make the call as the user via Subject.doAs, thus associating // the call with the Subject @@ -2156,9 +2182,22 @@ public abstract class Server { } catch (InterruptedException e) { if (running) { // unexpected -- log it LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e); + if (Trace.isTracing()) { + traceScope.getSpan().addTimelineAnnotation("unexpectedly interrupted: " + + StringUtils.stringifyException(e)); + } } } catch (Exception e) { LOG.info(Thread.currentThread().getName() + " caught an exception", e); + if (Trace.isTracing()) { + traceScope.getSpan().addTimelineAnnotation("Exception: " + + StringUtils.stringifyException(e)); + } + } finally { + if (traceScope != null) { + traceScope.close(); + } + IOUtils.cleanup(LOG, traceScope); } } LOG.debug(Thread.currentThread().getName() + ": exiting"); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java index 04ab4dc2699..4b2dfe0de10 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java @@ -41,6 +41,8 @@ import org.apache.hadoop.util.Time; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.*; +import org.htrace.Trace; +import org.htrace.TraceScope; /** An RpcEngine implementation for Writable data. */ @InterfaceStability.Evolving @@ -227,9 +229,19 @@ public class WritableRpcEngine implements RpcEngine { if (LOG.isDebugEnabled()) { startTime = Time.now(); } - - ObjectWritable value = (ObjectWritable) - client.call(RPC.RpcKind.RPC_WRITABLE, new Invocation(method, args), remoteId); + TraceScope traceScope = null; + if (Trace.isTracing()) { + traceScope = Trace.startSpan( + method.getDeclaringClass().getCanonicalName() + + "." + method.getName()); + } + ObjectWritable value; + try { + value = (ObjectWritable) + client.call(RPC.RpcKind.RPC_WRITABLE, new Invocation(method, args), remoteId); + } finally { + if (traceScope != null) traceScope.close(); + } if (LOG.isDebugEnabled()) { long callTime = Time.now() - startTime; LOG.debug("Call: " + method.getName() + " " + callTime); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/tracing/SpanReceiverHost.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/tracing/SpanReceiverHost.java new file mode 100644 index 00000000000..b8c7b311ffa --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/tracing/SpanReceiverHost.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.tracing; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashSet; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.ShutdownHookManager; +import org.htrace.HTraceConfiguration; +import org.htrace.SpanReceiver; +import org.htrace.Trace; + +/** + * This class provides functions for reading the names of SpanReceivers from + * the Hadoop configuration, adding those SpanReceivers to the Tracer, + * and closing those SpanReceivers when appropriate. + * This class does nothing If no SpanReceiver is configured. + */ +@InterfaceAudience.Private +public class SpanReceiverHost { + public static final String SPAN_RECEIVERS_CONF_KEY = "hadoop.trace.spanreceiver.classes"; + private static final Log LOG = LogFactory.getLog(SpanReceiverHost.class); + private Collection receivers = new HashSet(); + private boolean closed = false; + + private static enum SingletonHolder { + INSTANCE; + Object lock = new Object(); + SpanReceiverHost host = null; + } + + public static SpanReceiverHost getInstance(Configuration conf) { + if (SingletonHolder.INSTANCE.host != null) { + return SingletonHolder.INSTANCE.host; + } + synchronized (SingletonHolder.INSTANCE.lock) { + if (SingletonHolder.INSTANCE.host != null) { + return SingletonHolder.INSTANCE.host; + } + SpanReceiverHost host = new SpanReceiverHost(); + host.loadSpanReceivers(conf); + SingletonHolder.INSTANCE.host = host; + ShutdownHookManager.get().addShutdownHook(new Runnable() { + public void run() { + SingletonHolder.INSTANCE.host.closeReceivers(); + } + }, 0); + return SingletonHolder.INSTANCE.host; + } + } + + /** + * Reads the names of classes specified in the + * "hadoop.trace.spanreceiver.classes" property and instantiates and registers + * them with the Tracer as SpanReceiver's. + * + * The nullary constructor is called during construction, but if the classes + * specified implement the Configurable interface, setConfiguration() will be + * called on them. This allows SpanReceivers to use values from the Hadoop + * configuration. + */ + public void loadSpanReceivers(Configuration conf) { + Class implClass = null; + String[] receiverNames = conf.getTrimmedStrings(SPAN_RECEIVERS_CONF_KEY); + if (receiverNames == null || receiverNames.length == 0) { + return; + } + for (String className : receiverNames) { + className = className.trim(); + try { + implClass = Class.forName(className); + receivers.add(loadInstance(implClass, conf)); + LOG.info("SpanReceiver " + className + " was loaded successfully."); + } catch (ClassNotFoundException e) { + LOG.warn("Class " + className + " cannot be found.", e); + } catch (IOException e) { + LOG.warn("Load SpanReceiver " + className + " failed.", e); + } + } + for (SpanReceiver rcvr : receivers) { + Trace.addReceiver(rcvr); + } + } + + private SpanReceiver loadInstance(Class implClass, Configuration conf) + throws IOException { + SpanReceiver impl; + try { + Object o = ReflectionUtils.newInstance(implClass, conf); + impl = (SpanReceiver)o; + impl.configure(wrapHadoopConf(conf)); + } catch (SecurityException e) { + throw new IOException(e); + } catch (IllegalArgumentException e) { + throw new IOException(e); + } catch (RuntimeException e) { + throw new IOException(e); + } + + return impl; + } + + private static HTraceConfiguration wrapHadoopConf(final Configuration conf) { + return new HTraceConfiguration() { + public static final String HTRACE_CONF_PREFIX = "hadoop."; + + @Override + public String get(String key) { + return conf.get(HTRACE_CONF_PREFIX + key); + } + + @Override + public String get(String key, String defaultValue) { + return conf.get(HTRACE_CONF_PREFIX + key, defaultValue); + } + }; + } + + /** + * Calls close() on all SpanReceivers created by this SpanReceiverHost. + */ + public synchronized void closeReceivers() { + if (closed) return; + closed = true; + for (SpanReceiver rcvr : receivers) { + try { + rcvr.close(); + } catch (IOException e) { + LOG.warn("Unable to close SpanReceiver correctly: " + e.getMessage(), e); + } + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java index 79f8692842d..36b5ff11bc8 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java @@ -27,6 +27,8 @@ import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.UserInformation import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.*; import org.apache.hadoop.security.SaslRpcServer.AuthMethod; import org.apache.hadoop.security.UserGroupInformation; +import org.htrace.Span; +import org.htrace.Trace; import com.google.protobuf.ByteString; @@ -165,6 +167,15 @@ public abstract class ProtoUtil { RpcRequestHeaderProto.Builder result = RpcRequestHeaderProto.newBuilder(); result.setRpcKind(convert(rpcKind)).setRpcOp(operation).setCallId(callId) .setRetryCount(retryCount).setClientId(ByteString.copyFrom(uuid)); + + // Add tracing info if we are currently tracing. + if (Trace.isTracing()) { + Span s = Trace.currentSpan(); + result.setTraceInfo(RPCTraceInfoProto.newBuilder() + .setParentId(s.getSpanId()) + .setTraceId(s.getTraceId()).build()); + } + return result.build(); } } diff --git a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto index e8c4adac367..c8791508b5a 100644 --- a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto +++ b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto @@ -53,6 +53,18 @@ enum RpcKindProto { +/** + * Used to pass through the information necessary to continue + * a trace after an RPC is made. All we need is the traceid + * (so we know the overarching trace this message is a part of), and + * the id of the current span when this message was sent, so we know + * what span caused the new span we will create when this message is received. + */ +message RPCTraceInfoProto { + optional int64 traceId = 1; + optional int64 parentId = 2; +} + message RpcRequestHeaderProto { // the header for the RpcRequest enum OperationProto { RPC_FINAL_PACKET = 0; // The final RPC Packet @@ -67,6 +79,7 @@ message RpcRequestHeaderProto { // the header for the RpcRequest // clientId + callId uniquely identifies a request // retry count, 1 means this is the first retry optional sint32 retryCount = 5 [default = -1]; + optional RPCTraceInfoProto traceInfo = 6; // tracing info } diff --git a/hadoop-common-project/hadoop-common/src/site/apt/Tracing.apt.vm b/hadoop-common-project/hadoop-common/src/site/apt/Tracing.apt.vm new file mode 100644 index 00000000000..f777dd23c16 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/site/apt/Tracing.apt.vm @@ -0,0 +1,169 @@ +~~ Licensed under the Apache License, Version 2.0 (the "License"); +~~ you may not use this file except in compliance with the License. +~~ You may obtain a copy of the License at +~~ +~~ http://www.apache.org/licenses/LICENSE-2.0 +~~ +~~ Unless required by applicable law or agreed to in writing, software +~~ distributed under the License is distributed on an "AS IS" BASIS, +~~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +~~ See the License for the specific language governing permissions and +~~ limitations under the License. See accompanying LICENSE file. + + --- + Hadoop Distributed File System-${project.version} - Enabling Dapper-like Tracing + --- + --- + ${maven.build.timestamp} + +Enabling Dapper-like Tracing in HDFS + +%{toc|section=1|fromDepth=0} + +* {Dapper-like Tracing in HDFS} + +** HTrace + + {{{https://issues.apache.org/jira/browse/HDFS-5274}HDFS-5274}} + added support for tracing requests through HDFS, + using the open source tracing library, {{{https://github.com/cloudera/htrace}HTrace}}. + Setting up tracing is quite simple, however it requires some very minor changes to your client code. + +** SpanReceivers + + The tracing system works by collecting information in structs called 'Spans'. + It is up to you to choose how you want to receive this information + by implementing the SpanReceiver interface, which defines one method: + ++---- +public void receiveSpan(Span span); ++---- + + Configure what SpanReceivers you'd like to use + by putting a comma separated list of the fully-qualified class name of + classes implementing SpanReceiver + in <<>> property: <<>>. + ++---- + + hadoop.trace.spanreceiver.classes + org.htrace.impl.LocalFileSpanReceiver + + + hadoop.local-file-span-receiver.path + /var/log/hadoop/htrace.out + ++---- + +** Setting up ZipkinSpanReceiver + + Instead of implementing SpanReceiver by yourself, + you can use <<>> which uses + {{{https://github.com/twitter/zipkin}Zipkin}} + for collecting and dispalying tracing data. + + In order to use <<>>, + you need to download and setup {{{https://github.com/twitter/zipkin}Zipkin}} first. + + you also need to add the jar of <<>> to the classpath of Hadoop on each node. + Here is example setup procedure. + ++---- + $ git clone https://github.com/cloudera/htrace + $ cd htrace/htrace-zipkin + $ mvn compile assembly:single + $ cp target/htrace-zipkin-*-jar-with-dependencies.jar $HADOOP_HOME/share/hadoop/hdfs/lib/ ++---- + + The sample configuration for <<>> is shown below. + By adding these to <<>> of NameNode and DataNodes, + <<>> is initialized on the startup. + You also need this configuration on the client node in addition to the servers. + ++---- + + hadoop.trace.spanreceiver.classes + org.htrace.impl.ZipkinSpanReceiver + + + hadoop.zipkin.collector-hostname + 192.168.1.2 + + + hadoop.zipkin.collector-port + 9410 + ++---- + +** Turning on tracing by HTrace API + + In order to turn on Dapper-like tracing, + you will need to wrap the traced logic with <> as shown below. + When there is running tracing spans, + the tracing information is propagated to servers along with RPC requests. + + In addition, you need to initialize <<>> once per process. + ++---- +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.tracing.SpanReceiverHost; +import org.htrace.Sampler; +import org.htrace.Trace; +import org.htrace.TraceScope; + +... + + SpanReceiverHost.getInstance(new HdfsConfiguration()); + +... + + TraceScope ts = Trace.startSpan("Gets", Sampler.ALWAYS); + try { + ... // traced logic + } finally { + if (ts != null) ts.close(); + } ++---- + +** Sample code for tracing + + The <<>> shown below is the wrapper of FsShell + which start tracing span before invoking HDFS shell command. + ++---- +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FsShell; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.tracing.SpanReceiverHost; +import org.apache.hadoop.util.ToolRunner; +import org.htrace.Sampler; +import org.htrace.Trace; +import org.htrace.TraceScope; + +public class TracingFsShell { + public static void main(String argv[]) throws Exception { + Configuration conf = new Configuration(); + FsShell shell = new FsShell(); + conf.setQuietMode(false); + shell.setConf(conf); + int res = 0; + SpanReceiverHost.init(new HdfsConfiguration()); + TraceScope ts = null; + try { + ts = Trace.startSpan("FsShell", Sampler.ALWAYS); + res = ToolRunner.run(shell, argv); + } finally { + shell.close(); + if (ts != null) ts.close(); + } + System.exit(res); + } +} ++---- + + You can compile and execute this code as shown below. + ++---- +$ javac -cp `hadoop classpath` TracingFsShell.java +$ HADOOP_CLASSPATH=. hdfs TracingFsShell -put sample.txt /tmp/ ++---- diff --git a/hadoop-hdfs-project/hadoop-hdfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/pom.xml index 9b026f2bdb5..81eae0ab510 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/pom.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/pom.xml @@ -181,6 +181,10 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd"> xercesImpl compile + + org.htrace + htrace-core + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index df50eabacb7..1ec91d005b7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -180,6 +180,7 @@ import org.apache.hadoop.util.ServicePlugin; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Time; import org.apache.hadoop.util.VersionInfo; +import org.apache.hadoop.tracing.SpanReceiverHost; import org.mortbay.util.ajax.JSON; import com.google.common.annotations.VisibleForTesting; @@ -326,6 +327,8 @@ public class DataNode extends Configured private boolean isPermissionEnabled; private String dnUserName = null; + private SpanReceiverHost spanReceiverHost; + /** * Create the DataNode given a configuration, an array of dataDirs, * and a namenode proxy @@ -823,6 +826,7 @@ public class DataNode extends Configured this.dataDirs = dataDirs; this.conf = conf; this.dnConf = new DNConf(conf); + this.spanReceiverHost = SpanReceiverHost.getInstance(conf); if (dnConf.maxLockedMemory > 0) { if (!NativeIO.POSIX.getCacheManipulator().verifyCanMlock()) { @@ -1510,6 +1514,9 @@ public class DataNode extends Configured MBeans.unregister(dataNodeInfoBeanName); dataNodeInfoBeanName = null; } + if (this.spanReceiverHost != null) { + this.spanReceiverHost.closeReceivers(); + } if (shortCircuitRegistry != null) shortCircuitRegistry.shutdown(); LOG.info("Shutdown complete."); synchronized(this) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java index 4072b1720d7..bcb5a8697d1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java @@ -60,6 +60,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol; import org.apache.hadoop.ipc.RefreshCallQueueProtocol; import org.apache.hadoop.tools.GetUserMappingsProtocol; +import org.apache.hadoop.tracing.SpanReceiverHost; import org.apache.hadoop.util.ExitUtil.ExitException; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.JvmPauseMonitor; @@ -278,6 +279,7 @@ public class NameNode implements NameNodeStatusMXBean { private JvmPauseMonitor pauseMonitor; private ObjectName nameNodeStatusBeanName; + private SpanReceiverHost spanReceiverHost; /** * The namenode address that clients will use to access this namenode * or the name service. For HA configurations using logical URI, it @@ -586,6 +588,9 @@ public class NameNode implements NameNodeStatusMXBean { if (NamenodeRole.NAMENODE == role) { startHttpServer(conf); } + + this.spanReceiverHost = SpanReceiverHost.getInstance(conf); + loadNamesystem(conf); rpcServer = createRpcServer(conf); @@ -822,6 +827,9 @@ public class NameNode implements NameNodeStatusMXBean { MBeans.unregister(nameNodeStatusBeanName); nameNodeStatusBeanName = null; } + if (this.spanReceiverHost != null) { + this.spanReceiverHost.closeReceivers(); + } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracing.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracing.java new file mode 100644 index 00000000000..bb923a2c6be --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracing.java @@ -0,0 +1,280 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.tracing; + +import org.apache.commons.lang.RandomStringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.htrace.HTraceConfiguration; +import org.htrace.Sampler; +import org.htrace.Span; +import org.htrace.SpanReceiver; +import org.htrace.Trace; +import org.htrace.TraceScope; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class TestTracing { + + private static Configuration conf; + private static MiniDFSCluster cluster; + private static DistributedFileSystem dfs; + + @Test + public void testSpanReceiverHost() throws Exception { + Configuration conf = new Configuration(); + conf.set(SpanReceiverHost.SPAN_RECEIVERS_CONF_KEY, + SetSpanReceiver.class.getName()); + SpanReceiverHost spanReceiverHost = SpanReceiverHost.getInstance(conf); + } + + @Test + public void testWriteTraceHooks() throws Exception { + long startTime = System.currentTimeMillis(); + TraceScope ts = Trace.startSpan("testWriteTraceHooks", Sampler.ALWAYS); + Path file = new Path("traceWriteTest.dat"); + FSDataOutputStream stream = dfs.create(file); + + for (int i = 0; i < 10; i++) { + byte[] data = RandomStringUtils.randomAlphabetic(102400).getBytes(); + stream.write(data); + } + stream.hflush(); + stream.close(); + long endTime = System.currentTimeMillis(); + ts.close(); + + String[] expectedSpanNames = { + "testWriteTraceHooks", + "org.apache.hadoop.hdfs.protocol.ClientProtocol.create", + "org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol.BlockingInterface.create", + "org.apache.hadoop.hdfs.protocol.ClientProtocol.fsync", + "org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol.BlockingInterface.fsync", + "org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol.BlockingInterface.complete" + }; + assertSpanNamesFound(expectedSpanNames); + + // The trace should last about the same amount of time as the test + Map> map = SetSpanReceiver.SetHolder.getMap(); + Span s = map.get("testWriteTraceHooks").get(0); + Assert.assertNotNull(s); + long spanStart = s.getStartTimeMillis(); + long spanEnd = s.getStopTimeMillis(); + Assert.assertTrue(spanStart - startTime < 100); + Assert.assertTrue(spanEnd - endTime < 100); + + // There should only be one trace id as it should all be homed in the + // top trace. + for (Span span : SetSpanReceiver.SetHolder.spans) { + Assert.assertEquals(ts.getSpan().getTraceId(), span.getTraceId()); + } + } + + @Test + public void testWriteWithoutTraceHooks() throws Exception { + Path file = new Path("withoutTraceWriteTest.dat"); + FSDataOutputStream stream = dfs.create(file); + for (int i = 0; i < 10; i++) { + byte[] data = RandomStringUtils.randomAlphabetic(102400).getBytes(); + stream.write(data); + } + stream.hflush(); + stream.close(); + Assert.assertTrue(SetSpanReceiver.SetHolder.size() == 0); + } + + @Test + public void testReadTraceHooks() throws Exception { + String fileName = "traceReadTest.dat"; + Path filePath = new Path(fileName); + + // Create the file. + FSDataOutputStream ostream = dfs.create(filePath); + for (int i = 0; i < 50; i++) { + byte[] data = RandomStringUtils.randomAlphabetic(10240).getBytes(); + ostream.write(data); + } + ostream.close(); + + + long startTime = System.currentTimeMillis(); + TraceScope ts = Trace.startSpan("testReadTraceHooks", Sampler.ALWAYS); + FSDataInputStream istream = dfs.open(filePath, 10240); + ByteBuffer buf = ByteBuffer.allocate(10240); + + int count = 0; + try { + while (istream.read(buf) > 0) { + count += 1; + buf.clear(); + istream.seek(istream.getPos() + 5); + } + } catch (IOException ioe) { + // Ignore this it's probably a seek after eof. + } finally { + istream.close(); + } + ts.getSpan().addTimelineAnnotation("count: " + count); + long endTime = System.currentTimeMillis(); + ts.close(); + + String[] expectedSpanNames = { + "testReadTraceHooks", + "org.apache.hadoop.hdfs.protocol.ClientProtocol.getBlockLocations", + "org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol.BlockingInterface.getBlockLocations" + }; + assertSpanNamesFound(expectedSpanNames); + + // The trace should last about the same amount of time as the test + Map> map = SetSpanReceiver.SetHolder.getMap(); + Span s = map.get("testReadTraceHooks").get(0); + Assert.assertNotNull(s); + + long spanStart = s.getStartTimeMillis(); + long spanEnd = s.getStopTimeMillis(); + Assert.assertTrue(spanStart - startTime < 100); + Assert.assertTrue(spanEnd - endTime < 100); + + // There should only be one trace id as it should all be homed in the + // top trace. + for (Span span : SetSpanReceiver.SetHolder.spans) { + Assert.assertEquals(ts.getSpan().getTraceId(), span.getTraceId()); + } + } + + @Test + public void testReadWithoutTraceHooks() throws Exception { + String fileName = "withoutTraceReadTest.dat"; + Path filePath = new Path(fileName); + + // Create the file. + FSDataOutputStream ostream = dfs.create(filePath); + for (int i = 0; i < 50; i++) { + byte[] data = RandomStringUtils.randomAlphabetic(10240).getBytes(); + ostream.write(data); + } + ostream.close(); + + FSDataInputStream istream = dfs.open(filePath, 10240); + ByteBuffer buf = ByteBuffer.allocate(10240); + + int count = 0; + try { + while (istream.read(buf) > 0) { + count += 1; + buf.clear(); + istream.seek(istream.getPos() + 5); + } + } catch (IOException ioe) { + // Ignore this it's probably a seek after eof. + } finally { + istream.close(); + } + Assert.assertTrue(SetSpanReceiver.SetHolder.size() == 0); + } + + @Before + public void cleanSet() { + SetSpanReceiver.SetHolder.spans.clear(); + } + + @BeforeClass + public static void setupCluster() throws IOException { + conf = new Configuration(); + conf.setLong("dfs.blocksize", 100 * 1024); + conf.set(SpanReceiverHost.SPAN_RECEIVERS_CONF_KEY, + SetSpanReceiver.class.getName()); + + cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(3) + .build(); + + dfs = cluster.getFileSystem(); + } + + @AfterClass + public static void shutDown() throws IOException { + cluster.shutdown(); + } + + private void assertSpanNamesFound(String[] expectedSpanNames) { + Map> map = SetSpanReceiver.SetHolder.getMap(); + for (String spanName : expectedSpanNames) { + Assert.assertTrue("Should find a span with name " + spanName, map.get(spanName) != null); + } + } + + /** + * Span receiver that puts all spans into a single set. + * This is useful for testing. + *

+ * We're not using HTrace's POJOReceiver here so as that doesn't + * push all the metrics to a static place, and would make testing + * SpanReceiverHost harder. + */ + public static class SetSpanReceiver implements SpanReceiver { + + public void configure(HTraceConfiguration conf) { + } + + public void receiveSpan(Span span) { + SetHolder.spans.add(span); + } + + public void close() { + } + + public static class SetHolder { + public static Set spans = new HashSet(); + + public static int size() { + return spans.size(); + } + + public static Map> getMap() { + Map> map = new HashMap>(); + + for (Span s : spans) { + List l = map.get(s.getDescription()); + if (l == null) { + l = new LinkedList(); + map.put(s.getDescription(), l); + } + l.add(s); + } + return map; + } + } + } +} diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index be5b3d51b59..beaeec63a77 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -677,6 +677,11 @@ jsch 0.1.42 + + org.htrace + htrace-core + 3.0.4 + org.jdom jdom diff --git a/hadoop-project/src/site/site.xml b/hadoop-project/src/site/site.xml index 56288ee60ca..a42aff0a382 100644 --- a/hadoop-project/src/site/site.xml +++ b/hadoop-project/src/site/site.xml @@ -65,6 +65,7 @@ +