HDDS-1214. Enable tracing for the datanode read/write path. Contributed by Elek, Marton.
Closes #550.
This commit is contained in:
parent
c7307867f0
commit
d17e31e062
|
@ -22,6 +22,7 @@ import java.lang.reflect.Proxy;
|
||||||
import io.jaegertracing.Configuration;
|
import io.jaegertracing.Configuration;
|
||||||
import io.jaegertracing.internal.JaegerTracer;
|
import io.jaegertracing.internal.JaegerTracer;
|
||||||
import io.opentracing.Scope;
|
import io.opentracing.Scope;
|
||||||
|
import io.opentracing.Span;
|
||||||
import io.opentracing.SpanContext;
|
import io.opentracing.SpanContext;
|
||||||
import io.opentracing.Tracer;
|
import io.opentracing.Tracer;
|
||||||
import io.opentracing.util.GlobalTracer;
|
import io.opentracing.util.GlobalTracer;
|
||||||
|
@ -64,6 +65,19 @@ public final class TracingUtil {
|
||||||
return builder.toString();
|
return builder.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Export the specific span as a string.
|
||||||
|
*
|
||||||
|
* @return encoded tracing context.
|
||||||
|
*/
|
||||||
|
public static String exportSpan(Span span) {
|
||||||
|
StringBuilder builder = new StringBuilder();
|
||||||
|
if (span != null) {
|
||||||
|
GlobalTracer.get().inject(span.context(), StringCodec.FORMAT, builder);
|
||||||
|
}
|
||||||
|
return builder.toString();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a new scope and use the imported span as the parent.
|
* Create a new scope and use the imported span as the parent.
|
||||||
*
|
*
|
||||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers
|
||||||
.InvalidContainerStateException;
|
.InvalidContainerStateException;
|
||||||
import org.apache.hadoop.hdds.scm.container.common.helpers
|
import org.apache.hadoop.hdds.scm.container.common.helpers
|
||||||
.StorageContainerException;
|
.StorageContainerException;
|
||||||
|
import org.apache.hadoop.hdds.tracing.TracingUtil;
|
||||||
import org.apache.hadoop.ozone.audit.AuditAction;
|
import org.apache.hadoop.ozone.audit.AuditAction;
|
||||||
import org.apache.hadoop.ozone.audit.AuditEventStatus;
|
import org.apache.hadoop.ozone.audit.AuditEventStatus;
|
||||||
import org.apache.hadoop.ozone.audit.AuditLogger;
|
import org.apache.hadoop.ozone.audit.AuditLogger;
|
||||||
|
@ -61,6 +62,8 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
|
||||||
ContainerDataProto.State;
|
ContainerDataProto.State;
|
||||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
|
||||||
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
|
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
|
||||||
|
|
||||||
|
import io.opentracing.Scope;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -137,10 +140,19 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
|
||||||
containerSet.buildMissingContainerSet(createdContainerSet);
|
containerSet.buildMissingContainerSet(createdContainerSet);
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("methodlength")
|
|
||||||
@Override
|
@Override
|
||||||
public ContainerCommandResponseProto dispatch(
|
public ContainerCommandResponseProto dispatch(
|
||||||
ContainerCommandRequestProto msg, DispatcherContext dispatcherContext) {
|
ContainerCommandRequestProto msg, DispatcherContext dispatcherContext) {
|
||||||
|
String spanName = "HddsDispatcher." + msg.getCmdType().name();
|
||||||
|
try (Scope scope = TracingUtil
|
||||||
|
.importAndCreateScope(spanName, msg.getTraceID())) {
|
||||||
|
return dispatchRequest(msg, dispatcherContext);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("methodlength")
|
||||||
|
private ContainerCommandResponseProto dispatchRequest(
|
||||||
|
ContainerCommandRequestProto msg, DispatcherContext dispatcherContext) {
|
||||||
Preconditions.checkNotNull(msg);
|
Preconditions.checkNotNull(msg);
|
||||||
LOG.trace("Command {}, trace ID: {} ", msg.getCmdType().toString(),
|
LOG.trace("Command {}, trace ID: {} ", msg.getCmdType().toString(),
|
||||||
msg.getTraceID());
|
msg.getTraceID());
|
||||||
|
|
|
@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.protocol.proto
|
||||||
.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
|
.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
|
||||||
import org.apache.hadoop.hdds.protocol.proto
|
import org.apache.hadoop.hdds.protocol.proto
|
||||||
.StorageContainerDatanodeProtocolProtos.CloseContainerCommandProto;
|
.StorageContainerDatanodeProtocolProtos.CloseContainerCommandProto;
|
||||||
|
import org.apache.hadoop.hdds.tracing.TracingUtil;
|
||||||
import org.apache.hadoop.ozone.container.common.interfaces.Container;
|
import org.apache.hadoop.ozone.container.common.interfaces.Container;
|
||||||
import org.apache.hadoop.ozone.container.common.statemachine
|
import org.apache.hadoop.ozone.container.common.statemachine
|
||||||
.SCMConnectionManager;
|
.SCMConnectionManager;
|
||||||
|
@ -133,6 +134,7 @@ public class CloseContainerCommandHandler implements CommandHandler {
|
||||||
final ContainerCommandRequestProto.Builder command =
|
final ContainerCommandRequestProto.Builder command =
|
||||||
ContainerCommandRequestProto.newBuilder();
|
ContainerCommandRequestProto.newBuilder();
|
||||||
command.setCmdType(ContainerProtos.Type.CloseContainer);
|
command.setCmdType(ContainerProtos.Type.CloseContainer);
|
||||||
|
command.setTraceID(TracingUtil.exportCurrentSpan());
|
||||||
command.setContainerID(containerId);
|
command.setContainerID(containerId);
|
||||||
command.setCloseContainer(
|
command.setCloseContainer(
|
||||||
ContainerProtos.CloseContainerRequestProto.getDefaultInstance());
|
ContainerProtos.CloseContainerRequestProto.getDefaultInstance());
|
||||||
|
|
|
@ -31,10 +31,12 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
|
||||||
import org.apache.hadoop.hdds.scm.container.common.helpers.
|
import org.apache.hadoop.hdds.scm.container.common.helpers.
|
||||||
StorageContainerException;
|
StorageContainerException;
|
||||||
import org.apache.hadoop.hdds.tracing.GrpcServerInterceptor;
|
import org.apache.hadoop.hdds.tracing.GrpcServerInterceptor;
|
||||||
|
import org.apache.hadoop.hdds.tracing.TracingUtil;
|
||||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||||
import org.apache.hadoop.ozone.OzoneConsts;
|
import org.apache.hadoop.ozone.OzoneConsts;
|
||||||
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
|
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
|
||||||
|
|
||||||
|
import io.opentracing.Scope;
|
||||||
import org.apache.ratis.thirdparty.io.grpc.BindableService;
|
import org.apache.ratis.thirdparty.io.grpc.BindableService;
|
||||||
import org.apache.ratis.thirdparty.io.grpc.Server;
|
import org.apache.ratis.thirdparty.io.grpc.Server;
|
||||||
import org.apache.ratis.thirdparty.io.grpc.ServerBuilder;
|
import org.apache.ratis.thirdparty.io.grpc.ServerBuilder;
|
||||||
|
@ -168,12 +170,18 @@ public final class XceiverServerGrpc extends XceiverServer {
|
||||||
@Override
|
@Override
|
||||||
public void submitRequest(ContainerCommandRequestProto request,
|
public void submitRequest(ContainerCommandRequestProto request,
|
||||||
HddsProtos.PipelineID pipelineID) throws IOException {
|
HddsProtos.PipelineID pipelineID) throws IOException {
|
||||||
super.submitRequest(request, pipelineID);
|
try (Scope scope = TracingUtil
|
||||||
ContainerProtos.ContainerCommandResponseProto response =
|
.importAndCreateScope(
|
||||||
storageContainer.dispatch(request, null);
|
"XceiverServerGrpc." + request.getCmdType().name(),
|
||||||
if (response.getResult() != ContainerProtos.Result.SUCCESS) {
|
request.getTraceID())) {
|
||||||
throw new StorageContainerException(response.getMessage(),
|
|
||||||
response.getResult());
|
super.submitRequest(request, pipelineID);
|
||||||
|
ContainerProtos.ContainerCommandResponseProto response =
|
||||||
|
storageContainer.dispatch(request, null);
|
||||||
|
if (response.getResult() != ContainerProtos.Result.SUCCESS) {
|
||||||
|
throw new StorageContainerException(response.getMessage(),
|
||||||
|
response.getResult());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -25,7 +25,6 @@ import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.hadoop.hdds.HddsUtils;
|
import org.apache.hadoop.hdds.HddsUtils;
|
||||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||||
|
|
||||||
import io.opentracing.Scope;
|
|
||||||
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
|
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
|
||||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
|
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
|
||||||
import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
|
import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
|
||||||
|
@ -51,7 +50,6 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||||
.ReadChunkRequestProto;
|
.ReadChunkRequestProto;
|
||||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||||
.ReadChunkResponseProto;
|
.ReadChunkResponseProto;
|
||||||
import org.apache.hadoop.hdds.tracing.TracingUtil;
|
|
||||||
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
|
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
|
||||||
import org.apache.hadoop.hdds.security.token.TokenVerifier;
|
import org.apache.hadoop.hdds.security.token.TokenVerifier;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
@ -271,51 +269,50 @@ public class ContainerStateMachine extends BaseStateMachine {
|
||||||
final ContainerCommandRequestProto proto =
|
final ContainerCommandRequestProto proto =
|
||||||
getContainerCommandRequestProto(request.getMessage().getContent());
|
getContainerCommandRequestProto(request.getMessage().getContent());
|
||||||
Preconditions.checkArgument(request.getRaftGroupId().equals(gid));
|
Preconditions.checkArgument(request.getRaftGroupId().equals(gid));
|
||||||
try (Scope scope = TracingUtil
|
try {
|
||||||
.importAndCreateScope(proto.getCmdType().name(), proto.getTraceID())) {
|
dispatcher.validateContainerCommand(proto);
|
||||||
try {
|
} catch (IOException ioe) {
|
||||||
dispatcher.validateContainerCommand(proto);
|
TransactionContext ctxt = TransactionContext.newBuilder()
|
||||||
} catch (IOException ioe) {
|
.setClientRequest(request)
|
||||||
TransactionContext ctxt = TransactionContext.newBuilder()
|
.setStateMachine(this)
|
||||||
.setClientRequest(request)
|
.setServerRole(RaftPeerRole.LEADER)
|
||||||
.setStateMachine(this)
|
.build();
|
||||||
.setServerRole(RaftPeerRole.LEADER)
|
ctxt.setException(ioe);
|
||||||
.build();
|
return ctxt;
|
||||||
ctxt.setException(ioe);
|
|
||||||
return ctxt;
|
|
||||||
}
|
|
||||||
if (proto.getCmdType() == Type.WriteChunk) {
|
|
||||||
final WriteChunkRequestProto write = proto.getWriteChunk();
|
|
||||||
// create the log entry proto
|
|
||||||
final WriteChunkRequestProto commitWriteChunkProto =
|
|
||||||
WriteChunkRequestProto.newBuilder()
|
|
||||||
.setBlockID(write.getBlockID())
|
|
||||||
.setChunkData(write.getChunkData())
|
|
||||||
// skipping the data field as it is
|
|
||||||
// already set in statemachine data proto
|
|
||||||
.build();
|
|
||||||
ContainerCommandRequestProto commitContainerCommandProto =
|
|
||||||
ContainerCommandRequestProto
|
|
||||||
.newBuilder(proto)
|
|
||||||
.setWriteChunk(commitWriteChunkProto)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
return TransactionContext.newBuilder()
|
|
||||||
.setClientRequest(request)
|
|
||||||
.setStateMachine(this)
|
|
||||||
.setServerRole(RaftPeerRole.LEADER)
|
|
||||||
.setStateMachineData(write.getData())
|
|
||||||
.setLogData(commitContainerCommandProto.toByteString())
|
|
||||||
.build();
|
|
||||||
} else {
|
|
||||||
return TransactionContext.newBuilder()
|
|
||||||
.setClientRequest(request)
|
|
||||||
.setStateMachine(this)
|
|
||||||
.setServerRole(RaftPeerRole.LEADER)
|
|
||||||
.setLogData(request.getMessage().getContent())
|
|
||||||
.build();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
if (proto.getCmdType() == Type.WriteChunk) {
|
||||||
|
final WriteChunkRequestProto write = proto.getWriteChunk();
|
||||||
|
// create the log entry proto
|
||||||
|
final WriteChunkRequestProto commitWriteChunkProto =
|
||||||
|
WriteChunkRequestProto.newBuilder()
|
||||||
|
.setBlockID(write.getBlockID())
|
||||||
|
.setChunkData(write.getChunkData())
|
||||||
|
// skipping the data field as it is
|
||||||
|
// already set in statemachine data proto
|
||||||
|
.build();
|
||||||
|
ContainerCommandRequestProto commitContainerCommandProto =
|
||||||
|
ContainerCommandRequestProto
|
||||||
|
.newBuilder(proto)
|
||||||
|
.setWriteChunk(commitWriteChunkProto)
|
||||||
|
.setTraceID(proto.getTraceID())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
return TransactionContext.newBuilder()
|
||||||
|
.setClientRequest(request)
|
||||||
|
.setStateMachine(this)
|
||||||
|
.setServerRole(RaftPeerRole.LEADER)
|
||||||
|
.setStateMachineData(write.getData())
|
||||||
|
.setLogData(commitContainerCommandProto.toByteString())
|
||||||
|
.build();
|
||||||
|
} else {
|
||||||
|
return TransactionContext.newBuilder()
|
||||||
|
.setClientRequest(request)
|
||||||
|
.setStateMachine(this)
|
||||||
|
.setServerRole(RaftPeerRole.LEADER)
|
||||||
|
.setLogData(request.getMessage().getContent())
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private ByteString getStateMachineData(StateMachineLogEntryProto entryProto) {
|
private ByteString getStateMachineData(StateMachineLogEntryProto entryProto) {
|
||||||
|
|
|
@ -486,7 +486,8 @@ public final class XceiverServerRatis extends XceiverServer {
|
||||||
super.submitRequest(request, pipelineID);
|
super.submitRequest(request, pipelineID);
|
||||||
RaftClientReply reply;
|
RaftClientReply reply;
|
||||||
try (Scope scope = TracingUtil
|
try (Scope scope = TracingUtil
|
||||||
.importAndCreateScope(request.getCmdType().name(),
|
.importAndCreateScope(
|
||||||
|
"XceiverServerRatis." + request.getCmdType().name(),
|
||||||
request.getTraceID())) {
|
request.getTraceID())) {
|
||||||
|
|
||||||
RaftClientRequest raftClientRequest =
|
RaftClientRequest raftClientRequest =
|
||||||
|
|
Loading…
Reference in New Issue