HBASE-26521 Name RPC spans as `$package.$service/$method` (#4024)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
Nick Dimiduk 2022-02-09 15:22:31 +01:00 committed by Nick Dimiduk
parent 2d25613e34
commit 600a6a8faf
10 changed files with 397 additions and 85 deletions

View File

@ -32,6 +32,7 @@ import java.util.Optional;
import java.util.function.Supplier;
import org.apache.hadoop.hbase.client.AsyncConnectionImpl;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.yetus.audience.InterfaceAudience;
@ -91,15 +92,32 @@ public class ConnectionSpanBuilder implements Supplier<Span> {
* Static utility method that performs the primary logic of this builder. It is visible to other
* classes in this package so that other builders can use this functionality as a mix-in.
* @param attributes the attributes map to be populated.
* @param conn the source of attribute values.
* @param conn the source of connection attribute values.
*/
static void populateConnectionAttributes(
final Map<AttributeKey<?>, Object> attributes,
final AsyncConnectionImpl conn
) {
final Supplier<String> connStringSupplier = () -> conn.getConnectionRegistry()
.getConnectionString();
populateConnectionAttributes(attributes, connStringSupplier, conn::getUser);
}
/**
* Static utility method that performs the primary logic of this builder. It is visible to other
* classes in this package so that other builders can use this functionality as a mix-in.
* @param attributes the attributes map to be populated.
* @param connectionStringSupplier the source of the {@code db.connection_string} attribute value.
* @param userSupplier the source of the {@code db.user} attribute value.
*/
static void populateConnectionAttributes(
final Map<AttributeKey<?>, Object> attributes,
final Supplier<String> connectionStringSupplier,
final Supplier<User> userSupplier
) {
attributes.put(DB_SYSTEM, DB_SYSTEM_VALUE);
attributes.put(DB_CONNECTION_STRING, conn.getConnectionRegistry().getConnectionString());
attributes.put(DB_USER, Optional.ofNullable(conn.getUser())
attributes.put(DB_CONNECTION_STRING, connectionStringSupplier.get());
attributes.put(DB_USER, Optional.ofNullable(userSupplier.get())
.map(Object::toString)
.orElse(null));
}

View File

@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client.trace;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.NET_PEER_NAME;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.NET_PEER_PORT;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RPC_METHOD;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RPC_SERVICE;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RPC_SYSTEM;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanKind;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RpcSystem;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
/**
* Construct {@link Span} instances originating from the client side of an IPC.
*
* @see <a href="https://github.com/open-telemetry/opentelemetry-specification/blob/3e380e249f60c3a5f68746f5e84d10195ba41a79/specification/trace/semantic_conventions/rpc.md">Semantic conventions for RPC spans</a>
*/
@InterfaceAudience.Private
public class IpcClientSpanBuilder implements Supplier<Span> {
private String name;
private final Map<AttributeKey<?>, Object> attributes = new HashMap<>();
@Override
public Span get() {
return build();
}
public IpcClientSpanBuilder setMethodDescriptor(final Descriptors.MethodDescriptor md) {
final String packageAndService = getRpcPackageAndService(md.getService());
final String method = getRpcName(md);
this.name = buildSpanName(packageAndService, method);
populateMethodDescriptorAttributes(attributes, md);
return this;
}
public IpcClientSpanBuilder setRemoteAddress(final Address remoteAddress) {
attributes.put(NET_PEER_NAME, remoteAddress.getHostName());
attributes.put(NET_PEER_PORT, (long) remoteAddress.getPort());
return this;
}
@SuppressWarnings("unchecked")
public Span build() {
final SpanBuilder builder = TraceUtil.getGlobalTracer()
.spanBuilder(name)
// TODO: what about clients embedded in Master/RegionServer/Gateways/&c?
.setSpanKind(SpanKind.CLIENT);
attributes.forEach((k, v) -> builder.setAttribute((AttributeKey<? super Object>) k, v));
return builder.startSpan();
}
/**
* Static utility method that performs the primary logic of this builder. It is visible to other
* classes in this package so that other builders can use this functionality as a mix-in.
* @param attributes the attributes map to be populated.
* @param md the source of the RPC attribute values.
*/
static void populateMethodDescriptorAttributes(
final Map<AttributeKey<?>, Object> attributes,
final Descriptors.MethodDescriptor md
) {
final String packageAndService = getRpcPackageAndService(md.getService());
final String method = getRpcName(md);
attributes.put(RPC_SYSTEM, RpcSystem.HBASE_RPC.name());
attributes.put(RPC_SERVICE, packageAndService);
attributes.put(RPC_METHOD, method);
}
/**
* Retrieve the combined {@code $package.$service} value from {@code sd}.
*/
public static String getRpcPackageAndService(final Descriptors.ServiceDescriptor sd) {
// it happens that `getFullName` returns a string in the $package.$service format required by
// the otel RPC specification. Use it for now; might have to parse the value in the future.
return sd.getFullName();
}
/**
* Retrieve the {@code $method} value from {@code md}.
*/
public static String getRpcName(final Descriptors.MethodDescriptor md) {
return md.getName();
}
/**
* Construct an RPC span name.
*/
public static String buildSpanName(final String packageAndService, final String method) {
return packageAndService + "/" + method;
}
}

View File

@ -20,10 +20,6 @@ package org.apache.hadoop.hbase.ipc;
import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.REMOTE_HOST_KEY;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.REMOTE_PORT_KEY;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RPC_METHOD_KEY;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RPC_SERVICE_KEY;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
@ -40,6 +36,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.client.trace.IpcClientSpanBuilder;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodec;
import org.apache.hadoop.hbase.net.Address;
@ -399,11 +396,10 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
private Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
final Message param, Message returnType, final User ticket, final Address addr,
final RpcCallback<Message> callback) {
Span span = TraceUtil.createClientSpan("RpcClient.callMethod")
.setAttribute(RPC_SERVICE_KEY, md.getService().getName())
.setAttribute(RPC_METHOD_KEY, md.getName())
.setAttribute(REMOTE_HOST_KEY, addr.getHostName())
.setAttribute(REMOTE_PORT_KEY, addr.getPort());
Span span = new IpcClientSpanBuilder()
.setMethodDescriptor(md)
.setRemoteAddress(addr)
.build();
try (Scope scope = span.makeCurrent()) {
final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
cs.setStartTime(EnvironmentEdgeManager.currentTime());

View File

@ -24,6 +24,7 @@ import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.data.StatusData;
import java.time.Duration;
import org.hamcrest.Description;
import org.hamcrest.FeatureMatcher;
import org.hamcrest.Matcher;
@ -46,6 +47,16 @@ public final class SpanDataMatchers {
};
}
public static Matcher<SpanData> hasDuration(Matcher<Duration> matcher) {
return new FeatureMatcher<SpanData, Duration>(
matcher, "SpanData having duration that ", "duration") {
@Override
protected Duration featureValueOf(SpanData item) {
return Duration.ofNanos(item.getEndEpochNanos() - item.getStartEpochNanos());
}
};
}
public static Matcher<SpanData> hasEnded() {
return new TypeSafeMatcher<SpanData>() {
@Override protected boolean matchesSafely(SpanData item) {
@ -92,4 +103,17 @@ public final class SpanDataMatchers {
}
};
}
public static Matcher<SpanData> hasTraceId(String traceId) {
return hasTraceId(is(equalTo(traceId)));
}
public static Matcher<SpanData> hasTraceId(Matcher<String> matcher) {
return new FeatureMatcher<SpanData, String>(
matcher, "SpanData with a traceId that ", "traceId") {
@Override protected String featureValueOf(SpanData item) {
return item.getTraceId();
}
};
}
}

View File

@ -44,14 +44,13 @@ public final class HBaseSemanticAttributes {
AttributeKey.stringArrayKey("db.hbase.container_operations");
public static final AttributeKey<List<String>> REGION_NAMES_KEY =
AttributeKey.stringArrayKey("db.hbase.regions");
public static final AttributeKey<String> RPC_SERVICE_KEY =
AttributeKey.stringKey("db.hbase.rpc.service");
public static final AttributeKey<String> RPC_METHOD_KEY =
AttributeKey.stringKey("db.hbase.rpc.method");
public static final AttributeKey<String> RPC_SYSTEM = SemanticAttributes.RPC_SYSTEM;
public static final AttributeKey<String> RPC_SERVICE = SemanticAttributes.RPC_SERVICE;
public static final AttributeKey<String> RPC_METHOD = SemanticAttributes.RPC_METHOD;
public static final AttributeKey<String> SERVER_NAME_KEY =
AttributeKey.stringKey("db.hbase.server.name");
public static final AttributeKey<String> REMOTE_HOST_KEY = SemanticAttributes.NET_PEER_NAME;
public static final AttributeKey<Long> REMOTE_PORT_KEY = SemanticAttributes.NET_PEER_PORT;
public static final AttributeKey<String> NET_PEER_NAME = SemanticAttributes.NET_PEER_NAME;
public static final AttributeKey<Long> NET_PEER_PORT = SemanticAttributes.NET_PEER_PORT;
public static final AttributeKey<Boolean> ROW_LOCK_READ_LOCK_KEY =
AttributeKey.booleanKey("db.hbase.rowlock.readlock");
public static final AttributeKey<String> WAL_IMPL = AttributeKey.stringKey("db.hbase.wal.impl");
@ -74,5 +73,13 @@ public final class HBaseSemanticAttributes {
SCAN,
}
/**
* These are values used with {@link #RPC_SYSTEM}. Only a single value for now; more to come as
* we add tracing over our gateway components.
*/
public enum RpcSystem {
HBASE_RPC,
}
private HBaseSemanticAttributes() { }
}

View File

@ -255,6 +255,12 @@
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-http</artifactId>

View File

@ -17,11 +17,8 @@
*/
package org.apache.hadoop.hbase.ipc;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RPC_METHOD_KEY;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RPC_SERVICE_KEY;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
@ -32,6 +29,7 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.server.trace.IpcServerSpanBuilder;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
@ -90,14 +88,6 @@ public class CallRunner {
this.rpcServer = null;
}
private String getServiceName() {
return call.getService() != null ? call.getService().getDescriptorForType().getName() : "";
}
private String getMethodName() {
return call.getMethod() != null ? call.getMethod().getName() : "";
}
public void run() {
try {
if (call.disconnectSince() >= 0) {
@ -122,12 +112,7 @@ public class CallRunner {
String error = null;
Pair<Message, CellScanner> resultPair = null;
RpcServer.CurCall.set(call);
String serviceName = getServiceName();
String methodName = getMethodName();
Span span = TraceUtil.getGlobalTracer().spanBuilder("RpcServer.callMethod")
.setParent(Context.current().with(((ServerCall<?>) call).getSpan())).startSpan()
.setAttribute(RPC_SERVICE_KEY, serviceName)
.setAttribute(RPC_METHOD_KEY, methodName);
Span span = new IpcServerSpanBuilder(call).build();
try (Scope traceScope = span.makeCurrent()) {
if (!this.rpcServer.isStarted()) {
InetSocketAddress address = rpcServer.getListenerAddress();

View File

@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.trace;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RPC_METHOD;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RPC_SERVICE;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RPC_SYSTEM;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.Context;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.hadoop.hbase.client.trace.IpcClientSpanBuilder;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.ServerCall;
import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RpcSystem;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
/**
* Construct {@link Span} instances originating from the server side of an IPC.
*
* @see <a href="https://github.com/open-telemetry/opentelemetry-specification/blob/3e380e249f60c3a5f68746f5e84d10195ba41a79/specification/trace/semantic_conventions/rpc.md">Semantic conventions for RPC spans</a>
*/
@InterfaceAudience.Private
public class IpcServerSpanBuilder implements Supplier<Span> {
private final RpcCall rpcCall;
private String name;
private final Map<AttributeKey<?>, Object> attributes = new HashMap<>();
public IpcServerSpanBuilder(final RpcCall rpcCall) {
this.rpcCall = rpcCall;
final String packageAndService = Optional.ofNullable(rpcCall.getService())
.map(BlockingService::getDescriptorForType)
.map(IpcClientSpanBuilder::getRpcPackageAndService)
.orElse("");
final String method = Optional.ofNullable(rpcCall.getMethod())
.map(IpcClientSpanBuilder::getRpcName)
.orElse("");
setName(IpcClientSpanBuilder.buildSpanName(packageAndService, method));
addAttribute(RPC_SYSTEM, RpcSystem.HBASE_RPC.name());
addAttribute(RPC_SERVICE, packageAndService);
addAttribute(RPC_METHOD, method);
}
@Override
public Span get() {
return build();
}
public IpcServerSpanBuilder setName(final String name) {
this.name = name;
return this;
}
public <T> IpcServerSpanBuilder addAttribute(final AttributeKey<T> key, T value) {
attributes.put(key, value);
return this;
}
@SuppressWarnings("unchecked")
public Span build() {
final SpanBuilder builder = TraceUtil.getGlobalTracer()
.spanBuilder(name)
.setSpanKind(SpanKind.SERVER);
attributes.forEach((k, v) -> builder.setAttribute((AttributeKey<? super Object>) k, v));
return builder.setParent(Context.current().with(((ServerCall<?>) rpcCall).getSpan()))
.startSpan();
}
}

View File

@ -17,11 +17,21 @@
*/
package org.apache.hadoop.hbase.ipc;
import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasAttributes;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasDuration;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasTraceId;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newStub;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@ -33,16 +43,16 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.internal.verification.VerificationModeFactory.times;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
@ -50,22 +60,21 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MatcherPredicate;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.StringUtils;
import org.hamcrest.Matcher;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
@ -442,74 +451,124 @@ public abstract class AbstractTestIPC {
}
}
private void assertSameTraceId() {
String traceId = traceRule.getSpans().get(0).getTraceId();
for (SpanData data : traceRule.getSpans()) {
// assert we are the same trace
assertEquals(traceId, data.getTraceId());
}
private SpanData waitSpan(Matcher<SpanData> matcher) {
Waiter.waitFor(CONF, 1000, new MatcherPredicate<>(
() -> traceRule.getSpans(), hasItem(matcher)));
return traceRule.getSpans()
.stream()
.filter(matcher::matches)
.findFirst()
.orElseThrow(AssertionError::new);
}
private SpanData waitSpan(String name) {
Waiter.waitFor(CONF, 1000,
() -> traceRule.getSpans().stream().map(SpanData::getName).anyMatch(s -> s.equals(name)));
return traceRule.getSpans().stream().filter(s -> s.getName().equals(name)).findFirst().get();
private static String buildIpcSpanName(final String packageAndService, final String methodName) {
return packageAndService + "/" + methodName;
}
private void assertRpcAttribute(SpanData data, String methodName, InetSocketAddress addr,
SpanKind kind) {
assertEquals(SERVICE.getDescriptorForType().getName(),
data.getAttributes().get(HBaseSemanticAttributes.RPC_SERVICE_KEY));
assertEquals(methodName, data.getAttributes().get(HBaseSemanticAttributes.RPC_METHOD_KEY));
if (addr != null) {
assertEquals(
addr.getHostName(),
data.getAttributes().get(HBaseSemanticAttributes.REMOTE_HOST_KEY));
assertEquals(
addr.getPort(),
data.getAttributes().get(HBaseSemanticAttributes.REMOTE_PORT_KEY).intValue());
}
assertEquals(kind, data.getKind());
private static Matcher<SpanData> buildIpcClientSpanMatcher(
final String packageAndService,
final String methodName
) {
return allOf(
hasName(buildIpcSpanName(packageAndService, methodName)),
hasKind(SpanKind.CLIENT)
);
}
private static Matcher<SpanData> buildIpcServerSpanMatcher(
final String packageAndService,
final String methodName
) {
return allOf(
hasName(buildIpcSpanName(packageAndService, methodName)),
hasKind(SpanKind.SERVER)
);
}
private static Matcher<SpanData> buildIpcClientSpanAttributesMatcher(
final String packageAndService,
final String methodName,
final InetSocketAddress isa
) {
return hasAttributes(allOf(
containsEntry("rpc.system", "HBASE_RPC"),
containsEntry("rpc.service", packageAndService),
containsEntry("rpc.method", methodName),
containsEntry("net.peer.name", isa.getHostName()),
containsEntry(AttributeKey.longKey("net.peer.port"), (long) isa.getPort())));
}
private static Matcher<SpanData> buildIpcServerSpanAttributesMatcher(
final String packageAndService,
final String methodName
) {
return hasAttributes(allOf(
containsEntry("rpc.system", "HBASE_RPC"),
containsEntry("rpc.service", packageAndService),
containsEntry("rpc.method", methodName)));
}
private void assertRemoteSpan() {
SpanData data = waitSpan("RpcServer.process");
SpanData data = waitSpan(hasName("RpcServer.process"));
assertTrue(data.getParentSpanContext().isRemote());
assertEquals(SpanKind.SERVER, data.getKind());
}
@Test
public void testTracing() throws IOException, ServiceException {
public void testTracingSuccessIpc() throws IOException, ServiceException {
RpcServer rpcServer = createRpcServer(null, "testRpcServer",
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
new InetSocketAddress("localhost", 0), CONF,
new FifoRpcScheduler(CONF, 1));
try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
rpcServer.start();
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
stub.pause(null, PauseRequestProto.newBuilder().setMs(100).build());
assertRpcAttribute(waitSpan("RpcClient.callMethod"), "pause", rpcServer.getListenerAddress(),
SpanKind.CLIENT);
assertRpcAttribute(waitSpan("RpcServer.callMethod"), "pause", null, SpanKind.INTERNAL);
// use the ISA from the running server so that we can get the port selected.
final InetSocketAddress isa = rpcServer.getListenerAddress();
final SpanData pauseClientSpan = waitSpan(buildIpcClientSpanMatcher(
"hbase.test.pb.TestProtobufRpcProto", "pause"));
assertThat(pauseClientSpan, buildIpcClientSpanAttributesMatcher(
"hbase.test.pb.TestProtobufRpcProto", "pause", isa));
final SpanData pauseServerSpan = waitSpan(buildIpcServerSpanMatcher(
"hbase.test.pb.TestProtobufRpcProto", "pause"));
assertThat(pauseServerSpan, buildIpcServerSpanAttributesMatcher(
"hbase.test.pb.TestProtobufRpcProto", "pause"));
assertRemoteSpan();
assertSameTraceId();
for (SpanData data : traceRule.getSpans()) {
assertThat(
TimeUnit.NANOSECONDS.toMillis(data.getEndEpochNanos() - data.getStartEpochNanos()),
greaterThanOrEqualTo(100L));
assertEquals(StatusCode.OK, data.getStatus().getStatusCode());
}
assertFalse("no spans provided", traceRule.getSpans().isEmpty());
assertThat(traceRule.getSpans(), everyItem(allOf(
hasStatusWithCode(StatusCode.OK),
hasTraceId(traceRule.getSpans().iterator().next().getTraceId()),
hasDuration(greaterThanOrEqualTo(Duration.ofMillis(100L))))));
}
}
traceRule.clearSpans();
@Test
public void testTracingErrorIpc() throws IOException {
RpcServer rpcServer = createRpcServer(null, "testRpcServer",
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
new InetSocketAddress("localhost", 0), CONF,
new FifoRpcScheduler(CONF, 1));
try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
rpcServer.start();
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
// use the ISA from the running server so that we can get the port selected.
assertThrows(ServiceException.class,
() -> stub.error(null, EmptyRequestProto.getDefaultInstance()));
assertRpcAttribute(waitSpan("RpcClient.callMethod"), "error", rpcServer.getListenerAddress(),
SpanKind.CLIENT);
assertRpcAttribute(waitSpan("RpcServer.callMethod"), "error", null, SpanKind.INTERNAL);
final InetSocketAddress isa = rpcServer.getListenerAddress();
final SpanData errorClientSpan = waitSpan(buildIpcClientSpanMatcher(
"hbase.test.pb.TestProtobufRpcProto", "error"));
assertThat(errorClientSpan, buildIpcClientSpanAttributesMatcher(
"hbase.test.pb.TestProtobufRpcProto", "error", isa));
final SpanData errorServerSpan = waitSpan(buildIpcServerSpanMatcher(
"hbase.test.pb.TestProtobufRpcProto", "error"));
assertThat(errorServerSpan, buildIpcServerSpanAttributesMatcher(
"hbase.test.pb.TestProtobufRpcProto", "error"));
assertRemoteSpan();
assertSameTraceId();
for (SpanData data : traceRule.getSpans()) {
assertEquals(StatusCode.ERROR, data.getStatus().getStatusCode());
}
assertFalse("no spans provided", traceRule.getSpans().isEmpty());
assertThat(traceRule.getSpans(), everyItem(allOf(
hasStatusWithCode(StatusCode.ERROR),
hasTraceId(traceRule.getSpans().iterator().next().getTraceId()))));
}
}
}

View File

@ -1846,6 +1846,13 @@
<groupId>org.apache.hbase</groupId>
<version>${project.version}</version>
</dependency>
<dependency>
<artifactId>hbase-client</artifactId>
<groupId>org.apache.hbase</groupId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<artifactId>hbase-metrics-api</artifactId>
<groupId>org.apache.hbase</groupId>