HBASE-26521 Name RPC spans as `$package.$service/$method` (#4024)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
a890fada61
commit
d242c8fafa
|
@ -32,6 +32,7 @@ import java.util.Optional;
|
|||
import java.util.function.Supplier;
|
||||
import org.apache.hadoop.hbase.client.AsyncConnectionImpl;
|
||||
import org.apache.hadoop.hbase.client.ClusterConnection;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.trace.TraceUtil;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
|
@ -91,15 +92,32 @@ public class ConnectionSpanBuilder implements Supplier<Span> {
|
|||
* Static utility method that performs the primary logic of this builder. It is visible to other
|
||||
* classes in this package so that other builders can use this functionality as a mix-in.
|
||||
* @param attributes the attributes map to be populated.
|
||||
* @param conn the source of attribute values.
|
||||
* @param conn the source of connection attribute values.
|
||||
*/
|
||||
static void populateConnectionAttributes(
|
||||
final Map<AttributeKey<?>, Object> attributes,
|
||||
final AsyncConnectionImpl conn
|
||||
) {
|
||||
final Supplier<String> connStringSupplier = () -> conn.getConnectionRegistry()
|
||||
.getConnectionString();
|
||||
populateConnectionAttributes(attributes, connStringSupplier, conn::getUser);
|
||||
}
|
||||
|
||||
/**
|
||||
* Static utility method that performs the primary logic of this builder. It is visible to other
|
||||
* classes in this package so that other builders can use this functionality as a mix-in.
|
||||
* @param attributes the attributes map to be populated.
|
||||
* @param connectionStringSupplier the source of the {@code db.connection_string} attribute value.
|
||||
* @param userSupplier the source of the {@code db.user} attribute value.
|
||||
*/
|
||||
static void populateConnectionAttributes(
|
||||
final Map<AttributeKey<?>, Object> attributes,
|
||||
final Supplier<String> connectionStringSupplier,
|
||||
final Supplier<User> userSupplier
|
||||
) {
|
||||
attributes.put(DB_SYSTEM, DB_SYSTEM_VALUE);
|
||||
attributes.put(DB_CONNECTION_STRING, conn.getConnectionRegistry().getConnectionString());
|
||||
attributes.put(DB_USER, Optional.ofNullable(conn.getUser())
|
||||
attributes.put(DB_CONNECTION_STRING, connectionStringSupplier.get());
|
||||
attributes.put(DB_USER, Optional.ofNullable(userSupplier.get())
|
||||
.map(Object::toString)
|
||||
.orElse(null));
|
||||
}
|
||||
|
|
|
@ -0,0 +1,118 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.client.trace;
|
||||
|
||||
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.NET_PEER_NAME;
|
||||
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.NET_PEER_PORT;
|
||||
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RPC_METHOD;
|
||||
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RPC_SERVICE;
|
||||
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RPC_SYSTEM;
|
||||
import io.opentelemetry.api.common.AttributeKey;
|
||||
import io.opentelemetry.api.trace.Span;
|
||||
import io.opentelemetry.api.trace.SpanBuilder;
|
||||
import io.opentelemetry.api.trace.SpanKind;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.function.Supplier;
|
||||
import org.apache.hadoop.hbase.net.Address;
|
||||
import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RpcSystem;
|
||||
import org.apache.hadoop.hbase.trace.TraceUtil;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
|
||||
|
||||
/**
|
||||
* Construct {@link Span} instances originating from the client side of an IPC.
|
||||
*
|
||||
* @see <a href="https://github.com/open-telemetry/opentelemetry-specification/blob/3e380e249f60c3a5f68746f5e84d10195ba41a79/specification/trace/semantic_conventions/rpc.md">Semantic conventions for RPC spans</a>
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class IpcClientSpanBuilder implements Supplier<Span> {
|
||||
|
||||
private String name;
|
||||
private final Map<AttributeKey<?>, Object> attributes = new HashMap<>();
|
||||
|
||||
@Override
|
||||
public Span get() {
|
||||
return build();
|
||||
}
|
||||
|
||||
public IpcClientSpanBuilder setMethodDescriptor(final Descriptors.MethodDescriptor md) {
|
||||
final String packageAndService = getRpcPackageAndService(md.getService());
|
||||
final String method = getRpcName(md);
|
||||
this.name = buildSpanName(packageAndService, method);
|
||||
populateMethodDescriptorAttributes(attributes, md);
|
||||
return this;
|
||||
}
|
||||
|
||||
public IpcClientSpanBuilder setRemoteAddress(final Address remoteAddress) {
|
||||
attributes.put(NET_PEER_NAME, remoteAddress.getHostName());
|
||||
attributes.put(NET_PEER_PORT, (long) remoteAddress.getPort());
|
||||
return this;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public Span build() {
|
||||
final SpanBuilder builder = TraceUtil.getGlobalTracer()
|
||||
.spanBuilder(name)
|
||||
// TODO: what about clients embedded in Master/RegionServer/Gateways/&c?
|
||||
.setSpanKind(SpanKind.CLIENT);
|
||||
attributes.forEach((k, v) -> builder.setAttribute((AttributeKey<? super Object>) k, v));
|
||||
return builder.startSpan();
|
||||
}
|
||||
|
||||
/**
|
||||
* Static utility method that performs the primary logic of this builder. It is visible to other
|
||||
* classes in this package so that other builders can use this functionality as a mix-in.
|
||||
* @param attributes the attributes map to be populated.
|
||||
* @param md the source of the RPC attribute values.
|
||||
*/
|
||||
static void populateMethodDescriptorAttributes(
|
||||
final Map<AttributeKey<?>, Object> attributes,
|
||||
final Descriptors.MethodDescriptor md
|
||||
) {
|
||||
final String packageAndService = getRpcPackageAndService(md.getService());
|
||||
final String method = getRpcName(md);
|
||||
attributes.put(RPC_SYSTEM, RpcSystem.HBASE_RPC.name());
|
||||
attributes.put(RPC_SERVICE, packageAndService);
|
||||
attributes.put(RPC_METHOD, method);
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieve the combined {@code $package.$service} value from {@code sd}.
|
||||
*/
|
||||
public static String getRpcPackageAndService(final Descriptors.ServiceDescriptor sd) {
|
||||
// it happens that `getFullName` returns a string in the $package.$service format required by
|
||||
// the otel RPC specification. Use it for now; might have to parse the value in the future.
|
||||
return sd.getFullName();
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieve the {@code $method} value from {@code md}.
|
||||
*/
|
||||
public static String getRpcName(final Descriptors.MethodDescriptor md) {
|
||||
return md.getName();
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct an RPC span name.
|
||||
*/
|
||||
public static String buildSpanName(final String packageAndService, final String method) {
|
||||
return packageAndService + "/" + method;
|
||||
}
|
||||
}
|
|
@ -20,10 +20,6 @@ package org.apache.hadoop.hbase.ipc;
|
|||
|
||||
import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
|
||||
import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;
|
||||
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.REMOTE_HOST_KEY;
|
||||
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.REMOTE_PORT_KEY;
|
||||
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RPC_METHOD_KEY;
|
||||
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RPC_SERVICE_KEY;
|
||||
|
||||
import io.opentelemetry.api.trace.Span;
|
||||
import io.opentelemetry.api.trace.StatusCode;
|
||||
|
@ -40,6 +36,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.client.MetricsConnection;
|
||||
import org.apache.hadoop.hbase.client.trace.IpcClientSpanBuilder;
|
||||
import org.apache.hadoop.hbase.codec.Codec;
|
||||
import org.apache.hadoop.hbase.codec.KeyValueCodec;
|
||||
import org.apache.hadoop.hbase.net.Address;
|
||||
|
@ -399,11 +396,10 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
|
|||
private Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
|
||||
final Message param, Message returnType, final User ticket, final Address addr,
|
||||
final RpcCallback<Message> callback) {
|
||||
Span span = TraceUtil.createClientSpan("RpcClient.callMethod")
|
||||
.setAttribute(RPC_SERVICE_KEY, md.getService().getName())
|
||||
.setAttribute(RPC_METHOD_KEY, md.getName())
|
||||
.setAttribute(REMOTE_HOST_KEY, addr.getHostName())
|
||||
.setAttribute(REMOTE_PORT_KEY, addr.getPort());
|
||||
Span span = new IpcClientSpanBuilder()
|
||||
.setMethodDescriptor(md)
|
||||
.setRemoteAddress(addr)
|
||||
.build();
|
||||
try (Scope scope = span.makeCurrent()) {
|
||||
final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
|
||||
cs.setStartTime(EnvironmentEdgeManager.currentTime());
|
||||
|
|
|
@ -24,6 +24,7 @@ import io.opentelemetry.api.trace.SpanKind;
|
|||
import io.opentelemetry.api.trace.StatusCode;
|
||||
import io.opentelemetry.sdk.trace.data.SpanData;
|
||||
import io.opentelemetry.sdk.trace.data.StatusData;
|
||||
import java.time.Duration;
|
||||
import org.hamcrest.Description;
|
||||
import org.hamcrest.FeatureMatcher;
|
||||
import org.hamcrest.Matcher;
|
||||
|
@ -46,6 +47,16 @@ public final class SpanDataMatchers {
|
|||
};
|
||||
}
|
||||
|
||||
public static Matcher<SpanData> hasDuration(Matcher<Duration> matcher) {
|
||||
return new FeatureMatcher<SpanData, Duration>(
|
||||
matcher, "SpanData having duration that ", "duration") {
|
||||
@Override
|
||||
protected Duration featureValueOf(SpanData item) {
|
||||
return Duration.ofNanos(item.getEndEpochNanos() - item.getStartEpochNanos());
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public static Matcher<SpanData> hasEnded() {
|
||||
return new TypeSafeMatcher<SpanData>() {
|
||||
@Override protected boolean matchesSafely(SpanData item) {
|
||||
|
@ -92,4 +103,17 @@ public final class SpanDataMatchers {
|
|||
}
|
||||
};
|
||||
}
|
||||
|
||||
public static Matcher<SpanData> hasTraceId(String traceId) {
|
||||
return hasTraceId(is(equalTo(traceId)));
|
||||
}
|
||||
|
||||
public static Matcher<SpanData> hasTraceId(Matcher<String> matcher) {
|
||||
return new FeatureMatcher<SpanData, String>(
|
||||
matcher, "SpanData with a traceId that ", "traceId") {
|
||||
@Override protected String featureValueOf(SpanData item) {
|
||||
return item.getTraceId();
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
|
@ -44,14 +44,13 @@ public final class HBaseSemanticAttributes {
|
|||
AttributeKey.stringArrayKey("db.hbase.container_operations");
|
||||
public static final AttributeKey<List<String>> REGION_NAMES_KEY =
|
||||
AttributeKey.stringArrayKey("db.hbase.regions");
|
||||
public static final AttributeKey<String> RPC_SERVICE_KEY =
|
||||
AttributeKey.stringKey("db.hbase.rpc.service");
|
||||
public static final AttributeKey<String> RPC_METHOD_KEY =
|
||||
AttributeKey.stringKey("db.hbase.rpc.method");
|
||||
public static final AttributeKey<String> RPC_SYSTEM = SemanticAttributes.RPC_SYSTEM;
|
||||
public static final AttributeKey<String> RPC_SERVICE = SemanticAttributes.RPC_SERVICE;
|
||||
public static final AttributeKey<String> RPC_METHOD = SemanticAttributes.RPC_METHOD;
|
||||
public static final AttributeKey<String> SERVER_NAME_KEY =
|
||||
AttributeKey.stringKey("db.hbase.server.name");
|
||||
public static final AttributeKey<String> REMOTE_HOST_KEY = SemanticAttributes.NET_PEER_NAME;
|
||||
public static final AttributeKey<Long> REMOTE_PORT_KEY = SemanticAttributes.NET_PEER_PORT;
|
||||
public static final AttributeKey<String> NET_PEER_NAME = SemanticAttributes.NET_PEER_NAME;
|
||||
public static final AttributeKey<Long> NET_PEER_PORT = SemanticAttributes.NET_PEER_PORT;
|
||||
public static final AttributeKey<Boolean> ROW_LOCK_READ_LOCK_KEY =
|
||||
AttributeKey.booleanKey("db.hbase.rowlock.readlock");
|
||||
public static final AttributeKey<String> WAL_IMPL = AttributeKey.stringKey("db.hbase.wal.impl");
|
||||
|
@ -74,5 +73,13 @@ public final class HBaseSemanticAttributes {
|
|||
SCAN,
|
||||
}
|
||||
|
||||
/**
|
||||
* These are values used with {@link #RPC_SYSTEM}. Only a single value for now; more to come as
|
||||
* we add tracing over our gateway components.
|
||||
*/
|
||||
public enum RpcSystem {
|
||||
HBASE_RPC,
|
||||
}
|
||||
|
||||
private HBaseSemanticAttributes() { }
|
||||
}
|
||||
|
|
|
@ -255,6 +255,12 @@
|
|||
<groupId>org.apache.hbase</groupId>
|
||||
<artifactId>hbase-common</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hbase</groupId>
|
||||
<artifactId>hbase-client</artifactId>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hbase</groupId>
|
||||
<artifactId>hbase-http</artifactId>
|
||||
|
|
|
@ -17,11 +17,8 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.ipc;
|
||||
|
||||
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RPC_METHOD_KEY;
|
||||
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RPC_SERVICE_KEY;
|
||||
import io.opentelemetry.api.trace.Span;
|
||||
import io.opentelemetry.api.trace.StatusCode;
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.context.Scope;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
|
@ -32,6 +29,7 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
|||
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
|
||||
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.server.trace.IpcServerSpanBuilder;
|
||||
import org.apache.hadoop.hbase.trace.TraceUtil;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
|
@ -90,14 +88,6 @@ public class CallRunner {
|
|||
this.rpcServer = null;
|
||||
}
|
||||
|
||||
private String getServiceName() {
|
||||
return call.getService() != null ? call.getService().getDescriptorForType().getName() : "";
|
||||
}
|
||||
|
||||
private String getMethodName() {
|
||||
return call.getMethod() != null ? call.getMethod().getName() : "";
|
||||
}
|
||||
|
||||
public void run() {
|
||||
try {
|
||||
if (call.disconnectSince() >= 0) {
|
||||
|
@ -122,12 +112,7 @@ public class CallRunner {
|
|||
String error = null;
|
||||
Pair<Message, CellScanner> resultPair = null;
|
||||
RpcServer.CurCall.set(call);
|
||||
String serviceName = getServiceName();
|
||||
String methodName = getMethodName();
|
||||
Span span = TraceUtil.getGlobalTracer().spanBuilder("RpcServer.callMethod")
|
||||
.setParent(Context.current().with(((ServerCall<?>) call).getSpan())).startSpan()
|
||||
.setAttribute(RPC_SERVICE_KEY, serviceName)
|
||||
.setAttribute(RPC_METHOD_KEY, methodName);
|
||||
Span span = new IpcServerSpanBuilder(call).build();
|
||||
try (Scope traceScope = span.makeCurrent()) {
|
||||
if (!this.rpcServer.isStarted()) {
|
||||
InetSocketAddress address = rpcServer.getListenerAddress();
|
||||
|
|
|
@ -0,0 +1,92 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.server.trace;
|
||||
|
||||
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RPC_METHOD;
|
||||
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RPC_SERVICE;
|
||||
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RPC_SYSTEM;
|
||||
import io.opentelemetry.api.common.AttributeKey;
|
||||
import io.opentelemetry.api.trace.Span;
|
||||
import io.opentelemetry.api.trace.SpanBuilder;
|
||||
import io.opentelemetry.api.trace.SpanKind;
|
||||
import io.opentelemetry.context.Context;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Supplier;
|
||||
import org.apache.hadoop.hbase.client.trace.IpcClientSpanBuilder;
|
||||
import org.apache.hadoop.hbase.ipc.RpcCall;
|
||||
import org.apache.hadoop.hbase.ipc.ServerCall;
|
||||
import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RpcSystem;
|
||||
import org.apache.hadoop.hbase.trace.TraceUtil;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
|
||||
|
||||
/**
|
||||
* Construct {@link Span} instances originating from the server side of an IPC.
|
||||
*
|
||||
* @see <a href="https://github.com/open-telemetry/opentelemetry-specification/blob/3e380e249f60c3a5f68746f5e84d10195ba41a79/specification/trace/semantic_conventions/rpc.md">Semantic conventions for RPC spans</a>
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class IpcServerSpanBuilder implements Supplier<Span> {
|
||||
|
||||
private final RpcCall rpcCall;
|
||||
private String name;
|
||||
private final Map<AttributeKey<?>, Object> attributes = new HashMap<>();
|
||||
|
||||
public IpcServerSpanBuilder(final RpcCall rpcCall) {
|
||||
this.rpcCall = rpcCall;
|
||||
final String packageAndService = Optional.ofNullable(rpcCall.getService())
|
||||
.map(BlockingService::getDescriptorForType)
|
||||
.map(IpcClientSpanBuilder::getRpcPackageAndService)
|
||||
.orElse("");
|
||||
final String method = Optional.ofNullable(rpcCall.getMethod())
|
||||
.map(IpcClientSpanBuilder::getRpcName)
|
||||
.orElse("");
|
||||
setName(IpcClientSpanBuilder.buildSpanName(packageAndService, method));
|
||||
addAttribute(RPC_SYSTEM, RpcSystem.HBASE_RPC.name());
|
||||
addAttribute(RPC_SERVICE, packageAndService);
|
||||
addAttribute(RPC_METHOD, method);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Span get() {
|
||||
return build();
|
||||
}
|
||||
|
||||
public IpcServerSpanBuilder setName(final String name) {
|
||||
this.name = name;
|
||||
return this;
|
||||
}
|
||||
|
||||
public <T> IpcServerSpanBuilder addAttribute(final AttributeKey<T> key, T value) {
|
||||
attributes.put(key, value);
|
||||
return this;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public Span build() {
|
||||
final SpanBuilder builder = TraceUtil.getGlobalTracer()
|
||||
.spanBuilder(name)
|
||||
.setSpanKind(SpanKind.SERVER);
|
||||
attributes.forEach((k, v) -> builder.setAttribute((AttributeKey<? super Object>) k, v));
|
||||
return builder.setParent(Context.current().with(((ServerCall<?>) rpcCall).getSpan()))
|
||||
.startSpan();
|
||||
}
|
||||
}
|
|
@ -17,11 +17,21 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.ipc;
|
||||
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasAttributes;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasDuration;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
|
||||
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasTraceId;
|
||||
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
|
||||
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
|
||||
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newStub;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.allOf;
|
||||
import static org.hamcrest.Matchers.everyItem;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.hasItem;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
|
@ -33,16 +43,16 @@ import static org.mockito.ArgumentMatchers.any;
|
|||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.internal.verification.VerificationModeFactory.times;
|
||||
|
||||
import io.opentelemetry.api.common.AttributeKey;
|
||||
import io.opentelemetry.api.trace.SpanKind;
|
||||
import io.opentelemetry.api.trace.StatusCode;
|
||||
import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
|
||||
import io.opentelemetry.sdk.trace.data.SpanData;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellScanner;
|
||||
|
@ -50,22 +60,21 @@ import org.apache.hadoop.hbase.CellUtil;
|
|||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.MatcherPredicate;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.Waiter;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
|
||||
import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.io.compress.GzipCodec;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.hamcrest.Matcher;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
|
||||
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
|
||||
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
|
||||
|
@ -442,74 +451,124 @@ public abstract class AbstractTestIPC {
|
|||
}
|
||||
}
|
||||
|
||||
private void assertSameTraceId() {
|
||||
String traceId = traceRule.getSpans().get(0).getTraceId();
|
||||
for (SpanData data : traceRule.getSpans()) {
|
||||
// assert we are the same trace
|
||||
assertEquals(traceId, data.getTraceId());
|
||||
}
|
||||
private SpanData waitSpan(Matcher<SpanData> matcher) {
|
||||
Waiter.waitFor(CONF, 1000, new MatcherPredicate<>(
|
||||
() -> traceRule.getSpans(), hasItem(matcher)));
|
||||
return traceRule.getSpans()
|
||||
.stream()
|
||||
.filter(matcher::matches)
|
||||
.findFirst()
|
||||
.orElseThrow(AssertionError::new);
|
||||
}
|
||||
|
||||
private SpanData waitSpan(String name) {
|
||||
Waiter.waitFor(CONF, 1000,
|
||||
() -> traceRule.getSpans().stream().map(SpanData::getName).anyMatch(s -> s.equals(name)));
|
||||
return traceRule.getSpans().stream().filter(s -> s.getName().equals(name)).findFirst().get();
|
||||
private static String buildIpcSpanName(final String packageAndService, final String methodName) {
|
||||
return packageAndService + "/" + methodName;
|
||||
}
|
||||
|
||||
private void assertRpcAttribute(SpanData data, String methodName, InetSocketAddress addr,
|
||||
SpanKind kind) {
|
||||
assertEquals(SERVICE.getDescriptorForType().getName(),
|
||||
data.getAttributes().get(HBaseSemanticAttributes.RPC_SERVICE_KEY));
|
||||
assertEquals(methodName, data.getAttributes().get(HBaseSemanticAttributes.RPC_METHOD_KEY));
|
||||
if (addr != null) {
|
||||
assertEquals(
|
||||
addr.getHostName(),
|
||||
data.getAttributes().get(HBaseSemanticAttributes.REMOTE_HOST_KEY));
|
||||
assertEquals(
|
||||
addr.getPort(),
|
||||
data.getAttributes().get(HBaseSemanticAttributes.REMOTE_PORT_KEY).intValue());
|
||||
}
|
||||
assertEquals(kind, data.getKind());
|
||||
private static Matcher<SpanData> buildIpcClientSpanMatcher(
|
||||
final String packageAndService,
|
||||
final String methodName
|
||||
) {
|
||||
return allOf(
|
||||
hasName(buildIpcSpanName(packageAndService, methodName)),
|
||||
hasKind(SpanKind.CLIENT)
|
||||
);
|
||||
}
|
||||
|
||||
private static Matcher<SpanData> buildIpcServerSpanMatcher(
|
||||
final String packageAndService,
|
||||
final String methodName
|
||||
) {
|
||||
return allOf(
|
||||
hasName(buildIpcSpanName(packageAndService, methodName)),
|
||||
hasKind(SpanKind.SERVER)
|
||||
);
|
||||
}
|
||||
|
||||
private static Matcher<SpanData> buildIpcClientSpanAttributesMatcher(
|
||||
final String packageAndService,
|
||||
final String methodName,
|
||||
final InetSocketAddress isa
|
||||
) {
|
||||
return hasAttributes(allOf(
|
||||
containsEntry("rpc.system", "HBASE_RPC"),
|
||||
containsEntry("rpc.service", packageAndService),
|
||||
containsEntry("rpc.method", methodName),
|
||||
containsEntry("net.peer.name", isa.getHostName()),
|
||||
containsEntry(AttributeKey.longKey("net.peer.port"), (long) isa.getPort())));
|
||||
}
|
||||
|
||||
private static Matcher<SpanData> buildIpcServerSpanAttributesMatcher(
|
||||
final String packageAndService,
|
||||
final String methodName
|
||||
) {
|
||||
return hasAttributes(allOf(
|
||||
containsEntry("rpc.system", "HBASE_RPC"),
|
||||
containsEntry("rpc.service", packageAndService),
|
||||
containsEntry("rpc.method", methodName)));
|
||||
}
|
||||
|
||||
private void assertRemoteSpan() {
|
||||
SpanData data = waitSpan("RpcServer.process");
|
||||
SpanData data = waitSpan(hasName("RpcServer.process"));
|
||||
assertTrue(data.getParentSpanContext().isRemote());
|
||||
assertEquals(SpanKind.SERVER, data.getKind());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTracing() throws IOException, ServiceException {
|
||||
public void testTracingSuccessIpc() throws IOException, ServiceException {
|
||||
RpcServer rpcServer = createRpcServer(null, "testRpcServer",
|
||||
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
|
||||
new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
|
||||
new InetSocketAddress("localhost", 0), CONF,
|
||||
new FifoRpcScheduler(CONF, 1));
|
||||
try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
|
||||
rpcServer.start();
|
||||
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
|
||||
stub.pause(null, PauseRequestProto.newBuilder().setMs(100).build());
|
||||
assertRpcAttribute(waitSpan("RpcClient.callMethod"), "pause", rpcServer.getListenerAddress(),
|
||||
SpanKind.CLIENT);
|
||||
assertRpcAttribute(waitSpan("RpcServer.callMethod"), "pause", null, SpanKind.INTERNAL);
|
||||
// use the ISA from the running server so that we can get the port selected.
|
||||
final InetSocketAddress isa = rpcServer.getListenerAddress();
|
||||
final SpanData pauseClientSpan = waitSpan(buildIpcClientSpanMatcher(
|
||||
"hbase.test.pb.TestProtobufRpcProto", "pause"));
|
||||
assertThat(pauseClientSpan, buildIpcClientSpanAttributesMatcher(
|
||||
"hbase.test.pb.TestProtobufRpcProto", "pause", isa));
|
||||
final SpanData pauseServerSpan = waitSpan(buildIpcServerSpanMatcher(
|
||||
"hbase.test.pb.TestProtobufRpcProto", "pause"));
|
||||
assertThat(pauseServerSpan, buildIpcServerSpanAttributesMatcher(
|
||||
"hbase.test.pb.TestProtobufRpcProto", "pause"));
|
||||
assertRemoteSpan();
|
||||
assertSameTraceId();
|
||||
for (SpanData data : traceRule.getSpans()) {
|
||||
assertThat(
|
||||
TimeUnit.NANOSECONDS.toMillis(data.getEndEpochNanos() - data.getStartEpochNanos()),
|
||||
greaterThanOrEqualTo(100L));
|
||||
assertEquals(StatusCode.OK, data.getStatus().getStatusCode());
|
||||
}
|
||||
assertFalse("no spans provided", traceRule.getSpans().isEmpty());
|
||||
assertThat(traceRule.getSpans(), everyItem(allOf(
|
||||
hasStatusWithCode(StatusCode.OK),
|
||||
hasTraceId(traceRule.getSpans().iterator().next().getTraceId()),
|
||||
hasDuration(greaterThanOrEqualTo(Duration.ofMillis(100L))))));
|
||||
}
|
||||
}
|
||||
|
||||
traceRule.clearSpans();
|
||||
@Test
|
||||
public void testTracingErrorIpc() throws IOException {
|
||||
RpcServer rpcServer = createRpcServer(null, "testRpcServer",
|
||||
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
|
||||
new InetSocketAddress("localhost", 0), CONF,
|
||||
new FifoRpcScheduler(CONF, 1));
|
||||
try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
|
||||
rpcServer.start();
|
||||
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
|
||||
// use the ISA from the running server so that we can get the port selected.
|
||||
assertThrows(ServiceException.class,
|
||||
() -> stub.error(null, EmptyRequestProto.getDefaultInstance()));
|
||||
assertRpcAttribute(waitSpan("RpcClient.callMethod"), "error", rpcServer.getListenerAddress(),
|
||||
SpanKind.CLIENT);
|
||||
assertRpcAttribute(waitSpan("RpcServer.callMethod"), "error", null, SpanKind.INTERNAL);
|
||||
final InetSocketAddress isa = rpcServer.getListenerAddress();
|
||||
final SpanData errorClientSpan = waitSpan(buildIpcClientSpanMatcher(
|
||||
"hbase.test.pb.TestProtobufRpcProto", "error"));
|
||||
assertThat(errorClientSpan, buildIpcClientSpanAttributesMatcher(
|
||||
"hbase.test.pb.TestProtobufRpcProto", "error", isa));
|
||||
final SpanData errorServerSpan = waitSpan(buildIpcServerSpanMatcher(
|
||||
"hbase.test.pb.TestProtobufRpcProto", "error"));
|
||||
assertThat(errorServerSpan, buildIpcServerSpanAttributesMatcher(
|
||||
"hbase.test.pb.TestProtobufRpcProto", "error"));
|
||||
assertRemoteSpan();
|
||||
assertSameTraceId();
|
||||
for (SpanData data : traceRule.getSpans()) {
|
||||
assertEquals(StatusCode.ERROR, data.getStatus().getStatusCode());
|
||||
}
|
||||
assertFalse("no spans provided", traceRule.getSpans().isEmpty());
|
||||
assertThat(traceRule.getSpans(), everyItem(allOf(
|
||||
hasStatusWithCode(StatusCode.ERROR),
|
||||
hasTraceId(traceRule.getSpans().iterator().next().getTraceId()))));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
7
pom.xml
7
pom.xml
|
@ -1846,6 +1846,13 @@
|
|||
<groupId>org.apache.hbase</groupId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<artifactId>hbase-client</artifactId>
|
||||
<groupId>org.apache.hbase</groupId>
|
||||
<version>${project.version}</version>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<artifactId>hbase-metrics-api</artifactId>
|
||||
<groupId>org.apache.hbase</groupId>
|
||||
|
|
Loading…
Reference in New Issue