From fffdcba5bbb743b91cae274addba25a6433e3732 Mon Sep 17 00:00:00 2001 From: "Tak Lon (Stephen) Wu" Date: Wed, 4 Aug 2021 15:36:27 -0700 Subject: [PATCH] Revert "HBASE-26125 Backport HBASE-25401 "Add trace support for async call in rpc client" to branch-2 (#3543)" This reverts commit ca096437d7e096b514ddda53ec2f97b85d90752d. --- .../hadoop/hbase/ipc/AbstractRpcClient.java | 73 +++--- .../hbase/ipc/BlockingRpcConnection.java | 21 +- .../org/apache/hadoop/hbase/ipc/Call.java | 2 +- .../org/apache/hadoop/hbase/ipc/IPCUtil.java | 12 +- .../hbase/ipc/NettyRpcDuplexHandler.java | 8 +- .../apache/hadoop/hbase/trace/TraceUtil.java | 4 +- .../src/main/protobuf/Tracing.proto | 14 +- hbase-server/pom.xml | 10 - .../apache/hadoop/hbase/ipc/CallRunner.java | 19 +- .../apache/hadoop/hbase/ipc/ServerCall.java | 14 -- .../hadoop/hbase/ipc/ServerRpcConnection.java | 228 ++++++++---------- .../hadoop/hbase/ipc/AbstractTestIPC.java | 59 +---- .../apache/hadoop/hbase/ipc/TestNettyIPC.java | 1 - pom.xml | 12 +- 14 files changed, 176 insertions(+), 301 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java index 9117fefa4ba..4bbb729f1f2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java @@ -21,9 +21,6 @@ package org.apache.hadoop.hbase.ipc; import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE; import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException; -import io.opentelemetry.api.trace.Span; -import io.opentelemetry.api.trace.StatusCode; -import io.opentelemetry.context.Scope; import java.io.IOException; import java.net.SocketAddress; import java.util.Collection; @@ -41,7 +38,6 @@ import org.apache.hadoop.hbase.codec.KeyValueCodec; import org.apache.hadoop.hbase.net.Address; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; -import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.PoolMap; import org.apache.hadoop.hbase.util.Threads; @@ -369,7 +365,7 @@ public abstract class AbstractRpcClient implements RpcC protected abstract T createConnection(ConnectionId remoteId) throws IOException; private void onCallFinished(Call call, HBaseRpcController hrc, Address addr, - RpcCallback callback) { + RpcCallback callback) { call.callStats.setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.getStartTime()); if (metrics != null) { metrics.updateRpc(call.md, call.param, call.callStats); @@ -392,59 +388,44 @@ public abstract class AbstractRpcClient implements RpcC } } - private Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc, + Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc, final Message param, Message returnType, final User ticket, final Address addr, final RpcCallback callback) { - Span span = TraceUtil.getGlobalTracer().spanBuilder("RpcClient.callMethod." + md.getFullName()) - .startSpan(); - try (Scope scope = span.makeCurrent()) { - final MetricsConnection.CallStats cs = MetricsConnection.newCallStats(); - cs.setStartTime(EnvironmentEdgeManager.currentTime()); + final MetricsConnection.CallStats cs = MetricsConnection.newCallStats(); + cs.setStartTime(EnvironmentEdgeManager.currentTime()); - if (param instanceof ClientProtos.MultiRequest) { - ClientProtos.MultiRequest req = (ClientProtos.MultiRequest) param; - int numActions = 0; - for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) { - numActions += regionAction.getActionCount(); - } - - cs.setNumActionsPerServer(numActions); + if (param instanceof ClientProtos.MultiRequest) { + ClientProtos.MultiRequest req = (ClientProtos.MultiRequest) param; + int numActions = 0; + for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) { + numActions += regionAction.getActionCount(); } - final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr); - Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType, + cs.setNumActionsPerServer(numActions); + } + + final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr); + Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType, hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback() { @Override public void run(Call call) { - try (Scope scope = call.span.makeCurrent()) { - counter.decrementAndGet(); - onCallFinished(call, hrc, addr, callback); - } finally { - if (hrc.failed()) { - span.setStatus(StatusCode.ERROR); - span.recordException(hrc.getFailed()); - } else { - span.setStatus(StatusCode.OK); - } - span.end(); - } + counter.decrementAndGet(); + onCallFinished(call, hrc, addr, callback); } }, cs); - ConnectionId remoteId = new ConnectionId(ticket, md.getService().getName(), addr); - int count = counter.incrementAndGet(); - try { - if (count > maxConcurrentCallsPerServer) { - throw new ServerTooBusyException(addr, count); - } - cs.setConcurrentCallsPerServer(count); - T connection = getConnection(remoteId); - connection.sendRequest(call, hrc); - } catch (Exception e) { - call.setException(toIOE(e)); - span.end(); + ConnectionId remoteId = new ConnectionId(ticket, md.getService().getName(), addr); + int count = counter.incrementAndGet(); + try { + if (count > maxConcurrentCallsPerServer) { + throw new ServerTooBusyException(addr, count); } - return call; + cs.setConcurrentCallsPerServer(count); + T connection = getConnection(remoteId); + connection.sendRequest(call, hrc); + } catch (Exception e) { + call.setException(toIOE(e)); } + return call; } private static Address createAddr(ServerName sn) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java index eb8e1d92b21..1a5cb73bccf 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java @@ -24,6 +24,8 @@ import static org.apache.hadoop.hbase.ipc.IPCUtil.isFatalConnectionException; import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled; import static org.apache.hadoop.hbase.ipc.IPCUtil.write; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; @@ -55,6 +57,7 @@ import org.apache.hadoop.hbase.security.HBaseSaslRpcClient; import org.apache.hadoop.hbase.security.SaslUtil; import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection; import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider; +import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ExceptionUtil; import org.apache.hadoop.io.IOUtils; @@ -189,8 +192,8 @@ class BlockingRpcConnection extends RpcConnection implements Runnable { if (call.isDone()) { continue; } - try (Scope scope = call.span.makeCurrent()) { - writeRequest(call); + try { + tracedWriteRequest(call); } catch (IOException e) { // exception here means the call has not been added to the pendingCalls yet, so we need // to fail it by our own. @@ -591,6 +594,16 @@ class BlockingRpcConnection extends RpcConnection implements Runnable { this.out = new DataOutputStream(new BufferedOutputStream(saslRpcClient.getOutputStream())); } + private void tracedWriteRequest(Call call) throws IOException { + Span span = TraceUtil.getGlobalTracer().spanBuilder("RpcClientImpl.tracedWriteRequest") + .setParent(Context.current().with(call.span)).startSpan(); + try (Scope scope = span.makeCurrent()) { + writeRequest(call); + } finally { + span.end(); + } + } + /** * Initiates a call by sending the parameter to the remote server. Note: this is not called from * the Connection thread, but by other threads. @@ -798,9 +811,7 @@ class BlockingRpcConnection extends RpcConnection implements Runnable { if (callSender != null) { callSender.sendCall(call); } else { - // this is in the same thread with the caller so do not need to attach the trace context - // again. - writeRequest(call); + tracedWriteRequest(call); } } }); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java index 8d23d924339..113f731aaa2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java @@ -61,7 +61,7 @@ class Call { final Span span; Timeout timeoutTask; - Call(int id, final Descriptors.MethodDescriptor md, Message param, + protected Call(int id, final Descriptors.MethodDescriptor md, Message param, final CellScanner cells, final Message responseDefaultType, int timeout, int priority, RpcCallback callback, MetricsConnection.CallStats callStats) { this.param = param; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java index fd42214d1d3..42ad33a1e61 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hbase.ipc; -import io.opentelemetry.api.GlobalOpenTelemetry; -import io.opentelemetry.context.Context; import java.io.IOException; import java.io.OutputStream; import java.lang.reflect.InvocationTargetException; @@ -51,7 +49,6 @@ import org.apache.hbase.thirdparty.io.netty.util.concurrent.FastThreadLocal; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; -import org.apache.hadoop.hbase.shaded.protobuf.generated.TracingProtos.RPCTInfo; /** * Utility to help ipc'ing. @@ -115,10 +112,11 @@ class IPCUtil { static RequestHeader buildRequestHeader(Call call, CellBlockMeta cellBlockMeta) { RequestHeader.Builder builder = RequestHeader.newBuilder(); builder.setCallId(call.id); - RPCTInfo.Builder traceBuilder = RPCTInfo.newBuilder(); - GlobalOpenTelemetry.getPropagators().getTextMapPropagator().inject(Context.current(), - traceBuilder, (carrier, key, value) -> carrier.putHeaders(key, value)); - builder.setTraceInfo(traceBuilder.build()); + //TODO handle htrace API change, see HBASE-18895 + /*if (call.span != null) { + builder.setTraceInfo(RPCTInfo.newBuilder().setParentId(call.span.getSpanId()) + .setTraceId(call.span.getTracerId())); + }*/ builder.setMethodName(call.md.getName()); builder.setRequestParam(call.param != null); if (cellBlockMeta != null) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java index c67d96f0a75..f31e3d21aab 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hbase.ipc; -import io.opentelemetry.context.Scope; import java.io.IOException; import java.util.HashMap; import java.util.Map; @@ -115,12 +114,9 @@ class NettyRpcDuplexHandler extends ChannelDuplexHandler { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) - throws Exception { + throws Exception { if (msg instanceof Call) { - Call call = (Call) msg; - try (Scope scope = call.span.makeCurrent()) { - writeRequest(ctx, call, promise); - } + writeRequest(ctx, (Call) msg, promise); } else { ctx.write(msg, promise); } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java index 768de9c3b9c..f7a111f5901 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java @@ -17,7 +17,7 @@ */ package org.apache.hadoop.hbase.trace; -import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.trace.Tracer; import org.apache.yetus.audience.InterfaceAudience; @@ -30,6 +30,6 @@ public final class TraceUtil { } public static Tracer getGlobalTracer() { - return GlobalOpenTelemetry.getTracer(INSTRUMENTATION_NAME); + return OpenTelemetry.getGlobalTracer(INSTRUMENTATION_NAME); } } diff --git a/hbase-protocol-shaded/src/main/protobuf/Tracing.proto b/hbase-protocol-shaded/src/main/protobuf/Tracing.proto index 276a0a77b95..64ead844d95 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Tracing.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Tracing.proto @@ -23,12 +23,12 @@ option java_outer_classname = "TracingProtos"; option java_generate_equals_and_hash = true; option optimize_for = SPEED; -// OpenTelemetry propagates trace context through https://www.w3.org/TR/trace-context/, which -// is a text-based approach that passes properties with http headers. Here we will also use this -// approach so we just need a map to store the key value pair. - +//Used to pass through the information necessary to continue +//a trace after an RPC is made. All we need is the traceid +//(so we know the overarching trace this message is a part of), and +//the id of the current span when this message was sent, so we know +//what span caused the new span we will create when this message is received. message RPCTInfo { - optional int64 trace_id = 1 [deprecated = true]; - optional int64 parent_id = 2 [deprecated = true]; - map headers = 3; + optional int64 trace_id = 1; + optional int64 parent_id = 2; } diff --git a/hbase-server/pom.xml b/hbase-server/pom.xml index ef2fad38174..a9a7b72efa8 100644 --- a/hbase-server/pom.xml +++ b/hbase-server/pom.xml @@ -440,16 +440,6 @@ hamcrest-core test - - io.opentelemetry - opentelemetry-sdk - test - - - io.opentelemetry - opentelemetry-sdk-testing - test - org.hamcrest hamcrest-library diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java index 203f079b4bd..3ae089e7845 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java @@ -18,8 +18,6 @@ package org.apache.hadoop.hbase.ipc; import io.opentelemetry.api.trace.Span; -import io.opentelemetry.api.trace.StatusCode; -import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; import java.net.InetSocketAddress; import java.nio.channels.ClosedChannelException; @@ -75,6 +73,15 @@ public class CallRunner { return call; } + /** + * Keep for backward compatibility. + * @deprecated As of release 2.0, this will be removed in HBase 3.0 + */ + @Deprecated + public ServerCall getCall() { + return (ServerCall) call; + } + public void setStatus(MonitoredRPCHandler status) { this.status = status; } @@ -123,8 +130,7 @@ public class CallRunner { String serviceName = getServiceName(); String methodName = getMethodName(); String traceString = serviceName + "." + methodName; - Span span = TraceUtil.getGlobalTracer().spanBuilder(traceString) - .setParent(Context.current().with(((ServerCall) call).getSpan())).startSpan(); + Span span = TraceUtil.getGlobalTracer().spanBuilder(traceString).startSpan(); try (Scope traceScope = span.makeCurrent()) { if (!this.rpcServer.isStarted()) { InetSocketAddress address = rpcServer.getListenerAddress(); @@ -135,12 +141,8 @@ public class CallRunner { resultPair = this.rpcServer.call(call, this.status); } catch (TimeoutIOException e){ RpcServer.LOG.warn("Can not complete this request in time, drop it: " + call); - span.recordException(e); - span.setStatus(StatusCode.ERROR); return; } catch (Throwable e) { - span.recordException(e); - span.setStatus(StatusCode.ERROR); if (e instanceof ServerNotRunningYetException) { // If ServerNotRunningYetException, don't spew stack trace. if (RpcServer.LOG.isTraceEnabled()) { @@ -159,7 +161,6 @@ public class CallRunner { RpcServer.CurCall.set(null); if (resultPair != null) { this.rpcServer.addCallSize(call.getSize() * -1); - span.setStatus(StatusCode.OK); sucessful = true; } span.end(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java index 4a021ce44a1..ff4a521d3b0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hbase.ipc; -import io.opentelemetry.api.trace.Span; -import io.opentelemetry.api.trace.StatusCode; import java.io.IOException; import java.net.InetAddress; import java.nio.ByteBuffer; @@ -104,8 +102,6 @@ public abstract class ServerCall implements RpcCa // from WAL side on release private final AtomicInteger reference = new AtomicInteger(0x80000000); - private final Span span; - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH", justification = "Can't figure why this complaint is happening... see below") ServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header, @@ -136,7 +132,6 @@ public abstract class ServerCall implements RpcCa this.bbAllocator = byteBuffAllocator; this.cellBlockBuilder = cellBlockBuilder; this.reqCleanup = reqCleanup; - this.span = Span.current(); } /** @@ -155,7 +150,6 @@ public abstract class ServerCall implements RpcCa // If the call was run successfuly, we might have already returned the BB // back to pool. No worries..Then inputCellBlock will be null cleanup(); - span.end(); } @Override @@ -232,10 +226,6 @@ public abstract class ServerCall implements RpcCa } if (t != null) { this.isError = true; - span.recordException(t); - span.setStatus(StatusCode.ERROR); - } else { - span.setStatus(StatusCode.OK); } BufferChain bc = null; try { @@ -570,8 +560,4 @@ public abstract class ServerCall implements RpcCa return response; } } - - public Span getSpan() { - return span; - } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java index db7f052ca48..29ce30b8652 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java @@ -19,11 +19,6 @@ package org.apache.hadoop.hbase.ipc; import static org.apache.hadoop.hbase.HConstants.RPC_HEADER; -import io.opentelemetry.api.GlobalOpenTelemetry; -import io.opentelemetry.api.trace.Span; -import io.opentelemetry.context.Context; -import io.opentelemetry.context.Scope; -import io.opentelemetry.context.propagation.TextMapPropagator; import java.io.ByteArrayInputStream; import java.io.Closeable; import java.io.DataOutputStream; @@ -36,11 +31,13 @@ import java.nio.channels.ReadableByteChannel; import java.security.GeneralSecurityException; import java.util.Objects; import java.util.Properties; + import org.apache.commons.crypto.cipher.CryptoCipherFactory; import org.apache.commons.crypto.random.CryptoRandom; import org.apache.commons.crypto.random.CryptoRandomFactory; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.client.VersionInfoUtil; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.io.ByteBufferOutputStream; @@ -56,7 +53,21 @@ import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.provider.SaslServerAuthenticationProvider; import org.apache.hadoop.hbase.security.provider.SaslServerAuthenticationProviders; import org.apache.hadoop.hbase.security.provider.SimpleSaslServerAuthenticationProvider; -import org.apache.hadoop.hbase.trace.TraceUtil; +import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; +import org.apache.hbase.thirdparty.com.google.protobuf.ByteInput; +import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; +import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream; +import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; +import org.apache.hbase.thirdparty.com.google.protobuf.Message; +import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; +import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.IntWritable; @@ -68,25 +79,6 @@ import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; import org.apache.hadoop.security.authorize.AuthorizationException; import org.apache.hadoop.security.authorize.ProxyUsers; import org.apache.hadoop.security.token.SecretManager.InvalidToken; -import org.apache.yetus.audience.InterfaceAudience; - -import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; -import org.apache.hbase.thirdparty.com.google.protobuf.ByteInput; -import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; -import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream; -import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; -import org.apache.hbase.thirdparty.com.google.protobuf.Message; -import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; -import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; - -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation; -import org.apache.hadoop.hbase.shaded.protobuf.generated.TracingProtos.RPCTInfo; /** Reads calls from a connection and queues them for handling. */ @edu.umd.cs.findbugs.annotations.SuppressWarnings( @@ -615,115 +607,99 @@ abstract class ServerRpcConnection implements Closeable { ProtobufUtil.mergeFrom(builder, cis, headerSize); RequestHeader header = (RequestHeader) builder.build(); offset += headerSize; - TextMapPropagator.Getter getter = new TextMapPropagator.Getter() { - - @Override - public Iterable keys(RPCTInfo carrier) { - return carrier.getHeadersMap().keySet(); - } - - @Override - public String get(RPCTInfo carrier, String key) { - return carrier.getHeadersMap().get(key); - } - }; - Context traceCtx = GlobalOpenTelemetry.getPropagators().getTextMapPropagator() - .extract(Context.current(), header.getTraceInfo(), getter); - Span span = - TraceUtil.getGlobalTracer().spanBuilder("RpcServer.process").setParent(traceCtx).startSpan(); - try (Scope scope = span.makeCurrent()) { - int id = header.getCallId(); - if (RpcServer.LOG.isTraceEnabled()) { - RpcServer.LOG.trace("RequestHeader " + TextFormat.shortDebugString(header) + - " totalRequestSize: " + totalRequestSize + " bytes"); - } - // Enforcing the call queue size, this triggers a retry in the client - // This is a bit late to be doing this check - we have already read in the - // total request. - if ((totalRequestSize + + int id = header.getCallId(); + if (RpcServer.LOG.isTraceEnabled()) { + RpcServer.LOG.trace("RequestHeader " + TextFormat.shortDebugString(header) + + " totalRequestSize: " + totalRequestSize + " bytes"); + } + // Enforcing the call queue size, this triggers a retry in the client + // This is a bit late to be doing this check - we have already read in the + // total request. + if ((totalRequestSize + this.rpcServer.callQueueSizeInBytes.sum()) > this.rpcServer.maxQueueSizeInBytes) { - final ServerCall callTooBig = createCall(id, this.service, null, null, null, null, - totalRequestSize, null, 0, this.callCleanup); - this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION); - callTooBig.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION, - "Call queue is full on " + this.rpcServer.server.getServerName() + - ", is hbase.ipc.server.max.callqueue.size too small?"); - callTooBig.sendResponseIfReady(); - return; + final ServerCall callTooBig = createCall(id, this.service, null, null, null, null, + totalRequestSize, null, 0, this.callCleanup); + this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION); + callTooBig.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION, + "Call queue is full on " + this.rpcServer.server.getServerName() + + ", is hbase.ipc.server.max.callqueue.size too small?"); + callTooBig.sendResponseIfReady(); + return; + } + MethodDescriptor md = null; + Message param = null; + CellScanner cellScanner = null; + try { + if (header.hasRequestParam() && header.getRequestParam()) { + md = this.service.getDescriptorForType().findMethodByName( + header.getMethodName()); + if (md == null) + throw new UnsupportedOperationException(header.getMethodName()); + builder = this.service.getRequestPrototype(md).newBuilderForType(); + cis.resetSizeCounter(); + int paramSize = cis.readRawVarint32(); + offset += cis.getTotalBytesRead(); + if (builder != null) { + ProtobufUtil.mergeFrom(builder, cis, paramSize); + param = builder.build(); + } + offset += paramSize; + } else { + // currently header must have request param, so we directly throw + // exception here + String msg = "Invalid request header: " + + TextFormat.shortDebugString(header) + + ", should have param set in it"; + RpcServer.LOG.warn(msg); + throw new DoNotRetryIOException(msg); } - MethodDescriptor md = null; - Message param = null; - CellScanner cellScanner = null; - try { - if (header.hasRequestParam() && header.getRequestParam()) { - md = this.service.getDescriptorForType().findMethodByName(header.getMethodName()); - if (md == null) { - throw new UnsupportedOperationException(header.getMethodName()); - } - builder = this.service.getRequestPrototype(md).newBuilderForType(); - cis.resetSizeCounter(); - int paramSize = cis.readRawVarint32(); - offset += cis.getTotalBytesRead(); - if (builder != null) { - ProtobufUtil.mergeFrom(builder, cis, paramSize); - param = builder.build(); - } - offset += paramSize; - } else { - // currently header must have request param, so we directly throw - // exception here - String msg = "Invalid request header: " + TextFormat.shortDebugString(header) + - ", should have param set in it"; - RpcServer.LOG.warn(msg); - throw new DoNotRetryIOException(msg); - } - if (header.hasCellBlockMeta()) { - buf.position(offset); - ByteBuff dup = buf.duplicate(); - dup.limit(offset + header.getCellBlockMeta().getLength()); - cellScanner = this.rpcServer.cellBlockBuilder.createCellScannerReusingBuffers(this.codec, - this.compressionCodec, dup); - } - } catch (Throwable t) { - InetSocketAddress address = this.rpcServer.getListenerAddress(); - String msg = (address != null ? address : "(channel closed)") + - " is unable to read call parameter from client " + getHostAddress(); - RpcServer.LOG.warn(msg, t); + if (header.hasCellBlockMeta()) { + buf.position(offset); + ByteBuff dup = buf.duplicate(); + dup.limit(offset + header.getCellBlockMeta().getLength()); + cellScanner = this.rpcServer.cellBlockBuilder.createCellScannerReusingBuffers( + this.codec, this.compressionCodec, dup); + } + } catch (Throwable t) { + InetSocketAddress address = this.rpcServer.getListenerAddress(); + String msg = (address != null ? address : "(channel closed)") + + " is unable to read call parameter from client " + + getHostAddress(); + RpcServer.LOG.warn(msg, t); - this.rpcServer.metrics.exception(t); + this.rpcServer.metrics.exception(t); - // probably the hbase hadoop version does not match the running hadoop - // version - if (t instanceof LinkageError) { - t = new DoNotRetryIOException(t); - } - // If the method is not present on the server, do not retry. - if (t instanceof UnsupportedOperationException) { - t = new DoNotRetryIOException(t); - } - - ServerCall readParamsFailedCall = createCall(id, this.service, null, null, null, null, - totalRequestSize, null, 0, this.callCleanup); - readParamsFailedCall.setResponse(null, null, t, msg + "; " + t.getMessage()); - readParamsFailedCall.sendResponseIfReady(); - return; + // probably the hbase hadoop version does not match the running hadoop + // version + if (t instanceof LinkageError) { + t = new DoNotRetryIOException(t); + } + // If the method is not present on the server, do not retry. + if (t instanceof UnsupportedOperationException) { + t = new DoNotRetryIOException(t); } - int timeout = 0; - if (header.hasTimeout() && header.getTimeout() > 0) { - timeout = Math.max(this.rpcServer.minClientRequestTimeout, header.getTimeout()); - } - ServerCall call = createCall(id, this.service, md, header, param, cellScanner, - totalRequestSize, this.addr, timeout, this.callCleanup); + ServerCall readParamsFailedCall = createCall(id, this.service, null, null, null, null, + totalRequestSize, null, 0, this.callCleanup); + readParamsFailedCall.setResponse(null, null, t, msg + "; " + t.getMessage()); + readParamsFailedCall.sendResponseIfReady(); + return; + } - if (!this.rpcServer.scheduler.dispatch(new CallRunner(this.rpcServer, call))) { - this.rpcServer.callQueueSizeInBytes.add(-1 * call.getSize()); - this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION); - call.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION, - "Call queue is full on " + this.rpcServer.server.getServerName() + + int timeout = 0; + if (header.hasTimeout() && header.getTimeout() > 0) { + timeout = Math.max(this.rpcServer.minClientRequestTimeout, header.getTimeout()); + } + ServerCall call = createCall(id, this.service, md, header, param, cellScanner, totalRequestSize, + this.addr, timeout, this.callCleanup); + + if (!this.rpcServer.scheduler.dispatch(new CallRunner(this.rpcServer, call))) { + this.rpcServer.callQueueSizeInBytes.add(-1 * call.getSize()); + this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION); + call.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION, + "Call queue is full on " + this.rpcServer.server.getServerName() + ", too many items queued ?"); - call.sendResponseIfReady(); - } + call.sendResponseIfReady(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java index 11978ca6c8e..87561bac745 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java @@ -20,28 +20,21 @@ package org.apache.hadoop.hbase.ipc; import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE; import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub; import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newStub; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyObject; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.internal.verification.VerificationModeFactory.times; -import io.opentelemetry.api.trace.StatusCode; -import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; -import io.opentelemetry.sdk.trace.data.SpanData; import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; @@ -50,12 +43,10 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.Server; -import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.util.StringUtils; -import org.junit.Rule; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -96,10 +87,6 @@ public abstract class AbstractTestIPC { protected abstract AbstractRpcClient createRpcClientNoCodec(Configuration conf); - - @Rule - public OpenTelemetryRule traceRule = OpenTelemetryRule.create(); - /** * Ensure we do not HAVE TO HAVE a codec. */ @@ -196,7 +183,7 @@ public abstract class AbstractTestIPC { RpcServer rpcServer = createRpcServer(null, "testRpcServer", Lists.newArrayList(new RpcServer.BlockingServiceAndInterface( SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, scheduler); - verify(scheduler).init(any(RpcScheduler.Context.class)); + verify(scheduler).init((RpcScheduler.Context) anyObject()); try (AbstractRpcClient client = createRpcClient(CONF)) { rpcServer.start(); verify(scheduler).start(); @@ -205,7 +192,7 @@ public abstract class AbstractTestIPC { for (int i = 0; i < 10; i++) { stub.echo(null, param); } - verify(scheduler, times(10)).dispatch(any(CallRunner.class)); + verify(scheduler, times(10)).dispatch((CallRunner) anyObject()); } finally { rpcServer.stop(); verify(scheduler).stop(); @@ -440,44 +427,4 @@ public abstract class AbstractTestIPC { } } - private void assertSameTraceId() { - String traceId = traceRule.getSpans().get(0).getTraceId(); - for (SpanData data : traceRule.getSpans()) { - // assert we are the same trace - assertEquals(traceId, data.getTraceId()); - } - } - - @Test - public void testTracing() throws IOException, ServiceException { - RpcServer rpcServer = createRpcServer(null, "testRpcServer", - Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), - new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); - try (AbstractRpcClient client = createRpcClient(CONF)) { - rpcServer.start(); - BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); - stub.pause(null, PauseRequestProto.newBuilder().setMs(100).build()); - Waiter.waitFor(CONF, 1000, () -> traceRule.getSpans().stream().map(SpanData::getName) - .anyMatch(s -> s.equals("RpcClient.callMethod.TestProtobufRpcProto.pause"))); - - assertSameTraceId(); - for (SpanData data : traceRule.getSpans()) { - assertThat( - TimeUnit.NANOSECONDS.toMillis(data.getEndEpochNanos() - data.getStartEpochNanos()), - greaterThanOrEqualTo(100L)); - assertEquals(StatusCode.OK, data.getStatus().getStatusCode()); - } - - traceRule.clearSpans(); - assertThrows(ServiceException.class, - () -> stub.error(null, EmptyRequestProto.getDefaultInstance())); - Waiter.waitFor(CONF, 1000, () -> traceRule.getSpans().stream().map(SpanData::getName) - .anyMatch(s -> s.equals("RpcClient.callMethod.TestProtobufRpcProto.error"))); - - assertSameTraceId(); - for (SpanData data : traceRule.getSpans()) { - assertEquals(StatusCode.ERROR, data.getStatus().getStatusCode()); - } - } - } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java index c3b52a97cc1..2601fba0588 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java @@ -39,7 +39,6 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameter; import org.junit.runners.Parameterized.Parameters; - import org.apache.hbase.thirdparty.io.netty.channel.Channel; import org.apache.hbase.thirdparty.io.netty.channel.epoll.EpollEventLoopGroup; import org.apache.hbase.thirdparty.io.netty.channel.epoll.EpollSocketChannel; diff --git a/pom.xml b/pom.xml index 77d12ac4b89..7803e88cb7f 100755 --- a/pom.xml +++ b/pom.xml @@ -1482,7 +1482,7 @@ 9.2.13.0 4.13 1.3 - 0.13.1 + 0.12.0 1.2.17 2.28.2 @@ -2178,16 +2178,6 @@ opentelemetry-api ${opentelemetry.version} - - io.opentelemetry - opentelemetry-sdk - ${opentelemetry.version} - - - io.opentelemetry - opentelemetry-sdk-testing - ${opentelemetry.version} - com.lmax disruptor