HBASE-25481 Add host and port attribute when tracing rpc call at client side (#2857)
Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
parent
dcb78bd4bd
commit
ae2c62ffaa
|
@ -397,7 +397,9 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
|
||||||
final RpcCallback<Message> callback) {
|
final RpcCallback<Message> callback) {
|
||||||
Span span = TraceUtil.createSpan("RpcClient.callMethod")
|
Span span = TraceUtil.createSpan("RpcClient.callMethod")
|
||||||
.setAttribute(TraceUtil.RPC_SERVICE_KEY, md.getService().getName())
|
.setAttribute(TraceUtil.RPC_SERVICE_KEY, md.getService().getName())
|
||||||
.setAttribute(TraceUtil.RPC_METHOD_KEY, md.getName());
|
.setAttribute(TraceUtil.RPC_METHOD_KEY, md.getName())
|
||||||
|
.setAttribute(TraceUtil.REMOTE_HOST_KEY, addr.getHostName())
|
||||||
|
.setAttribute(TraceUtil.REMOTE_PORT_KEY, addr.getPort());
|
||||||
try (Scope scope = span.makeCurrent()) {
|
try (Scope scope = span.makeCurrent()) {
|
||||||
final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
|
final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
|
||||||
cs.setStartTime(EnvironmentEdgeManager.currentTime());
|
cs.setStartTime(EnvironmentEdgeManager.currentTime());
|
||||||
|
|
|
@ -54,6 +54,10 @@ public final class TraceUtil {
|
||||||
public static final AttributeKey<String> SERVER_NAME_KEY =
|
public static final AttributeKey<String> SERVER_NAME_KEY =
|
||||||
AttributeKey.stringKey("db.hbase.server.name");
|
AttributeKey.stringKey("db.hbase.server.name");
|
||||||
|
|
||||||
|
public static final AttributeKey<String> REMOTE_HOST_KEY = SemanticAttributes.NET_PEER_NAME;
|
||||||
|
|
||||||
|
public static final AttributeKey<Long> REMOTE_PORT_KEY = SemanticAttributes.NET_PEER_PORT;
|
||||||
|
|
||||||
private TraceUtil() {
|
private TraceUtil() {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -455,11 +455,14 @@ public abstract class AbstractTestIPC {
|
||||||
return traceRule.getSpans().stream().filter(s -> s.getName().equals(name)).findFirst().get();
|
return traceRule.getSpans().stream().filter(s -> s.getName().equals(name)).findFirst().get();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void assertRpcAttribute(SpanData data, String methodName) {
|
private void assertRpcAttribute(SpanData data, String methodName, InetSocketAddress addr) {
|
||||||
assertEquals(SERVICE.getDescriptorForType().getName(),
|
assertEquals(SERVICE.getDescriptorForType().getName(),
|
||||||
data.getAttributes().get(TraceUtil.RPC_SERVICE_KEY));
|
data.getAttributes().get(TraceUtil.RPC_SERVICE_KEY));
|
||||||
assertEquals(methodName,
|
assertEquals(methodName, data.getAttributes().get(TraceUtil.RPC_METHOD_KEY));
|
||||||
data.getAttributes().get(TraceUtil.RPC_METHOD_KEY));
|
if (addr != null) {
|
||||||
|
assertEquals(addr.getHostName(), data.getAttributes().get(TraceUtil.REMOTE_HOST_KEY));
|
||||||
|
assertEquals(addr.getPort(), data.getAttributes().get(TraceUtil.REMOTE_PORT_KEY).intValue());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -471,8 +474,8 @@ public abstract class AbstractTestIPC {
|
||||||
rpcServer.start();
|
rpcServer.start();
|
||||||
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
|
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
|
||||||
stub.pause(null, PauseRequestProto.newBuilder().setMs(100).build());
|
stub.pause(null, PauseRequestProto.newBuilder().setMs(100).build());
|
||||||
assertRpcAttribute(waitSpan("RpcClient.callMethod"), "pause");
|
assertRpcAttribute(waitSpan("RpcClient.callMethod"), "pause", rpcServer.getListenerAddress());
|
||||||
assertRpcAttribute(waitSpan("RpcServer.callMethod"), "pause");
|
assertRpcAttribute(waitSpan("RpcServer.callMethod"), "pause", null);
|
||||||
assertSameTraceId();
|
assertSameTraceId();
|
||||||
for (SpanData data : traceRule.getSpans()) {
|
for (SpanData data : traceRule.getSpans()) {
|
||||||
assertThat(
|
assertThat(
|
||||||
|
@ -484,8 +487,8 @@ public abstract class AbstractTestIPC {
|
||||||
traceRule.clearSpans();
|
traceRule.clearSpans();
|
||||||
assertThrows(ServiceException.class,
|
assertThrows(ServiceException.class,
|
||||||
() -> stub.error(null, EmptyRequestProto.getDefaultInstance()));
|
() -> stub.error(null, EmptyRequestProto.getDefaultInstance()));
|
||||||
assertRpcAttribute(waitSpan("RpcClient.callMethod"), "error");
|
assertRpcAttribute(waitSpan("RpcClient.callMethod"), "error", rpcServer.getListenerAddress());
|
||||||
assertRpcAttribute(waitSpan("RpcServer.callMethod"), "error");
|
assertRpcAttribute(waitSpan("RpcServer.callMethod"), "error", null);
|
||||||
assertSameTraceId();
|
assertSameTraceId();
|
||||||
for (SpanData data : traceRule.getSpans()) {
|
for (SpanData data : traceRule.getSpans()) {
|
||||||
assertEquals(StatusCode.ERROR, data.getStatus().getStatusCode());
|
assertEquals(StatusCode.ERROR, data.getStatus().getStatusCode());
|
||||||
|
|
Loading…
Reference in New Issue