Revert "Upstream Callers (#15)"
See HBasePlanning/issues/806 for details
This reverts commit 8cc6bfb5b2
.
This commit is contained in:
parent
374ed22789
commit
0c21465878
|
@ -31,7 +31,6 @@ import java.util.concurrent.ScheduledExecutorService;
|
||||||
import java.util.concurrent.ScheduledFuture;
|
import java.util.concurrent.ScheduledFuture;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
|
@ -42,7 +41,6 @@ import org.apache.hadoop.hbase.security.User;
|
||||||
import org.apache.hadoop.hbase.security.UserProvider;
|
import org.apache.hadoop.hbase.security.UserProvider;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
import org.apache.hadoop.hbase.util.PoolMap;
|
import org.apache.hadoop.hbase.util.PoolMap;
|
||||||
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
|
||||||
import org.apache.hadoop.hbase.util.Threads;
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
import org.apache.hadoop.io.compress.CompressionCodec;
|
import org.apache.hadoop.io.compress.CompressionCodec;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
|
@ -128,8 +126,6 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
|
||||||
protected final int readTO;
|
protected final int readTO;
|
||||||
protected final int writeTO;
|
protected final int writeTO;
|
||||||
|
|
||||||
protected final UpstreamCaller upstreamCaller;
|
|
||||||
|
|
||||||
private final PoolMap<ConnectionId, T> connections;
|
private final PoolMap<ConnectionId, T> connections;
|
||||||
|
|
||||||
private final AtomicInteger callIdCnt = new AtomicInteger(0);
|
private final AtomicInteger callIdCnt = new AtomicInteger(0);
|
||||||
|
@ -190,15 +186,6 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
|
||||||
}
|
}
|
||||||
}, minIdleTimeBeforeClose, minIdleTimeBeforeClose, TimeUnit.MILLISECONDS);
|
}, minIdleTimeBeforeClose, minIdleTimeBeforeClose, TimeUnit.MILLISECONDS);
|
||||||
|
|
||||||
String className = conf.get(UpstreamCaller.HBASE_UPSTREAM_CALLER);
|
|
||||||
if (StringUtils.isEmpty(className)) {
|
|
||||||
LOG.info("No " + UpstreamCaller.HBASE_UPSTREAM_CALLER + " is set.");
|
|
||||||
this.upstreamCaller = UpstreamCaller.NONE;
|
|
||||||
} else {
|
|
||||||
this.upstreamCaller = ReflectionUtils.instantiateWithCustomCtor(className,
|
|
||||||
null, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor + ", tcpKeepAlive="
|
LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor + ", tcpKeepAlive="
|
||||||
+ this.tcpKeepAlive + ", tcpNoDelay=" + this.tcpNoDelay + ", connectTO=" + this.connectTO
|
+ this.tcpKeepAlive + ", tcpNoDelay=" + this.tcpNoDelay + ", connectTO=" + this.connectTO
|
||||||
|
@ -420,7 +407,7 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
|
||||||
|
|
||||||
final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
|
final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
|
||||||
Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType,
|
Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType,
|
||||||
hrc.getCallTimeout(), hrc.getPriority(), upstreamCaller.getUpstreamCaller().orElse(null), new RpcCallback<Call>() {
|
hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() {
|
||||||
@Override
|
@Override
|
||||||
public void run(Call call) {
|
public void run(Call call) {
|
||||||
counter.decrementAndGet();
|
counter.decrementAndGet();
|
||||||
|
|
|
@ -55,18 +55,17 @@ class Call {
|
||||||
final Descriptors.MethodDescriptor md;
|
final Descriptors.MethodDescriptor md;
|
||||||
final int timeout; // timeout in millisecond for this call; 0 means infinite.
|
final int timeout; // timeout in millisecond for this call; 0 means infinite.
|
||||||
final int priority;
|
final int priority;
|
||||||
final String upstreamCaller;
|
|
||||||
final MetricsConnection.CallStats callStats;
|
final MetricsConnection.CallStats callStats;
|
||||||
private final RpcCallback<Call> callback;
|
private final RpcCallback<Call> callback;
|
||||||
final Span span;
|
final Span span;
|
||||||
Timeout timeoutTask;
|
Timeout timeoutTask;
|
||||||
|
|
||||||
protected Call(int id, final Descriptors.MethodDescriptor md, Message param, final CellScanner cells, final Message responseDefaultType, int timeout, int priority,
|
protected Call(int id, final Descriptors.MethodDescriptor md, Message param,
|
||||||
String upstreamCaller, RpcCallback<Call> callback, MetricsConnection.CallStats callStats) {
|
final CellScanner cells, final Message responseDefaultType, int timeout, int priority,
|
||||||
|
RpcCallback<Call> callback, MetricsConnection.CallStats callStats) {
|
||||||
this.param = param;
|
this.param = param;
|
||||||
this.md = md;
|
this.md = md;
|
||||||
this.cells = cells;
|
this.cells = cells;
|
||||||
this.upstreamCaller = upstreamCaller;
|
|
||||||
this.callStats = callStats;
|
this.callStats = callStats;
|
||||||
this.callStats.setStartTime(EnvironmentEdgeManager.currentTime());
|
this.callStats.setStartTime(EnvironmentEdgeManager.currentTime());
|
||||||
this.responseDefaultType = responseDefaultType;
|
this.responseDefaultType = responseDefaultType;
|
||||||
|
|
|
@ -126,11 +126,6 @@ class IPCUtil {
|
||||||
if (call.priority != HConstants.PRIORITY_UNSET) {
|
if (call.priority != HConstants.PRIORITY_UNSET) {
|
||||||
builder.setPriority(call.priority);
|
builder.setPriority(call.priority);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (call.upstreamCaller != null) {
|
|
||||||
builder.setUpstreamCaller(call.upstreamCaller);
|
|
||||||
}
|
|
||||||
|
|
||||||
builder.setTimeout(call.timeout);
|
builder.setTimeout(call.timeout);
|
||||||
|
|
||||||
return builder.build();
|
return builder.build();
|
||||||
|
|
|
@ -1,18 +0,0 @@
|
||||||
package org.apache.hadoop.hbase.ipc;
|
|
||||||
|
|
||||||
import java.util.Optional;
|
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
|
||||||
import org.apache.yetus.audience.InterfaceStability;
|
|
||||||
|
|
||||||
@InterfaceAudience.LimitedPrivate("hubspot")
|
|
||||||
@InterfaceStability.Unstable
|
|
||||||
public interface UpstreamCaller {
|
|
||||||
|
|
||||||
UpstreamCaller NONE = new UpstreamCaller() {};
|
|
||||||
|
|
||||||
String HBASE_UPSTREAM_CALLER = "hbase.upstream.caller.impl";
|
|
||||||
|
|
||||||
default Optional<String> getUpstreamCaller() {
|
|
||||||
return Optional.empty();
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -146,9 +146,6 @@ message RequestHeader {
|
||||||
// See HConstants.
|
// See HConstants.
|
||||||
optional uint32 priority = 6;
|
optional uint32 priority = 6;
|
||||||
optional uint32 timeout = 7;
|
optional uint32 timeout = 7;
|
||||||
|
|
||||||
// Name of upstream caller, ie. grpc or rest caller
|
|
||||||
optional string upstream_caller = 100;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
message ResponseHeader {
|
message ResponseHeader {
|
||||||
|
|
|
@ -127,9 +127,6 @@ message RequestHeader {
|
||||||
// See HConstants.
|
// See HConstants.
|
||||||
optional uint32 priority = 6;
|
optional uint32 priority = 6;
|
||||||
optional uint32 timeout = 7;
|
optional uint32 timeout = 7;
|
||||||
|
|
||||||
// Name of upstream caller, ie. grpc or rest caller
|
|
||||||
optional string upstream_caller = 100;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
message ResponseHeader {
|
message ResponseHeader {
|
||||||
|
|
|
@ -110,9 +110,9 @@ public class CallRunner {
|
||||||
this.status.setStatus("Setting up call");
|
this.status.setStatus("Setting up call");
|
||||||
this.status.setConnection(call.getRemoteAddress().getHostAddress(), call.getRemotePort());
|
this.status.setConnection(call.getRemoteAddress().getHostAddress(), call.getRemotePort());
|
||||||
if (RpcServer.LOG.isTraceEnabled()) {
|
if (RpcServer.LOG.isTraceEnabled()) {
|
||||||
Optional<String> requestUserName = call.getRequestUserName();
|
Optional<User> remoteUser = call.getRequestUser();
|
||||||
RpcServer.LOG.trace(call.toShortString() + " executing as " +
|
RpcServer.LOG.trace(call.toShortString() + " executing as " +
|
||||||
(requestUserName.orElse("NULL principal")));
|
(remoteUser.isPresent() ? remoteUser.get().getName() : "NULL principal"));
|
||||||
}
|
}
|
||||||
Throwable errorThrowable = null;
|
Throwable errorThrowable = null;
|
||||||
String error = null;
|
String error = null;
|
||||||
|
|
|
@ -55,24 +55,11 @@ public interface RpcCallContext {
|
||||||
*/
|
*/
|
||||||
Optional<User> getRequestUser();
|
Optional<User> getRequestUser();
|
||||||
|
|
||||||
/**
|
|
||||||
* When an HBase client is used as a proxy for connecting to HBase, the
|
|
||||||
* {@link #getRequestUser()} will be the name of the proxy. This will be
|
|
||||||
* the name of the client who called the proxy.
|
|
||||||
* @return The upstream caller for this call
|
|
||||||
*/
|
|
||||||
Optional<String> getUpstreamCaller();
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return Current request's user name or not present if none ongoing.
|
* @return Current request's user name or not present if none ongoing.
|
||||||
*/
|
*/
|
||||||
default Optional<String> getRequestUserName() {
|
default Optional<String> getRequestUserName() {
|
||||||
return getRequestUser()
|
return getRequestUser().map(User::getShortName);
|
||||||
.map(User::getShortName)
|
|
||||||
.map(userName -> getUpstreamCaller()
|
|
||||||
.map(upstreamCaller -> upstreamCaller + ".via." + userName)
|
|
||||||
.orElse(userName)
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -202,14 +202,6 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa
|
||||||
return this.header.getPriority();
|
return this.header.getPriority();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public Optional<String> getUpstreamCaller() {
|
|
||||||
if (this.header.hasUpstreamCaller()) {
|
|
||||||
return Optional.of(this.header.getUpstreamCaller());
|
|
||||||
}
|
|
||||||
return Optional.empty();
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Short string representation without param info because param itself could be huge depends on
|
* Short string representation without param info because param itself could be huge depends on
|
||||||
* the payload of a command
|
* the payload of a command
|
||||||
|
|
|
@ -26,7 +26,6 @@ import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
|
||||||
import static org.mockito.ArgumentMatchers.anyObject;
|
import static org.mockito.ArgumentMatchers.anyObject;
|
||||||
import static org.mockito.Mockito.spy;
|
import static org.mockito.Mockito.spy;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
|
@ -36,8 +35,6 @@ import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
import org.apache.hadoop.hbase.CellScanner;
|
import org.apache.hadoop.hbase.CellScanner;
|
||||||
|
@ -51,7 +48,6 @@ import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.io.compress.GzipCodec;
|
import org.apache.hadoop.io.compress.GzipCodec;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.ArgumentCaptor;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -158,86 +154,6 @@ public abstract class AbstractTestIPC {
|
||||||
protected abstract AbstractRpcClient<?> createRpcClientRTEDuringConnectionSetup(
|
protected abstract AbstractRpcClient<?> createRpcClientRTEDuringConnectionSetup(
|
||||||
Configuration conf) throws IOException;
|
Configuration conf) throws IOException;
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testDefaultUpstreamCallerPropagation() throws Exception {
|
|
||||||
UpstreamCallerExtractingRpcScheduler scheduler = new UpstreamCallerExtractingRpcScheduler(CONF, 1);
|
|
||||||
Configuration conf = HBaseConfiguration.create();
|
|
||||||
RpcServer rpcServer = createRpcServer(null, "testRpcServer",
|
|
||||||
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
|
|
||||||
SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
|
|
||||||
scheduler);
|
|
||||||
|
|
||||||
try (AbstractRpcClient<?> client = createRpcClient(conf)) {
|
|
||||||
rpcServer.start();
|
|
||||||
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
|
|
||||||
stub.echo(null, EchoRequestProto.newBuilder().setMessage("hello").build());
|
|
||||||
|
|
||||||
RpcCall rpcCall = scheduler.getCall().get();
|
|
||||||
|
|
||||||
assertNotNull(rpcCall);
|
|
||||||
assertFalse(rpcCall.getUpstreamCaller().isPresent());
|
|
||||||
assertTrue(rpcCall.getRequestUserName().isPresent());
|
|
||||||
assertFalse(rpcCall.getRequestUserName().get().contains(".via."));
|
|
||||||
} finally {
|
|
||||||
rpcServer.stop();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testCustomUpstreamCallerPropagation() throws Exception {
|
|
||||||
UpstreamCallerExtractingRpcScheduler scheduler = new UpstreamCallerExtractingRpcScheduler(CONF, 1);
|
|
||||||
Configuration conf = HBaseConfiguration.create();
|
|
||||||
conf.set(UpstreamCaller.HBASE_UPSTREAM_CALLER, TestUpstreamCaller.class.getName());
|
|
||||||
|
|
||||||
RpcServer rpcServer = createRpcServer(null, "testRpcServer",
|
|
||||||
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
|
|
||||||
SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
|
|
||||||
scheduler);
|
|
||||||
|
|
||||||
try (AbstractRpcClient<?> client = createRpcClient(conf)) {
|
|
||||||
rpcServer.start();
|
|
||||||
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
|
|
||||||
stub.echo(null, EchoRequestProto.newBuilder().setMessage("hello").build());
|
|
||||||
|
|
||||||
RpcCall rpcCall = scheduler.getCall().get();
|
|
||||||
|
|
||||||
assertNotNull(rpcCall);
|
|
||||||
assertTrue(rpcCall.getUpstreamCaller().isPresent());
|
|
||||||
assertEquals("test", rpcCall.getUpstreamCaller().get());
|
|
||||||
assertTrue(rpcCall.getRequestUserName().isPresent());
|
|
||||||
assertTrue(rpcCall.getRequestUserName().get().matches("test\\.via\\..+"));
|
|
||||||
} finally {
|
|
||||||
rpcServer.stop();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class UpstreamCallerExtractingRpcScheduler extends FifoRpcScheduler {
|
|
||||||
|
|
||||||
private AtomicReference<RpcCall> call = new AtomicReference<>(null);
|
|
||||||
|
|
||||||
public UpstreamCallerExtractingRpcScheduler(Configuration conf, int handlerCount) {
|
|
||||||
super(conf, handlerCount);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean dispatch(CallRunner task) throws IOException, InterruptedException {
|
|
||||||
call.set(task.getRpcCall());
|
|
||||||
return super.dispatch(task);
|
|
||||||
}
|
|
||||||
|
|
||||||
public AtomicReference<RpcCall> getCall() {
|
|
||||||
return call;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class TestUpstreamCaller implements UpstreamCaller {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Optional<String> getUpstreamCaller() {
|
|
||||||
return Optional.of("test");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRTEDuringConnectionSetup() throws Exception {
|
public void testRTEDuringConnectionSetup() throws Exception {
|
||||||
Configuration conf = HBaseConfiguration.create();
|
Configuration conf = HBaseConfiguration.create();
|
||||||
|
|
|
@ -124,7 +124,7 @@ public class TestRpcServerSlowConnectionSetup {
|
||||||
int callId = 10;
|
int callId = 10;
|
||||||
Call call = new Call(callId, TestProtobufRpcProto.getDescriptor().findMethodByName("ping"),
|
Call call = new Call(callId, TestProtobufRpcProto.getDescriptor().findMethodByName("ping"),
|
||||||
EmptyRequestProto.getDefaultInstance(), null, EmptyResponseProto.getDefaultInstance(), 1000,
|
EmptyRequestProto.getDefaultInstance(), null, EmptyResponseProto.getDefaultInstance(), 1000,
|
||||||
HConstants.NORMAL_QOS, null, null, MetricsConnection.newCallStats());
|
HConstants.NORMAL_QOS, null, MetricsConnection.newCallStats());
|
||||||
RequestHeader requestHeader = IPCUtil.buildRequestHeader(call, null);
|
RequestHeader requestHeader = IPCUtil.buildRequestHeader(call, null);
|
||||||
dos.writeInt(IPCUtil.getTotalSizeWhenWrittenDelimited(requestHeader, call.param));
|
dos.writeInt(IPCUtil.getTotalSizeWhenWrittenDelimited(requestHeader, call.param));
|
||||||
requestHeader.writeDelimitedTo(dos);
|
requestHeader.writeDelimitedTo(dos);
|
||||||
|
|
|
@ -705,10 +705,6 @@ public class TestNamedQueueRecorder {
|
||||||
return getUser(userName);
|
return getUser(userName);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override public Optional<String> getUpstreamCaller() {
|
|
||||||
return Optional.empty();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public InetAddress getRemoteAddress() {
|
public InetAddress getRemoteAddress() {
|
||||||
return null;
|
return null;
|
||||||
|
|
|
@ -258,10 +258,6 @@ public class TestRegionProcedureStore extends RegionProcedureStoreTestBase {
|
||||||
return Optional.empty();
|
return Optional.empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override public Optional<String> getUpstreamCaller() {
|
|
||||||
return Optional.empty();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public InetAddress getRemoteAddress() {
|
public InetAddress getRemoteAddress() {
|
||||||
return null;
|
return null;
|
||||||
|
|
Loading…
Reference in New Issue