Upstream Callers (#15)

Adds an upstream caller to the RequestHeader of hbase requests, and makes that available in RpcCallContext
One can inject an upstream caller by setting the conf `hbase.upstream.caller.impl` to a fully qualified class name of a class with a public constructor that implements `org.apache.hadoop.hbase.ipc.UpstreamCaller`
Upstream callers will be available via `RpcCallContext#getUpstreamCaller` or `RpcCallContext#getRequestUserName`. The latter combines the real username and upstream caller to create a fully qualified name like `<upstreamCaller>.via.<user>`
This commit is contained in:
Bryan Beaudreault 2021-08-03 12:35:50 -04:00
parent c7e4262990
commit 8cc6bfb5b2
13 changed files with 164 additions and 8 deletions

View File

@ -31,6 +31,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
@ -41,6 +42,7 @@ import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.PoolMap;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.ipc.RemoteException;
@ -126,6 +128,8 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
protected final int readTO;
protected final int writeTO;
protected final UpstreamCaller upstreamCaller;
private final PoolMap<ConnectionId, T> connections;
private final AtomicInteger callIdCnt = new AtomicInteger(0);
@ -186,6 +190,15 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
}
}, minIdleTimeBeforeClose, minIdleTimeBeforeClose, TimeUnit.MILLISECONDS);
String className = conf.get(UpstreamCaller.HBASE_UPSTREAM_CALLER);
if (StringUtils.isEmpty(className)) {
LOG.info("No " + UpstreamCaller.HBASE_UPSTREAM_CALLER + " is set.");
this.upstreamCaller = UpstreamCaller.NONE;
} else {
this.upstreamCaller = ReflectionUtils.instantiateWithCustomCtor(className,
null, null);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor + ", tcpKeepAlive="
+ this.tcpKeepAlive + ", tcpNoDelay=" + this.tcpNoDelay + ", connectTO=" + this.connectTO
@ -407,7 +420,7 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType,
hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() {
hrc.getCallTimeout(), hrc.getPriority(), upstreamCaller.getUpstreamCaller().orElse(null), new RpcCallback<Call>() {
@Override
public void run(Call call) {
counter.decrementAndGet();

View File

@ -55,17 +55,18 @@ class Call {
final Descriptors.MethodDescriptor md;
final int timeout; // timeout in millisecond for this call; 0 means infinite.
final int priority;
final String upstreamCaller;
final MetricsConnection.CallStats callStats;
private final RpcCallback<Call> callback;
final Span span;
Timeout timeoutTask;
protected Call(int id, final Descriptors.MethodDescriptor md, Message param,
final CellScanner cells, final Message responseDefaultType, int timeout, int priority,
RpcCallback<Call> callback, MetricsConnection.CallStats callStats) {
protected Call(int id, final Descriptors.MethodDescriptor md, Message param, final CellScanner cells, final Message responseDefaultType, int timeout, int priority,
String upstreamCaller, RpcCallback<Call> callback, MetricsConnection.CallStats callStats) {
this.param = param;
this.md = md;
this.cells = cells;
this.upstreamCaller = upstreamCaller;
this.callStats = callStats;
this.callStats.setStartTime(EnvironmentEdgeManager.currentTime());
this.responseDefaultType = responseDefaultType;

View File

@ -126,6 +126,11 @@ class IPCUtil {
if (call.priority != HConstants.PRIORITY_UNSET) {
builder.setPriority(call.priority);
}
if (call.upstreamCaller != null) {
builder.setUpstreamCaller(call.upstreamCaller);
}
builder.setTimeout(call.timeout);
return builder.build();

View File

@ -0,0 +1,18 @@
package org.apache.hadoop.hbase.ipc;
import java.util.Optional;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
@InterfaceAudience.LimitedPrivate("hubspot")
@InterfaceStability.Unstable
public interface UpstreamCaller {
UpstreamCaller NONE = new UpstreamCaller() {};
String HBASE_UPSTREAM_CALLER = "hbase.upstream.caller.impl";
default Optional<String> getUpstreamCaller() {
return Optional.empty();
}
}

View File

@ -146,6 +146,9 @@ message RequestHeader {
// See HConstants.
optional uint32 priority = 6;
optional uint32 timeout = 7;
// Name of upstream caller, ie. grpc or rest caller
optional string upstream_caller = 100;
}
message ResponseHeader {

View File

@ -127,6 +127,9 @@ message RequestHeader {
// See HConstants.
optional uint32 priority = 6;
optional uint32 timeout = 7;
// Name of upstream caller, ie. grpc or rest caller
optional string upstream_caller = 100;
}
message ResponseHeader {

View File

@ -109,9 +109,9 @@ public class CallRunner {
this.status.setStatus("Setting up call");
this.status.setConnection(call.getRemoteAddress().getHostAddress(), call.getRemotePort());
if (RpcServer.LOG.isTraceEnabled()) {
Optional<User> remoteUser = call.getRequestUser();
Optional<String> requestUserName = call.getRequestUserName();
RpcServer.LOG.trace(call.toShortString() + " executing as " +
(remoteUser.isPresent() ? remoteUser.get().getName() : "NULL principal"));
(requestUserName.orElse("NULL principal")));
}
Throwable errorThrowable = null;
String error = null;

View File

@ -55,11 +55,24 @@ public interface RpcCallContext {
*/
Optional<User> getRequestUser();
/**
* When an HBase client is used as a proxy for connecting to HBase, the
* {@link #getRequestUser()} will be the name of the proxy. This will be
* the name of the client who called the proxy.
* @return The upstream caller for this call
*/
Optional<String> getUpstreamCaller();
/**
* @return Current request's user name or not present if none ongoing.
*/
default Optional<String> getRequestUserName() {
return getRequestUser().map(User::getShortName);
return getRequestUser()
.map(User::getShortName)
.map(userName -> getUpstreamCaller()
.map(upstreamCaller -> upstreamCaller + ".via." + userName)
.orElse(userName)
);
}
/**

View File

@ -202,6 +202,14 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa
return this.header.getPriority();
}
@Override
public Optional<String> getUpstreamCaller() {
if (this.header.hasUpstreamCaller()) {
return Optional.of(this.header.getUpstreamCaller());
}
return Optional.empty();
}
/*
* Short string representation without param info because param itself could be huge depends on
* the payload of a command

View File

@ -26,6 +26,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyObject;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
@ -35,6 +36,8 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
@ -48,6 +51,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.StringUtils;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -154,6 +158,86 @@ public abstract class AbstractTestIPC {
protected abstract AbstractRpcClient<?> createRpcClientRTEDuringConnectionSetup(
Configuration conf) throws IOException;
@Test
public void testDefaultUpstreamCallerPropagation() throws Exception {
UpstreamCallerExtractingRpcScheduler scheduler = new UpstreamCallerExtractingRpcScheduler(CONF, 1);
Configuration conf = HBaseConfiguration.create();
RpcServer rpcServer = createRpcServer(null, "testRpcServer",
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
scheduler);
try (AbstractRpcClient<?> client = createRpcClient(conf)) {
rpcServer.start();
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
stub.echo(null, EchoRequestProto.newBuilder().setMessage("hello").build());
RpcCall rpcCall = scheduler.getCall().get();
assertNotNull(rpcCall);
assertFalse(rpcCall.getUpstreamCaller().isPresent());
assertTrue(rpcCall.getRequestUserName().isPresent());
assertFalse(rpcCall.getRequestUserName().get().contains(".via."));
} finally {
rpcServer.stop();
}
}
@Test
public void testCustomUpstreamCallerPropagation() throws Exception {
UpstreamCallerExtractingRpcScheduler scheduler = new UpstreamCallerExtractingRpcScheduler(CONF, 1);
Configuration conf = HBaseConfiguration.create();
conf.set(UpstreamCaller.HBASE_UPSTREAM_CALLER, TestUpstreamCaller.class.getName());
RpcServer rpcServer = createRpcServer(null, "testRpcServer",
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
scheduler);
try (AbstractRpcClient<?> client = createRpcClient(conf)) {
rpcServer.start();
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
stub.echo(null, EchoRequestProto.newBuilder().setMessage("hello").build());
RpcCall rpcCall = scheduler.getCall().get();
assertNotNull(rpcCall);
assertTrue(rpcCall.getUpstreamCaller().isPresent());
assertEquals("test", rpcCall.getUpstreamCaller().get());
assertTrue(rpcCall.getRequestUserName().isPresent());
assertTrue(rpcCall.getRequestUserName().get().matches("test\\.via\\..+"));
} finally {
rpcServer.stop();
}
}
public static class UpstreamCallerExtractingRpcScheduler extends FifoRpcScheduler {
private AtomicReference<RpcCall> call = new AtomicReference<>(null);
public UpstreamCallerExtractingRpcScheduler(Configuration conf, int handlerCount) {
super(conf, handlerCount);
}
@Override
public boolean dispatch(CallRunner task) throws IOException, InterruptedException {
call.set(task.getRpcCall());
return super.dispatch(task);
}
public AtomicReference<RpcCall> getCall() {
return call;
}
}
public static class TestUpstreamCaller implements UpstreamCaller {
@Override
public Optional<String> getUpstreamCaller() {
return Optional.of("test");
}
}
@Test
public void testRTEDuringConnectionSetup() throws Exception {
Configuration conf = HBaseConfiguration.create();

View File

@ -124,7 +124,7 @@ public class TestRpcServerSlowConnectionSetup {
int callId = 10;
Call call = new Call(callId, TestProtobufRpcProto.getDescriptor().findMethodByName("ping"),
EmptyRequestProto.getDefaultInstance(), null, EmptyResponseProto.getDefaultInstance(), 1000,
HConstants.NORMAL_QOS, null, MetricsConnection.newCallStats());
HConstants.NORMAL_QOS, null, null, MetricsConnection.newCallStats());
RequestHeader requestHeader = IPCUtil.buildRequestHeader(call, null);
dos.writeInt(IPCUtil.getTotalSizeWhenWrittenDelimited(requestHeader, call.param));
requestHeader.writeDelimitedTo(dos);

View File

@ -705,6 +705,10 @@ public class TestNamedQueueRecorder {
return getUser(userName);
}
@Override public Optional<String> getUpstreamCaller() {
return Optional.empty();
}
@Override
public InetAddress getRemoteAddress() {
return null;

View File

@ -258,6 +258,10 @@ public class TestRegionProcedureStore extends RegionProcedureStoreTestBase {
return Optional.empty();
}
@Override public Optional<String> getUpstreamCaller() {
return Optional.empty();
}
@Override
public InetAddress getRemoteAddress() {
return null;