HBASE-26132 Backport HBASE-25535 "Set span kind to CLIENT in AbstractRpcClient" to branch-2 (#3607)

9/17 commits of HBASE-22120, original commit bb8c4967f8

Co-authored-by: Duo Zhang <zhangduo@apache.org>

Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
Tak Lon (Stephen) Wu 2021-08-19 20:12:37 -07:00
parent 9cce94a2d0
commit 8b7450f5d8
3 changed files with 26 additions and 6 deletions

View File

@ -395,7 +395,7 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
private Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc, private Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
final Message param, Message returnType, final User ticket, final Address addr, final Message param, Message returnType, final User ticket, final Address addr,
final RpcCallback<Message> callback) { final RpcCallback<Message> callback) {
Span span = TraceUtil.createSpan("RpcClient.callMethod") Span span = TraceUtil.createClientSpan("RpcClient.callMethod")
.setAttribute(TraceUtil.RPC_SERVICE_KEY, md.getService().getName()) .setAttribute(TraceUtil.RPC_SERVICE_KEY, md.getService().getName())
.setAttribute(TraceUtil.RPC_METHOD_KEY, md.getName()) .setAttribute(TraceUtil.RPC_METHOD_KEY, md.getName())
.setAttribute(TraceUtil.REMOTE_HOST_KEY, addr.getHostName()) .setAttribute(TraceUtil.REMOTE_HOST_KEY, addr.getHostName())

View File

@ -105,6 +105,13 @@ public final class TraceUtil {
return getGlobalTracer().spanBuilder(name).setParent(ctx).setSpanKind(Kind.SERVER).startSpan(); return getGlobalTracer().spanBuilder(name).setParent(ctx).setSpanKind(Kind.SERVER).startSpan();
} }
/**
* Create a span with {@link Kind#CLIENT}.
*/
public static Span createClientSpan(String name) {
return createSpan(name, Kind.CLIENT);
}
/** /**
* Trace an asynchronous operation for a table. * Trace an asynchronous operation for a table.
*/ */

View File

@ -34,6 +34,7 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.internal.verification.VerificationModeFactory.times; import static org.mockito.internal.verification.VerificationModeFactory.times;
import io.opentelemetry.api.trace.Span.Kind;
import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.data.SpanData;
@ -455,7 +456,8 @@ public abstract class AbstractTestIPC {
return traceRule.getSpans().stream().filter(s -> s.getName().equals(name)).findFirst().get(); return traceRule.getSpans().stream().filter(s -> s.getName().equals(name)).findFirst().get();
} }
private void assertRpcAttribute(SpanData data, String methodName, InetSocketAddress addr) { private void assertRpcAttribute(SpanData data, String methodName, InetSocketAddress addr,
Kind kind) {
assertEquals(SERVICE.getDescriptorForType().getName(), assertEquals(SERVICE.getDescriptorForType().getName(),
data.getAttributes().get(TraceUtil.RPC_SERVICE_KEY)); data.getAttributes().get(TraceUtil.RPC_SERVICE_KEY));
assertEquals(methodName, data.getAttributes().get(TraceUtil.RPC_METHOD_KEY)); assertEquals(methodName, data.getAttributes().get(TraceUtil.RPC_METHOD_KEY));
@ -463,6 +465,13 @@ public abstract class AbstractTestIPC {
assertEquals(addr.getHostName(), data.getAttributes().get(TraceUtil.REMOTE_HOST_KEY)); assertEquals(addr.getHostName(), data.getAttributes().get(TraceUtil.REMOTE_HOST_KEY));
assertEquals(addr.getPort(), data.getAttributes().get(TraceUtil.REMOTE_PORT_KEY).intValue()); assertEquals(addr.getPort(), data.getAttributes().get(TraceUtil.REMOTE_PORT_KEY).intValue());
} }
assertEquals(kind, data.getKind());
}
private void assertRemoteSpan() {
SpanData data = waitSpan("RpcServer.process");
assertTrue(data.getParentSpanContext().isRemote());
assertEquals(Kind.SERVER, data.getKind());
} }
@Test @Test
@ -474,8 +483,10 @@ public abstract class AbstractTestIPC {
rpcServer.start(); rpcServer.start();
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
stub.pause(null, PauseRequestProto.newBuilder().setMs(100).build()); stub.pause(null, PauseRequestProto.newBuilder().setMs(100).build());
assertRpcAttribute(waitSpan("RpcClient.callMethod"), "pause", rpcServer.getListenerAddress()); assertRpcAttribute(waitSpan("RpcClient.callMethod"), "pause", rpcServer.getListenerAddress(),
assertRpcAttribute(waitSpan("RpcServer.callMethod"), "pause", null); Kind.CLIENT);
assertRpcAttribute(waitSpan("RpcServer.callMethod"), "pause", null, Kind.INTERNAL);
assertRemoteSpan();
assertSameTraceId(); assertSameTraceId();
for (SpanData data : traceRule.getSpans()) { for (SpanData data : traceRule.getSpans()) {
assertThat( assertThat(
@ -487,8 +498,10 @@ public abstract class AbstractTestIPC {
traceRule.clearSpans(); traceRule.clearSpans();
assertThrows(ServiceException.class, assertThrows(ServiceException.class,
() -> stub.error(null, EmptyRequestProto.getDefaultInstance())); () -> stub.error(null, EmptyRequestProto.getDefaultInstance()));
assertRpcAttribute(waitSpan("RpcClient.callMethod"), "error", rpcServer.getListenerAddress()); assertRpcAttribute(waitSpan("RpcClient.callMethod"), "error", rpcServer.getListenerAddress(),
assertRpcAttribute(waitSpan("RpcServer.callMethod"), "error", null); Kind.CLIENT);
assertRpcAttribute(waitSpan("RpcServer.callMethod"), "error", null, Kind.INTERNAL);
assertRemoteSpan();
assertSameTraceId(); assertSameTraceId();
for (SpanData data : traceRule.getSpans()) { for (SpanData data : traceRule.getSpans()) {
assertEquals(StatusCode.ERROR, data.getStatus().getStatusCode()); assertEquals(StatusCode.ERROR, data.getStatus().getStatusCode());