HBASE-25535 Set span kind to CLIENT in AbstractRpcClient (#2907)
Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
parent
2be2c63f0d
commit
bb8c4967f8
|
@ -395,7 +395,7 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
|
|||
private Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
|
||||
final Message param, Message returnType, final User ticket, final Address addr,
|
||||
final RpcCallback<Message> callback) {
|
||||
Span span = TraceUtil.createSpan("RpcClient.callMethod")
|
||||
Span span = TraceUtil.createClientSpan("RpcClient.callMethod")
|
||||
.setAttribute(TraceUtil.RPC_SERVICE_KEY, md.getService().getName())
|
||||
.setAttribute(TraceUtil.RPC_METHOD_KEY, md.getName())
|
||||
.setAttribute(TraceUtil.REMOTE_HOST_KEY, addr.getHostName())
|
||||
|
|
|
@ -105,6 +105,13 @@ public final class TraceUtil {
|
|||
return getGlobalTracer().spanBuilder(name).setParent(ctx).setSpanKind(Kind.SERVER).startSpan();
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a span with {@link Kind#CLIENT}.
|
||||
*/
|
||||
public static Span createClientSpan(String name) {
|
||||
return createSpan(name, Kind.CLIENT);
|
||||
}
|
||||
|
||||
/**
|
||||
* Trace an asynchronous operation for a table.
|
||||
*/
|
||||
|
|
|
@ -34,6 +34,7 @@ import static org.mockito.Mockito.spy;
|
|||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.internal.verification.VerificationModeFactory.times;
|
||||
|
||||
import io.opentelemetry.api.trace.Span.Kind;
|
||||
import io.opentelemetry.api.trace.StatusCode;
|
||||
import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
|
||||
import io.opentelemetry.sdk.trace.data.SpanData;
|
||||
|
@ -455,7 +456,8 @@ public abstract class AbstractTestIPC {
|
|||
return traceRule.getSpans().stream().filter(s -> s.getName().equals(name)).findFirst().get();
|
||||
}
|
||||
|
||||
private void assertRpcAttribute(SpanData data, String methodName, InetSocketAddress addr) {
|
||||
private void assertRpcAttribute(SpanData data, String methodName, InetSocketAddress addr,
|
||||
Kind kind) {
|
||||
assertEquals(SERVICE.getDescriptorForType().getName(),
|
||||
data.getAttributes().get(TraceUtil.RPC_SERVICE_KEY));
|
||||
assertEquals(methodName, data.getAttributes().get(TraceUtil.RPC_METHOD_KEY));
|
||||
|
@ -463,6 +465,13 @@ public abstract class AbstractTestIPC {
|
|||
assertEquals(addr.getHostName(), data.getAttributes().get(TraceUtil.REMOTE_HOST_KEY));
|
||||
assertEquals(addr.getPort(), data.getAttributes().get(TraceUtil.REMOTE_PORT_KEY).intValue());
|
||||
}
|
||||
assertEquals(kind, data.getKind());
|
||||
}
|
||||
|
||||
private void assertRemoteSpan() {
|
||||
SpanData data = waitSpan("RpcServer.process");
|
||||
assertTrue(data.getParentSpanContext().isRemote());
|
||||
assertEquals(Kind.SERVER, data.getKind());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -474,8 +483,10 @@ public abstract class AbstractTestIPC {
|
|||
rpcServer.start();
|
||||
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
|
||||
stub.pause(null, PauseRequestProto.newBuilder().setMs(100).build());
|
||||
assertRpcAttribute(waitSpan("RpcClient.callMethod"), "pause", rpcServer.getListenerAddress());
|
||||
assertRpcAttribute(waitSpan("RpcServer.callMethod"), "pause", null);
|
||||
assertRpcAttribute(waitSpan("RpcClient.callMethod"), "pause", rpcServer.getListenerAddress(),
|
||||
Kind.CLIENT);
|
||||
assertRpcAttribute(waitSpan("RpcServer.callMethod"), "pause", null, Kind.INTERNAL);
|
||||
assertRemoteSpan();
|
||||
assertSameTraceId();
|
||||
for (SpanData data : traceRule.getSpans()) {
|
||||
assertThat(
|
||||
|
@ -487,8 +498,10 @@ public abstract class AbstractTestIPC {
|
|||
traceRule.clearSpans();
|
||||
assertThrows(ServiceException.class,
|
||||
() -> stub.error(null, EmptyRequestProto.getDefaultInstance()));
|
||||
assertRpcAttribute(waitSpan("RpcClient.callMethod"), "error", rpcServer.getListenerAddress());
|
||||
assertRpcAttribute(waitSpan("RpcServer.callMethod"), "error", null);
|
||||
assertRpcAttribute(waitSpan("RpcClient.callMethod"), "error", rpcServer.getListenerAddress(),
|
||||
Kind.CLIENT);
|
||||
assertRpcAttribute(waitSpan("RpcServer.callMethod"), "error", null, Kind.INTERNAL);
|
||||
assertRemoteSpan();
|
||||
assertSameTraceId();
|
||||
for (SpanData data : traceRule.getSpans()) {
|
||||
assertEquals(StatusCode.ERROR, data.getStatus().getStatusCode());
|
||||
|
|
Loading…
Reference in New Issue