HBASE-25401 Add trace support for async call in rpc client (#2790)

Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
Duo Zhang 2020-12-23 11:51:53 +08:00 committed by Duo Zhang
parent 302d9ea8b8
commit 2420286715
14 changed files with 301 additions and 176 deletions

View File

@ -21,6 +21,9 @@ package org.apache.hadoop.hbase.ipc;
import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.Collection;
@ -38,6 +41,7 @@ import org.apache.hadoop.hbase.codec.KeyValueCodec;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.PoolMap;
import org.apache.hadoop.hbase.util.Threads;
@ -365,7 +369,7 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
protected abstract T createConnection(ConnectionId remoteId) throws IOException;
private void onCallFinished(Call call, HBaseRpcController hrc, Address addr,
RpcCallback<Message> callback) {
RpcCallback<Message> callback) {
call.callStats.setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.getStartTime());
if (metrics != null) {
metrics.updateRpc(call.md, call.param, call.callStats);
@ -388,44 +392,59 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
}
}
Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
private Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
final Message param, Message returnType, final User ticket,
final Address addr, final RpcCallback<Message> callback) {
final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
cs.setStartTime(EnvironmentEdgeManager.currentTime());
Span span = TraceUtil.getGlobalTracer().spanBuilder("RpcClient.callMethod." + md.getFullName())
.startSpan();
try (Scope scope = span.makeCurrent()) {
final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
cs.setStartTime(EnvironmentEdgeManager.currentTime());
if (param instanceof ClientProtos.MultiRequest) {
ClientProtos.MultiRequest req = (ClientProtos.MultiRequest) param;
int numActions = 0;
for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) {
numActions += regionAction.getActionCount();
if (param instanceof ClientProtos.MultiRequest) {
ClientProtos.MultiRequest req = (ClientProtos.MultiRequest) param;
int numActions = 0;
for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) {
numActions += regionAction.getActionCount();
}
cs.setNumActionsPerServer(numActions);
}
cs.setNumActionsPerServer(numActions);
}
final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType,
final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType,
hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() {
@Override
public void run(Call call) {
counter.decrementAndGet();
onCallFinished(call, hrc, addr, callback);
try (Scope scope = call.span.makeCurrent()) {
counter.decrementAndGet();
onCallFinished(call, hrc, addr, callback);
} finally {
if (hrc.failed()) {
span.setStatus(StatusCode.ERROR);
span.recordException(hrc.getFailed());
} else {
span.setStatus(StatusCode.OK);
}
span.end();
}
}
}, cs);
ConnectionId remoteId = new ConnectionId(ticket, md.getService().getName(), addr);
int count = counter.incrementAndGet();
try {
if (count > maxConcurrentCallsPerServer) {
throw new ServerTooBusyException(addr, count);
ConnectionId remoteId = new ConnectionId(ticket, md.getService().getName(), addr);
int count = counter.incrementAndGet();
try {
if (count > maxConcurrentCallsPerServer) {
throw new ServerTooBusyException(addr, count);
}
cs.setConcurrentCallsPerServer(count);
T connection = getConnection(remoteId);
connection.sendRequest(call, hrc);
} catch (Exception e) {
call.setException(toIOE(e));
span.end();
}
cs.setConcurrentCallsPerServer(count);
T connection = getConnection(remoteId);
connection.sendRequest(call, hrc);
} catch (Exception e) {
call.setException(toIOE(e));
return call;
}
return call;
}
private static Address createAddr(ServerName sn) {

View File

@ -24,8 +24,6 @@ import static org.apache.hadoop.hbase.ipc.IPCUtil.isFatalConnectionException;
import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled;
import static org.apache.hadoop.hbase.ipc.IPCUtil.write;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
@ -57,7 +55,6 @@ import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.io.IOUtils;
@ -192,8 +189,8 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
if (call.isDone()) {
continue;
}
try {
tracedWriteRequest(call);
try (Scope scope = call.span.makeCurrent()) {
writeRequest(call);
} catch (IOException e) {
// An exception here means the call has not been added to pendingCalls yet, so we need
// to fail it ourselves.
@ -594,16 +591,6 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
this.out = new DataOutputStream(new BufferedOutputStream(saslRpcClient.getOutputStream()));
}
/**
 * Writes the request of the given call while a dedicated span is current.
 * <p>
 * The span is named {@code RpcClientImpl.tracedWriteRequest} and is parented on a context
 * derived from the current one with {@code call.span} attached. The span is ended in a
 * {@code finally} clause, so it is closed whether or not the write succeeds.
 * @param call the call to hand to {@code writeRequest}
 * @throws IOException declared for failures raised while writing the request
 */
private void tracedWriteRequest(Call call) throws IOException {
  final Context parentContext = Context.current().with(call.span);
  final Span writeSpan = TraceUtil.getGlobalTracer()
    .spanBuilder("RpcClientImpl.tracedWriteRequest")
    .setParent(parentContext)
    .startSpan();
  // The scope is closed by try-with-resources before the span itself is ended below.
  try (Scope writeScope = writeSpan.makeCurrent()) {
    writeRequest(call);
  } finally {
    writeSpan.end();
  }
}
/**
* Initiates a call by sending the parameter to the remote server. Note: this is not called from
* the Connection thread, but by other threads.
@ -811,7 +798,9 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
if (callSender != null) {
callSender.sendCall(call);
} else {
tracedWriteRequest(call);
// This runs in the same thread as the caller, so there is no need to attach the trace
// context again.
writeRequest(call);
}
}
});

View File

@ -61,7 +61,7 @@ class Call {
final Span span;
Timeout timeoutTask;
protected Call(int id, final Descriptors.MethodDescriptor md, Message param,
Call(int id, final Descriptors.MethodDescriptor md, Message param,
final CellScanner cells, final Message responseDefaultType, int timeout, int priority,
RpcCallback<Call> callback, MetricsConnection.CallStats callStats) {
this.param = param;

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.ipc;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.context.Context;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
@ -49,6 +51,7 @@ import org.apache.hbase.thirdparty.io.netty.util.concurrent.FastThreadLocal;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.TracingProtos.RPCTInfo;
/**
* Utility to help ipc'ing.
@ -112,11 +115,10 @@ class IPCUtil {
static RequestHeader buildRequestHeader(Call call, CellBlockMeta cellBlockMeta) {
RequestHeader.Builder builder = RequestHeader.newBuilder();
builder.setCallId(call.id);
//TODO handle htrace API change, see HBASE-18895
/*if (call.span != null) {
builder.setTraceInfo(RPCTInfo.newBuilder().setParentId(call.span.getSpanId())
.setTraceId(call.span.getTracerId()));
}*/
RPCTInfo.Builder traceBuilder = RPCTInfo.newBuilder();
GlobalOpenTelemetry.getPropagators().getTextMapPropagator().inject(Context.current(),
traceBuilder, (carrier, key, value) -> carrier.putHeaders(key, value));
builder.setTraceInfo(traceBuilder.build());
builder.setMethodName(call.md.getName());
builder.setRequestParam(call.param != null);
if (cellBlockMeta != null) {

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.ipc;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@ -114,9 +115,12 @@ class NettyRpcDuplexHandler extends ChannelDuplexHandler {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
throws Exception {
if (msg instanceof Call) {
writeRequest(ctx, (Call) msg, promise);
Call call = (Call) msg;
try (Scope scope = call.span.makeCurrent()) {
writeRequest(ctx, call, promise);
}
} else {
ctx.write(msg, promise);
}

View File

@ -17,7 +17,7 @@
*/
package org.apache.hadoop.hbase.trace;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
import org.apache.yetus.audience.InterfaceAudience;
@ -30,6 +30,6 @@ public final class TraceUtil {
}
public static Tracer getGlobalTracer() {
return OpenTelemetry.getGlobalTracer(INSTRUMENTATION_NAME);
return GlobalOpenTelemetry.getTracer(INSTRUMENTATION_NAME);
}
}

View File

@ -23,12 +23,12 @@ option java_outer_classname = "TracingProtos";
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
//Used to pass through the information necessary to continue
//a trace after an RPC is made. All we need is the traceid
//(so we know the overarching trace this message is a part of), and
//the id of the current span when this message was sent, so we know
//what span caused the new span we will create when this message is received.
// OpenTelemetry propagates trace context using https://www.w3.org/TR/trace-context/, which
// is a text-based approach that passes properties as HTTP headers. We use the same approach
// here, so all we need is a map to store the key-value pairs.
message RPCTInfo {
optional int64 trace_id = 1;
optional int64 parent_id = 2;
optional int64 trace_id = 1 [deprecated = true];
optional int64 parent_id = 2 [deprecated = true];
map<string, string> headers = 3;
}

View File

@ -422,6 +422,16 @@
<artifactId>hamcrest-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-testing</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.ipc;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
@ -72,15 +74,6 @@ public class CallRunner {
return call;
}
/**
 * Returns the call held by this runner, cast to {@link ServerCall}. Kept for backward
 * compatibility.
 * @return the held call viewed as a {@code ServerCall}
 * @deprecated As of release 2.0, this will be removed in HBase 3.0
 */
@Deprecated
public ServerCall<?> getCall() {
  final ServerCall<?> serverCall = (ServerCall<?>) call;
  return serverCall;
}
public void setStatus(MonitoredRPCHandler status) {
this.status = status;
}
@ -129,7 +122,8 @@ public class CallRunner {
String serviceName = getServiceName();
String methodName = getMethodName();
String traceString = serviceName + "." + methodName;
Span span = TraceUtil.getGlobalTracer().spanBuilder(traceString).startSpan();
Span span = TraceUtil.getGlobalTracer().spanBuilder(traceString)
.setParent(Context.current().with(((ServerCall<?>) call).getSpan())).startSpan();
try (Scope traceScope = span.makeCurrent()) {
if (!this.rpcServer.isStarted()) {
InetSocketAddress address = rpcServer.getListenerAddress();
@ -140,8 +134,12 @@ public class CallRunner {
resultPair = this.rpcServer.call(call, this.status);
} catch (TimeoutIOException e){
RpcServer.LOG.warn("Can not complete this request in time, drop it: " + call);
span.recordException(e);
span.setStatus(StatusCode.ERROR);
return;
} catch (Throwable e) {
span.recordException(e);
span.setStatus(StatusCode.ERROR);
if (e instanceof ServerNotRunningYetException) {
// If ServerNotRunningYetException, don't spew stack trace.
if (RpcServer.LOG.isTraceEnabled()) {
@ -160,6 +158,7 @@ public class CallRunner {
RpcServer.CurCall.set(null);
if (resultPair != null) {
this.rpcServer.addCallSize(call.getSize() * -1);
span.setStatus(StatusCode.OK);
sucessful = true;
}
span.end();

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.ipc;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
@ -99,6 +101,8 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa
// the current implementation. We should fix this in the future.
private final AtomicInteger reference = new AtomicInteger(0b01);
private final Span span;
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH",
justification = "Can't figure why this complaint is happening... see below")
ServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header,
@ -129,6 +133,7 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa
this.bbAllocator = byteBuffAllocator;
this.cellBlockBuilder = cellBlockBuilder;
this.reqCleanup = reqCleanup;
this.span = Span.current();
}
/**
@ -147,6 +152,7 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa
// If the call was run successfully, we might have already returned the BB
// back to the pool. No worries, in that case inputCellBlock will be null.
cleanup();
span.end();
}
private void release(int mask) {
@ -226,6 +232,10 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa
}
if (t != null) {
this.isError = true;
span.recordException(t);
span.setStatus(StatusCode.ERROR);
} else {
span.setStatus(StatusCode.OK);
}
BufferChain bc = null;
try {
@ -560,4 +570,8 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa
return response;
}
}
/**
 * Returns the tracing span associated with this call.
 * @return the span stored in this call's {@code span} field
 */
public Span getSpan() {
  return this.span;
}
}

View File

@ -19,6 +19,11 @@ package org.apache.hadoop.hbase.ipc;
import static org.apache.hadoop.hbase.HConstants.RPC_HEADER;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.propagation.TextMapPropagator;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.DataOutputStream;
@ -31,13 +36,11 @@ import java.nio.channels.ReadableByteChannel;
import java.security.GeneralSecurityException;
import java.util.Objects;
import java.util.Properties;
import org.apache.commons.crypto.cipher.CryptoCipherFactory;
import org.apache.commons.crypto.random.CryptoRandom;
import org.apache.commons.crypto.random.CryptoRandomFactory;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.VersionInfoUtil;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
@ -53,21 +56,7 @@ import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.provider.SaslServerAuthenticationProvider;
import org.apache.hadoop.hbase.security.provider.SaslServerAuthenticationProviders;
import org.apache.hadoop.hbase.security.provider.SimpleSaslServerAuthenticationProvider;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteInput;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
@ -79,6 +68,25 @@ import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteInput;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
import org.apache.hadoop.hbase.shaded.protobuf.generated.TracingProtos.RPCTInfo;
/** Reads calls from a connection and queues them for handling. */
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
@ -607,99 +615,115 @@ abstract class ServerRpcConnection implements Closeable {
ProtobufUtil.mergeFrom(builder, cis, headerSize);
RequestHeader header = (RequestHeader) builder.build();
offset += headerSize;
int id = header.getCallId();
if (RpcServer.LOG.isTraceEnabled()) {
RpcServer.LOG.trace("RequestHeader " + TextFormat.shortDebugString(header)
+ " totalRequestSize: " + totalRequestSize + " bytes");
}
// Enforcing the call queue size, this triggers a retry in the client
// This is a bit late to be doing this check - we have already read in the
// total request.
if ((totalRequestSize +
TextMapPropagator.Getter<RPCTInfo> getter = new TextMapPropagator.Getter<RPCTInfo>() {
@Override
public Iterable<String> keys(RPCTInfo carrier) {
return carrier.getHeadersMap().keySet();
}
@Override
public String get(RPCTInfo carrier, String key) {
return carrier.getHeadersMap().get(key);
}
};
Context traceCtx = GlobalOpenTelemetry.getPropagators().getTextMapPropagator()
.extract(Context.current(), header.getTraceInfo(), getter);
Span span =
TraceUtil.getGlobalTracer().spanBuilder("RpcServer.process").setParent(traceCtx).startSpan();
try (Scope scope = span.makeCurrent()) {
int id = header.getCallId();
if (RpcServer.LOG.isTraceEnabled()) {
RpcServer.LOG.trace("RequestHeader " + TextFormat.shortDebugString(header) +
" totalRequestSize: " + totalRequestSize + " bytes");
}
// Enforcing the call queue size, this triggers a retry in the client
// This is a bit late to be doing this check - we have already read in the
// total request.
if ((totalRequestSize +
this.rpcServer.callQueueSizeInBytes.sum()) > this.rpcServer.maxQueueSizeInBytes) {
final ServerCall<?> callTooBig = createCall(id, this.service, null, null, null, null,
totalRequestSize, null, 0, this.callCleanup);
this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
callTooBig.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION,
"Call queue is full on " + this.rpcServer.server.getServerName() +
", is hbase.ipc.server.max.callqueue.size too small?");
callTooBig.sendResponseIfReady();
return;
}
MethodDescriptor md = null;
Message param = null;
CellScanner cellScanner = null;
try {
if (header.hasRequestParam() && header.getRequestParam()) {
md = this.service.getDescriptorForType().findMethodByName(
header.getMethodName());
if (md == null)
throw new UnsupportedOperationException(header.getMethodName());
builder = this.service.getRequestPrototype(md).newBuilderForType();
cis.resetSizeCounter();
int paramSize = cis.readRawVarint32();
offset += cis.getTotalBytesRead();
if (builder != null) {
ProtobufUtil.mergeFrom(builder, cis, paramSize);
param = builder.build();
final ServerCall<?> callTooBig = createCall(id, this.service, null, null, null, null,
totalRequestSize, null, 0, this.callCleanup);
this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
callTooBig.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION,
"Call queue is full on " + this.rpcServer.server.getServerName() +
", is hbase.ipc.server.max.callqueue.size too small?");
callTooBig.sendResponseIfReady();
return;
}
MethodDescriptor md = null;
Message param = null;
CellScanner cellScanner = null;
try {
if (header.hasRequestParam() && header.getRequestParam()) {
md = this.service.getDescriptorForType().findMethodByName(header.getMethodName());
if (md == null) {
throw new UnsupportedOperationException(header.getMethodName());
}
builder = this.service.getRequestPrototype(md).newBuilderForType();
cis.resetSizeCounter();
int paramSize = cis.readRawVarint32();
offset += cis.getTotalBytesRead();
if (builder != null) {
ProtobufUtil.mergeFrom(builder, cis, paramSize);
param = builder.build();
}
offset += paramSize;
} else {
// currently header must have request param, so we directly throw
// exception here
String msg = "Invalid request header: " + TextFormat.shortDebugString(header) +
", should have param set in it";
RpcServer.LOG.warn(msg);
throw new DoNotRetryIOException(msg);
}
offset += paramSize;
} else {
// currently header must have request param, so we directly throw
// exception here
String msg = "Invalid request header: "
+ TextFormat.shortDebugString(header)
+ ", should have param set in it";
RpcServer.LOG.warn(msg);
throw new DoNotRetryIOException(msg);
}
if (header.hasCellBlockMeta()) {
buf.position(offset);
ByteBuff dup = buf.duplicate();
dup.limit(offset + header.getCellBlockMeta().getLength());
cellScanner = this.rpcServer.cellBlockBuilder.createCellScannerReusingBuffers(
this.codec, this.compressionCodec, dup);
}
} catch (Throwable t) {
InetSocketAddress address = this.rpcServer.getListenerAddress();
String msg = (address != null ? address : "(channel closed)")
+ " is unable to read call parameter from client "
+ getHostAddress();
RpcServer.LOG.warn(msg, t);
if (header.hasCellBlockMeta()) {
buf.position(offset);
ByteBuff dup = buf.duplicate();
dup.limit(offset + header.getCellBlockMeta().getLength());
cellScanner = this.rpcServer.cellBlockBuilder.createCellScannerReusingBuffers(this.codec,
this.compressionCodec, dup);
}
} catch (Throwable t) {
InetSocketAddress address = this.rpcServer.getListenerAddress();
String msg = (address != null ? address : "(channel closed)") +
" is unable to read call parameter from client " + getHostAddress();
RpcServer.LOG.warn(msg, t);
this.rpcServer.metrics.exception(t);
this.rpcServer.metrics.exception(t);
// probably the hbase hadoop version does not match the running hadoop
// version
if (t instanceof LinkageError) {
t = new DoNotRetryIOException(t);
}
// If the method is not present on the server, do not retry.
if (t instanceof UnsupportedOperationException) {
t = new DoNotRetryIOException(t);
// probably the hbase hadoop version does not match the running hadoop
// version
if (t instanceof LinkageError) {
t = new DoNotRetryIOException(t);
}
// If the method is not present on the server, do not retry.
if (t instanceof UnsupportedOperationException) {
t = new DoNotRetryIOException(t);
}
ServerCall<?> readParamsFailedCall = createCall(id, this.service, null, null, null, null,
totalRequestSize, null, 0, this.callCleanup);
readParamsFailedCall.setResponse(null, null, t, msg + "; " + t.getMessage());
readParamsFailedCall.sendResponseIfReady();
return;
}
ServerCall<?> readParamsFailedCall = createCall(id, this.service, null, null, null, null,
totalRequestSize, null, 0, this.callCleanup);
readParamsFailedCall.setResponse(null, null, t, msg + "; " + t.getMessage());
readParamsFailedCall.sendResponseIfReady();
return;
}
int timeout = 0;
if (header.hasTimeout() && header.getTimeout() > 0) {
timeout = Math.max(this.rpcServer.minClientRequestTimeout, header.getTimeout());
}
ServerCall<?> call = createCall(id, this.service, md, header, param, cellScanner,
totalRequestSize, this.addr, timeout, this.callCleanup);
int timeout = 0;
if (header.hasTimeout() && header.getTimeout() > 0) {
timeout = Math.max(this.rpcServer.minClientRequestTimeout, header.getTimeout());
}
ServerCall<?> call = createCall(id, this.service, md, header, param, cellScanner, totalRequestSize,
this.addr, timeout, this.callCleanup);
if (!this.rpcServer.scheduler.dispatch(new CallRunner(this.rpcServer, call))) {
this.rpcServer.callQueueSizeInBytes.add(-1 * call.getSize());
this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
call.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION,
"Call queue is full on " + this.rpcServer.server.getServerName() +
if (!this.rpcServer.scheduler.dispatch(new CallRunner(this.rpcServer, call))) {
this.rpcServer.callQueueSizeInBytes.add(-1 * call.getSize());
this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
call.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION,
"Call queue is full on " + this.rpcServer.server.getServerName() +
", too many items queued ?");
call.sendResponseIfReady();
call.sendResponseIfReady();
}
}
}

View File

@ -20,21 +20,28 @@ package org.apache.hadoop.hbase.ipc;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newStub;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.anyObject;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.internal.verification.VerificationModeFactory.times;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
@ -43,10 +50,12 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.StringUtils;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -87,6 +96,10 @@ public abstract class AbstractTestIPC {
protected abstract AbstractRpcClient<?> createRpcClientNoCodec(Configuration conf);
@Rule
public OpenTelemetryRule traceRule = OpenTelemetryRule.create();
/**
* Ensure we do not HAVE TO HAVE a codec.
*/
@ -183,7 +196,7 @@ public abstract class AbstractTestIPC {
RpcServer rpcServer = createRpcServer(null, "testRpcServer",
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, scheduler);
verify(scheduler).init((RpcScheduler.Context) anyObject());
verify(scheduler).init(any(RpcScheduler.Context.class));
try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
rpcServer.start();
verify(scheduler).start();
@ -192,7 +205,7 @@ public abstract class AbstractTestIPC {
for (int i = 0; i < 10; i++) {
stub.echo(null, param);
}
verify(scheduler, times(10)).dispatch((CallRunner) anyObject());
verify(scheduler, times(10)).dispatch(any(CallRunner.class));
} finally {
rpcServer.stop();
verify(scheduler).stop();
@ -427,4 +440,44 @@ public abstract class AbstractTestIPC {
}
}
/**
 * Asserts that every span currently exposed by {@code traceRule} carries the same trace id.
 * <p>
 * The spans are read once, so the reference trace id and the spans compared against it come
 * from the same snapshot. An empty snapshot fails with a descriptive assertion rather than an
 * {@link IndexOutOfBoundsException} from {@code get(0)}.
 */
private void assertSameTraceId() {
  List<SpanData> spans = traceRule.getSpans();
  assertFalse("expected at least one recorded span", spans.isEmpty());
  String traceId = spans.get(0).getTraceId();
  for (SpanData data : spans) {
    // assert we are the same trace
    assertEquals(traceId, data.getTraceId());
  }
}
/**
 * Verifies the spans recorded for a successful and for a failing blocking rpc call.
 * <p>
 * For the {@code pause} call, the test waits until the client side span
 * {@code RpcClient.callMethod.TestProtobufRpcProto.pause} shows up, then checks that all
 * recorded spans share one trace id, lasted at least 100 ms (the requested pause) and ended
 * with {@link StatusCode#OK}. For the {@code error} call it checks that all recorded spans share
 * one trace id and ended with {@link StatusCode#ERROR}.
 * <p>
 * The rpc server is stopped in a {@code finally} clause so it is not leaked when an assertion
 * fails, matching the other tests in this class.
 */
@Test
public void testTracing() throws IOException, ServiceException {
  RpcServer rpcServer = createRpcServer(null, "testRpcServer",
    Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
    new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
  try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
    rpcServer.start();
    BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());

    // Successful call: the handler pauses for 100 ms.
    stub.pause(null, PauseRequestProto.newBuilder().setMs(100).build());
    // Wait until the client side span of the call has been recorded before inspecting spans.
    Waiter.waitFor(CONF, 1000, () -> traceRule.getSpans().stream().map(SpanData::getName)
      .anyMatch(s -> s.equals("RpcClient.callMethod.TestProtobufRpcProto.pause")));
    assertSameTraceId();
    for (SpanData data : traceRule.getSpans()) {
      assertThat(
        TimeUnit.NANOSECONDS.toMillis(data.getEndEpochNanos() - data.getStartEpochNanos()),
        greaterThanOrEqualTo(100L));
      assertEquals(StatusCode.OK, data.getStatus().getStatusCode());
    }

    // Start from a clean slate so the spans of the failing call are checked in isolation.
    traceRule.clearSpans();
    assertThrows(ServiceException.class,
      () -> stub.error(null, EmptyRequestProto.getDefaultInstance()));
    Waiter.waitFor(CONF, 1000, () -> traceRule.getSpans().stream().map(SpanData::getName)
      .anyMatch(s -> s.equals("RpcClient.callMethod.TestProtobufRpcProto.error")));
    assertSameTraceId();
    for (SpanData data : traceRule.getSpans()) {
      assertEquals(StatusCode.ERROR, data.getStatus().getStatusCode());
    }
  } finally {
    // Release the listener and handler threads even when an assertion above fails.
    rpcServer.stop();
  }
}
}

View File

@ -39,6 +39,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.epoll.EpollEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.epoll.EpollSocketChannel;

12
pom.xml
View File

@ -1717,7 +1717,7 @@
<jruby.version>9.2.13.0</jruby.version>
<junit.version>4.13</junit.version>
<hamcrest.version>1.3</hamcrest.version>
<opentelemetry.version>0.12.0</opentelemetry.version>
<opentelemetry.version>0.13.1</opentelemetry.version>
<log4j2.version>2.14.1</log4j2.version>
<mockito-core.version>2.28.2</mockito-core.version>
<protobuf.plugin.version>0.6.1</protobuf.plugin.version>
@ -2397,6 +2397,16 @@
<artifactId>opentelemetry-api</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-testing</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>