From 0c2146587884d676573ff0fc54629febd2d5906b Mon Sep 17 00:00:00 2001 From: Bryan Beaudreault Date: Tue, 15 Mar 2022 12:11:10 -0400 Subject: [PATCH] Revert "Upstream Callers (#15)" See HBasePlanning/issues/806 for details This reverts commit 8cc6bfb5b22b2b16f2618f02b3aeb15d98d050f0. --- .../hadoop/hbase/ipc/AbstractRpcClient.java | 15 +--- .../org/apache/hadoop/hbase/ipc/Call.java | 7 +- .../org/apache/hadoop/hbase/ipc/IPCUtil.java | 5 -- .../hadoop/hbase/ipc/UpstreamCaller.java | 18 ---- .../src/main/protobuf/RPC.proto | 3 - hbase-protocol/src/main/protobuf/RPC.proto | 3 - .../apache/hadoop/hbase/ipc/CallRunner.java | 4 +- .../hadoop/hbase/ipc/RpcCallContext.java | 15 +--- .../apache/hadoop/hbase/ipc/ServerCall.java | 8 -- .../hadoop/hbase/ipc/AbstractTestIPC.java | 84 ------------------- .../ipc/TestRpcServerSlowConnectionSetup.java | 2 +- .../namequeues/TestNamedQueueRecorder.java | 4 - .../region/TestRegionProcedureStore.java | 4 - 13 files changed, 8 insertions(+), 164 deletions(-) delete mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/UpstreamCaller.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java index 123a67ef258..a57672f02ed 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java @@ -31,7 +31,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; @@ -42,7 +41,6 @@ import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.PoolMap; -import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.ipc.RemoteException; @@ -128,8 +126,6 @@ public abstract class AbstractRpcClient implements RpcC protected final int readTO; protected final int writeTO; - protected final UpstreamCaller upstreamCaller; - private final PoolMap connections; private final AtomicInteger callIdCnt = new AtomicInteger(0); @@ -190,15 +186,6 @@ public abstract class AbstractRpcClient implements RpcC } }, minIdleTimeBeforeClose, minIdleTimeBeforeClose, TimeUnit.MILLISECONDS); - String className = conf.get(UpstreamCaller.HBASE_UPSTREAM_CALLER); - if (StringUtils.isEmpty(className)) { - LOG.info("No " + UpstreamCaller.HBASE_UPSTREAM_CALLER + " is set."); - this.upstreamCaller = UpstreamCaller.NONE; - } else { - this.upstreamCaller = ReflectionUtils.instantiateWithCustomCtor(className, - null, null); - } - if (LOG.isDebugEnabled()) { LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor + ", tcpKeepAlive=" + this.tcpKeepAlive + ", tcpNoDelay=" + this.tcpNoDelay + ", connectTO=" + this.connectTO @@ -420,7 +407,7 @@ public abstract class AbstractRpcClient implements RpcC final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr); Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType, - hrc.getCallTimeout(), hrc.getPriority(), upstreamCaller.getUpstreamCaller().orElse(null), new RpcCallback() { + hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback() { @Override public void run(Call call) { counter.decrementAndGet(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java index a8ad711b9f6..7793680ca54 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java @@ -55,18 +55,17 @@ class Call { final Descriptors.MethodDescriptor md; final int timeout; // timeout in millisecond for this call; 0 means infinite. final int priority; - final String upstreamCaller; final MetricsConnection.CallStats callStats; private final RpcCallback callback; final Span span; Timeout timeoutTask; - protected Call(int id, final Descriptors.MethodDescriptor md, Message param, final CellScanner cells, final Message responseDefaultType, int timeout, int priority, - String upstreamCaller, RpcCallback callback, MetricsConnection.CallStats callStats) { + protected Call(int id, final Descriptors.MethodDescriptor md, Message param, + final CellScanner cells, final Message responseDefaultType, int timeout, int priority, + RpcCallback callback, MetricsConnection.CallStats callStats) { this.param = param; this.md = md; this.cells = cells; - this.upstreamCaller = upstreamCaller; this.callStats = callStats; this.callStats.setStartTime(EnvironmentEdgeManager.currentTime()); this.responseDefaultType = responseDefaultType; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java index de1735c7b95..11d150e0878 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java @@ -126,11 +126,6 @@ class IPCUtil { if (call.priority != HConstants.PRIORITY_UNSET) { builder.setPriority(call.priority); } - - if (call.upstreamCaller != null) { - builder.setUpstreamCaller(call.upstreamCaller); - } - builder.setTimeout(call.timeout); return builder.build(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/UpstreamCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/UpstreamCaller.java deleted file mode 100644 index 1f2b9332ac6..00000000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/UpstreamCaller.java +++ /dev/null @@ -1,18 +0,0 @@ -package org.apache.hadoop.hbase.ipc; - -import java.util.Optional; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; - -@InterfaceAudience.LimitedPrivate("hubspot") -@InterfaceStability.Unstable -public interface UpstreamCaller { - - UpstreamCaller NONE = new UpstreamCaller() {}; - - String HBASE_UPSTREAM_CALLER = "hbase.upstream.caller.impl"; - - default Optional getUpstreamCaller() { - return Optional.empty(); - } -} diff --git a/hbase-protocol-shaded/src/main/protobuf/RPC.proto b/hbase-protocol-shaded/src/main/protobuf/RPC.proto index e05f5523999..1ccf6e84ee3 100644 --- a/hbase-protocol-shaded/src/main/protobuf/RPC.proto +++ b/hbase-protocol-shaded/src/main/protobuf/RPC.proto @@ -146,9 +146,6 @@ message RequestHeader { // See HConstants. optional uint32 priority = 6; optional uint32 timeout = 7; - - // Name of upstream caller, ie. grpc or rest caller - optional string upstream_caller = 100; } message ResponseHeader { diff --git a/hbase-protocol/src/main/protobuf/RPC.proto b/hbase-protocol/src/main/protobuf/RPC.proto index 57511ca906d..25e051430e2 100644 --- a/hbase-protocol/src/main/protobuf/RPC.proto +++ b/hbase-protocol/src/main/protobuf/RPC.proto @@ -127,9 +127,6 @@ message RequestHeader { // See HConstants. optional uint32 priority = 6; optional uint32 timeout = 7; - - // Name of upstream caller, ie. grpc or rest caller - optional string upstream_caller = 100; } message ResponseHeader { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java index b206fcd3e2a..f5e12ddeca1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java @@ -110,9 +110,9 @@ public class CallRunner { this.status.setStatus("Setting up call"); this.status.setConnection(call.getRemoteAddress().getHostAddress(), call.getRemotePort()); if (RpcServer.LOG.isTraceEnabled()) { - Optional requestUserName = call.getRequestUserName(); + Optional remoteUser = call.getRequestUser(); RpcServer.LOG.trace(call.toShortString() + " executing as " + - (requestUserName.orElse("NULL principal"))); + (remoteUser.isPresent() ? remoteUser.get().getName() : "NULL principal")); } Throwable errorThrowable = null; String error = null; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java index f65bb4a4ce0..6a4d3a29a52 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java @@ -55,24 +55,11 @@ public interface RpcCallContext { */ Optional getRequestUser(); - /** - * When an HBase client is used as a proxy for connecting to HBase, the - * {@link #getRequestUser()} will be the name of the proxy. This will be - * the name of the client who called the proxy. - * @return The upstream caller for this call - */ - Optional getUpstreamCaller(); - /** * @return Current request's user name or not present if none ongoing. */ default Optional getRequestUserName() { - return getRequestUser() - .map(User::getShortName) - .map(userName -> getUpstreamCaller() - .map(upstreamCaller -> upstreamCaller + ".via." + userName) - .orElse(userName) - ); + return getRequestUser().map(User::getShortName); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java index 50dd0e1987e..9a4c11ad404 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java @@ -202,14 +202,6 @@ public abstract class ServerCall implements RpcCa return this.header.getPriority(); } - @Override - public Optional getUpstreamCaller() { - if (this.header.hasUpstreamCaller()) { - return Optional.of(this.header.getUpstreamCaller()); - } - return Optional.empty(); - } - /* * Short string representation without param info because param itself could be huge depends on * the payload of a command diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java index 3c166ba7e72..87561bac745 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java @@ -26,7 +26,6 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyObject; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; @@ -36,8 +35,6 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; -import java.util.Optional; -import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; @@ -51,7 +48,6 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.util.StringUtils; import org.junit.Test; -import org.mockito.ArgumentCaptor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -158,86 +154,6 @@ public abstract class AbstractTestIPC { protected abstract AbstractRpcClient createRpcClientRTEDuringConnectionSetup( Configuration conf) throws IOException; - @Test - public void testDefaultUpstreamCallerPropagation() throws Exception { - UpstreamCallerExtractingRpcScheduler scheduler = new UpstreamCallerExtractingRpcScheduler(CONF, 1); - Configuration conf = HBaseConfiguration.create(); - RpcServer rpcServer = createRpcServer(null, "testRpcServer", - Lists.newArrayList(new RpcServer.BlockingServiceAndInterface( - SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, - scheduler); - - try (AbstractRpcClient client = createRpcClient(conf)) { - rpcServer.start(); - BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); - stub.echo(null, EchoRequestProto.newBuilder().setMessage("hello").build()); - - RpcCall rpcCall = scheduler.getCall().get(); - - assertNotNull(rpcCall); - assertFalse(rpcCall.getUpstreamCaller().isPresent()); - assertTrue(rpcCall.getRequestUserName().isPresent()); - assertFalse(rpcCall.getRequestUserName().get().contains(".via.")); - } finally { - rpcServer.stop(); - } - } - - @Test - public void testCustomUpstreamCallerPropagation() throws Exception { - UpstreamCallerExtractingRpcScheduler scheduler = new UpstreamCallerExtractingRpcScheduler(CONF, 1); - Configuration conf = HBaseConfiguration.create(); - conf.set(UpstreamCaller.HBASE_UPSTREAM_CALLER, TestUpstreamCaller.class.getName()); - - RpcServer rpcServer = createRpcServer(null, "testRpcServer", - Lists.newArrayList(new RpcServer.BlockingServiceAndInterface( - SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, - scheduler); - - try (AbstractRpcClient client = createRpcClient(conf)) { - rpcServer.start(); - BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); - stub.echo(null, EchoRequestProto.newBuilder().setMessage("hello").build()); - - RpcCall rpcCall = scheduler.getCall().get(); - - assertNotNull(rpcCall); - assertTrue(rpcCall.getUpstreamCaller().isPresent()); - assertEquals("test", rpcCall.getUpstreamCaller().get()); - assertTrue(rpcCall.getRequestUserName().isPresent()); - assertTrue(rpcCall.getRequestUserName().get().matches("test\\.via\\..+")); - } finally { - rpcServer.stop(); - } - } - - public static class UpstreamCallerExtractingRpcScheduler extends FifoRpcScheduler { - - private AtomicReference call = new AtomicReference<>(null); - - public UpstreamCallerExtractingRpcScheduler(Configuration conf, int handlerCount) { - super(conf, handlerCount); - } - - @Override - public boolean dispatch(CallRunner task) throws IOException, InterruptedException { - call.set(task.getRpcCall()); - return super.dispatch(task); - } - - public AtomicReference getCall() { - return call; - } - } - - public static class TestUpstreamCaller implements UpstreamCaller { - - @Override - public Optional getUpstreamCaller() { - return Optional.of("test"); - } - } - @Test public void testRTEDuringConnectionSetup() throws Exception { Configuration conf = HBaseConfiguration.create(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServerSlowConnectionSetup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServerSlowConnectionSetup.java index 1cd2c0c8c37..aedf57e72f0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServerSlowConnectionSetup.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServerSlowConnectionSetup.java @@ -124,7 +124,7 @@ public class TestRpcServerSlowConnectionSetup { int callId = 10; Call call = new Call(callId, TestProtobufRpcProto.getDescriptor().findMethodByName("ping"), EmptyRequestProto.getDefaultInstance(), null, EmptyResponseProto.getDefaultInstance(), 1000, - HConstants.NORMAL_QOS, null, null, MetricsConnection.newCallStats()); + HConstants.NORMAL_QOS, null, MetricsConnection.newCallStats()); RequestHeader requestHeader = IPCUtil.buildRequestHeader(call, null); dos.writeInt(IPCUtil.getTotalSizeWhenWrittenDelimited(requestHeader, call.param)); requestHeader.writeDelimitedTo(dos); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java index 2740cc3ad6a..161bcc11a20 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java @@ -705,10 +705,6 @@ public class TestNamedQueueRecorder { return getUser(userName); } - @Override public Optional getUpstreamCaller() { - return Optional.empty(); - } - @Override public InetAddress getRemoteAddress() { return null; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java index c29b0da12c8..d7a0ce76c9e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java @@ -258,10 +258,6 @@ public class TestRegionProcedureStore extends RegionProcedureStoreTestBase { return Optional.empty(); } - @Override public Optional getUpstreamCaller() { - return Optional.empty(); - } - @Override public InetAddress getRemoteAddress() { return null;