From 8b7450f5d8480e0243978b279eaaba5fb1c4994b Mon Sep 17 00:00:00 2001 From: "Tak Lon (Stephen) Wu" Date: Thu, 19 Aug 2021 20:12:37 -0700 Subject: [PATCH] HBASE-26132 Backport HBASE-25535 "Set span kind to CLIENT in AbstractRpcClient" to branch-2 (#3607) 9/17 commits of HBASE-22120, original commit bb8c4967f8ce2c89ebaf1ddc5d8a1bf55f1e20d3 Co-authored-by: Duo Zhang Signed-off-by: Duo Zhang --- .../hadoop/hbase/ipc/AbstractRpcClient.java | 2 +- .../apache/hadoop/hbase/trace/TraceUtil.java | 7 ++++++ .../hadoop/hbase/ipc/AbstractTestIPC.java | 23 +++++++++++++++---- 3 files changed, 26 insertions(+), 6 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java index fa7dfb135ea..bb7f4af5737 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java @@ -395,7 +395,7 @@ public abstract class AbstractRpcClient implements RpcC private Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc, final Message param, Message returnType, final User ticket, final Address addr, final RpcCallback callback) { - Span span = TraceUtil.createSpan("RpcClient.callMethod") + Span span = TraceUtil.createClientSpan("RpcClient.callMethod") .setAttribute(TraceUtil.RPC_SERVICE_KEY, md.getService().getName()) .setAttribute(TraceUtil.RPC_METHOD_KEY, md.getName()) .setAttribute(TraceUtil.REMOTE_HOST_KEY, addr.getHostName()) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java index 886a4a9423d..2a6b6b6588f 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java @@ -105,6 +105,13 @@ public final class TraceUtil { return getGlobalTracer().spanBuilder(name).setParent(ctx).setSpanKind(Kind.SERVER).startSpan(); } + /** + * Create a span with {@link Kind#CLIENT}. + */ + public static Span createClientSpan(String name) { + return createSpan(name, Kind.CLIENT); + } + /** * Trace an asynchronous operation for a table. */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java index ffecb094dcf..c4951b29f98 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java @@ -34,6 +34,7 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.internal.verification.VerificationModeFactory.times; +import io.opentelemetry.api.trace.Span.Kind; import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; import io.opentelemetry.sdk.trace.data.SpanData; @@ -455,7 +456,8 @@ public abstract class AbstractTestIPC { return traceRule.getSpans().stream().filter(s -> s.getName().equals(name)).findFirst().get(); } - private void assertRpcAttribute(SpanData data, String methodName, InetSocketAddress addr) { + private void assertRpcAttribute(SpanData data, String methodName, InetSocketAddress addr, + Kind kind) { assertEquals(SERVICE.getDescriptorForType().getName(), data.getAttributes().get(TraceUtil.RPC_SERVICE_KEY)); assertEquals(methodName, data.getAttributes().get(TraceUtil.RPC_METHOD_KEY)); @@ -463,6 +465,13 @@ public abstract class AbstractTestIPC { assertEquals(addr.getHostName(), data.getAttributes().get(TraceUtil.REMOTE_HOST_KEY)); assertEquals(addr.getPort(), data.getAttributes().get(TraceUtil.REMOTE_PORT_KEY).intValue()); } + assertEquals(kind, data.getKind()); + } + + private void assertRemoteSpan() { + SpanData data = waitSpan("RpcServer.process"); + assertTrue(data.getParentSpanContext().isRemote()); + assertEquals(Kind.SERVER, data.getKind()); } @Test @@ -474,8 +483,10 @@ public abstract class AbstractTestIPC { rpcServer.start(); BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); stub.pause(null, PauseRequestProto.newBuilder().setMs(100).build()); - assertRpcAttribute(waitSpan("RpcClient.callMethod"), "pause", rpcServer.getListenerAddress()); - assertRpcAttribute(waitSpan("RpcServer.callMethod"), "pause", null); + assertRpcAttribute(waitSpan("RpcClient.callMethod"), "pause", rpcServer.getListenerAddress(), + Kind.CLIENT); + assertRpcAttribute(waitSpan("RpcServer.callMethod"), "pause", null, Kind.INTERNAL); + assertRemoteSpan(); assertSameTraceId(); for (SpanData data : traceRule.getSpans()) { assertThat( @@ -487,8 +498,10 @@ public abstract class AbstractTestIPC { traceRule.clearSpans(); assertThrows(ServiceException.class, () -> stub.error(null, EmptyRequestProto.getDefaultInstance())); - assertRpcAttribute(waitSpan("RpcClient.callMethod"), "error", rpcServer.getListenerAddress()); - assertRpcAttribute(waitSpan("RpcServer.callMethod"), "error", null); + assertRpcAttribute(waitSpan("RpcClient.callMethod"), "error", rpcServer.getListenerAddress(), + Kind.CLIENT); + assertRpcAttribute(waitSpan("RpcServer.callMethod"), "error", null, Kind.INTERNAL); + assertRemoteSpan(); assertSameTraceId(); for (SpanData data : traceRule.getSpans()) { assertEquals(StatusCode.ERROR, data.getStatus().getStatusCode());