HBASE-13397 Purge duplicate rpc request thread local
This commit is contained in:
parent
f561ef71a5
commit
6c22333599
|
@ -23,7 +23,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|||
import org.apache.hadoop.hbase.ipc.RpcServer.Call;
|
||||
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
|
||||
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
|
||||
import org.apache.hadoop.hbase.security.UserProvider;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
@ -35,14 +34,13 @@ import com.google.protobuf.Message;
|
|||
/**
|
||||
* The request processing logic, which is usually executed in thread pools provided by an
|
||||
* {@link RpcScheduler}. Call {@link #run()} to actually execute the contained
|
||||
* {@link RpcServer.Call}
|
||||
* RpcServer.Call
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class CallRunner {
|
||||
private Call call;
|
||||
private RpcServerInterface rpcServer;
|
||||
private MonitoredRPCHandler status;
|
||||
private UserProvider userProvider;
|
||||
|
||||
/**
|
||||
* On construction, adds the size of this call to the running count of outstanding call sizes.
|
||||
|
@ -50,13 +48,12 @@ public class CallRunner {
|
|||
* time we occupy heap.
|
||||
*/
|
||||
// The constructor is shutdown so only RpcServer in this class can make one of these.
|
||||
CallRunner(final RpcServerInterface rpcServer, final Call call, UserProvider userProvider) {
|
||||
CallRunner(final RpcServerInterface rpcServer, final Call call) {
|
||||
this.call = call;
|
||||
this.rpcServer = rpcServer;
|
||||
// Add size of the call to queue size.
|
||||
this.rpcServer.addCallSize(call.getSize());
|
||||
this.status = getStatus();
|
||||
this.userProvider = userProvider;
|
||||
}
|
||||
|
||||
public Call getCall() {
|
||||
|
@ -70,7 +67,6 @@ public class CallRunner {
|
|||
this.call = null;
|
||||
this.rpcServer = null;
|
||||
this.status = null;
|
||||
this.userProvider = null;
|
||||
}
|
||||
|
||||
public void run() {
|
||||
|
@ -101,8 +97,6 @@ public class CallRunner {
|
|||
if (call.tinfo != null) {
|
||||
traceScope = Trace.startSpan(call.toTraceString(), call.tinfo);
|
||||
}
|
||||
RequestContext.set(userProvider.create(call.connection.user), RpcServer.getRemoteIp(),
|
||||
call.connection.service);
|
||||
// make the call
|
||||
resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner,
|
||||
call.timestamp, this.status);
|
||||
|
@ -117,11 +111,8 @@ public class CallRunner {
|
|||
if (traceScope != null) {
|
||||
traceScope.close();
|
||||
}
|
||||
// Must always clear the request context to avoid leaking
|
||||
// credentials between requests.
|
||||
RequestContext.clear();
|
||||
RpcServer.CurCall.set(null);
|
||||
}
|
||||
RpcServer.CurCall.set(null);
|
||||
// Set the response for undelayed calls and delayed calls with
|
||||
// undelayed responses.
|
||||
if (!call.isDelayed() || !call.isReturnValueDelayed()) {
|
||||
|
|
|
@ -1,153 +0,0 @@
|
|||
/*
|
||||
* Copyright The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.ipc;
|
||||
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
|
||||
import com.google.protobuf.BlockingService;
|
||||
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.htrace.Trace;
|
||||
|
||||
import java.net.InetAddress;
|
||||
|
||||
/**
|
||||
* Represents client information (authenticated username, remote address, protocol)
|
||||
* for the currently executing request. If called outside the context of a RPC request, all values
|
||||
* will be <code>null</code>. The {@link CallRunner} class before it a call and then on
|
||||
* its way out, it will clear the thread local.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class RequestContext {
|
||||
private static ThreadLocal<RequestContext> instance =
|
||||
new ThreadLocal<RequestContext>() {
|
||||
protected RequestContext initialValue() {
|
||||
return new RequestContext(null, null, null);
|
||||
}
|
||||
};
|
||||
|
||||
public static RequestContext get() {
|
||||
return instance.get();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Returns the user credentials associated with the current RPC request or
|
||||
* <code>null</code> if no credentials were provided.
|
||||
* @return A User
|
||||
*/
|
||||
public static User getRequestUser() {
|
||||
RequestContext ctx = instance.get();
|
||||
if (ctx != null) {
|
||||
return ctx.getUser();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the username for any user associated with the current RPC
|
||||
* request or <code>null</code> if no user is set.
|
||||
*/
|
||||
public static String getRequestUserName() {
|
||||
User user = getRequestUser();
|
||||
if (user != null) {
|
||||
return user.getShortName();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Indicates whether or not the current thread is within scope of executing
|
||||
* an RPC request.
|
||||
*/
|
||||
public static boolean isInRequestContext() {
|
||||
RequestContext ctx = instance.get();
|
||||
if (ctx != null) {
|
||||
return ctx.isInRequest();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Initializes the client credentials for the current request.
|
||||
* @param user
|
||||
* @param remoteAddress
|
||||
* @param service
|
||||
*/
|
||||
public static void set(User user,
|
||||
InetAddress remoteAddress, BlockingService service) {
|
||||
RequestContext ctx = instance.get();
|
||||
ctx.user = user;
|
||||
ctx.remoteAddress = remoteAddress;
|
||||
ctx.service = service;
|
||||
ctx.inRequest = true;
|
||||
if (Trace.isTracing()) {
|
||||
if (user != null) {
|
||||
Trace.currentSpan().addKVAnnotation(Bytes.toBytes("user"), Bytes.toBytes(user.getName()));
|
||||
}
|
||||
if (remoteAddress != null) {
|
||||
Trace.currentSpan().addKVAnnotation(
|
||||
Bytes.toBytes("remoteAddress"),
|
||||
Bytes.toBytes(remoteAddress.getHostAddress()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Clears out the client credentials for a given request.
|
||||
*/
|
||||
public static void clear() {
|
||||
RequestContext ctx = instance.get();
|
||||
ctx.user = null;
|
||||
ctx.remoteAddress = null;
|
||||
ctx.service = null;
|
||||
ctx.inRequest = false;
|
||||
}
|
||||
|
||||
private User user;
|
||||
private InetAddress remoteAddress;
|
||||
private BlockingService service;
|
||||
// indicates we're within a RPC request invocation
|
||||
private boolean inRequest;
|
||||
|
||||
private RequestContext(User user, InetAddress remoteAddr, BlockingService service) {
|
||||
this.user = user;
|
||||
this.remoteAddress = remoteAddr;
|
||||
this.service = service;
|
||||
}
|
||||
|
||||
public User getUser() {
|
||||
return user;
|
||||
}
|
||||
|
||||
public InetAddress getRemoteAddress() {
|
||||
return remoteAddress;
|
||||
}
|
||||
|
||||
public BlockingService getService() {
|
||||
return this.service;
|
||||
}
|
||||
|
||||
boolean isInRequest() {
|
||||
return inRequest;
|
||||
}
|
||||
}
|
|
@ -17,6 +17,10 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.ipc;
|
||||
|
||||
import java.net.InetAddress;
|
||||
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
|
||||
|
||||
public interface RpcCallContext extends Delayable {
|
||||
/**
|
||||
|
@ -36,4 +40,21 @@ public interface RpcCallContext extends Delayable {
|
|||
* @return True if the client supports cellblocks, else return all content in pb
|
||||
*/
|
||||
boolean isClientCellBlockSupport();
|
||||
|
||||
/**
|
||||
* Returns the user credentials associated with the current RPC request or
|
||||
* <code>null</code> if no credentials were provided.
|
||||
* @return A User
|
||||
*/
|
||||
User getRequestUser();
|
||||
|
||||
/**
|
||||
* @return Current request's user name or null if none ongoing.
|
||||
*/
|
||||
String getRequestUserName();
|
||||
|
||||
/**
|
||||
* @return Address of remote client if a request is ongoing, else null
|
||||
*/
|
||||
InetAddress getRemoteAddress();
|
||||
}
|
||||
|
|
|
@ -96,6 +96,7 @@ import org.apache.hadoop.hbase.security.AccessDeniedException;
|
|||
import org.apache.hadoop.hbase.security.AuthMethod;
|
||||
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
|
||||
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
|
||||
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
|
||||
import org.apache.hadoop.hbase.security.SaslStatus;
|
||||
|
@ -299,9 +300,12 @@ public class RpcServer implements RpcServerInterface {
|
|||
protected TraceInfo tinfo;
|
||||
private ByteBuffer cellBlock = null;
|
||||
|
||||
private User user;
|
||||
private InetAddress remoteAddress;
|
||||
|
||||
Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
|
||||
Message param, CellScanner cellScanner, Connection connection, Responder responder,
|
||||
long size, TraceInfo tinfo) {
|
||||
long size, TraceInfo tinfo, final InetAddress remoteAddress) {
|
||||
this.id = id;
|
||||
this.service = service;
|
||||
this.md = md;
|
||||
|
@ -316,6 +320,8 @@ public class RpcServer implements RpcServerInterface {
|
|||
this.isError = false;
|
||||
this.size = size;
|
||||
this.tinfo = tinfo;
|
||||
this.user = connection.user == null? null: userProvider.create(connection.user);
|
||||
this.remoteAddress = remoteAddress;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -522,6 +528,22 @@ public class RpcServer implements RpcServerInterface {
|
|||
public UserGroupInformation getRemoteUser() {
|
||||
return connection.user;
|
||||
}
|
||||
|
||||
@Override
|
||||
public User getRequestUser() {
|
||||
return user;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getRequestUserName() {
|
||||
User user = getRequestUser();
|
||||
return user == null? null: user.getShortName();
|
||||
}
|
||||
|
||||
@Override
|
||||
public InetAddress getRemoteAddress() {
|
||||
return remoteAddress;
|
||||
}
|
||||
}
|
||||
|
||||
/** Listens on the socket. Creates jobs for the handler threads*/
|
||||
|
@ -1196,13 +1218,14 @@ public class RpcServer implements RpcServerInterface {
|
|||
// Fake 'call' for failed authorization response
|
||||
private static final int AUTHORIZATION_FAILED_CALLID = -1;
|
||||
private final Call authFailedCall =
|
||||
new Call(AUTHORIZATION_FAILED_CALLID, null, null, null, null, null, this, null, 0, null);
|
||||
new Call(AUTHORIZATION_FAILED_CALLID, null, null, null, null, null, this, null, 0, null,
|
||||
null);
|
||||
private ByteArrayOutputStream authFailedResponse =
|
||||
new ByteArrayOutputStream();
|
||||
// Fake 'call' for SASL context setup
|
||||
private static final int SASL_CALLID = -33;
|
||||
private final Call saslCall =
|
||||
new Call(SASL_CALLID, this.service, null, null, null, null, this, null, 0, null);
|
||||
new Call(SASL_CALLID, this.service, null, null, null, null, this, null, 0, null, null);
|
||||
|
||||
public UserGroupInformation attemptingUser = null; // user name before auth
|
||||
|
||||
|
@ -1590,7 +1613,7 @@ public class RpcServer implements RpcServerInterface {
|
|||
|
||||
private int doBadPreambleHandling(final String msg, final Exception e) throws IOException {
|
||||
LOG.warn(msg);
|
||||
Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null);
|
||||
Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null, null);
|
||||
setupResponse(null, fakeCall, e, msg);
|
||||
responder.doRespond(fakeCall);
|
||||
// Returning -1 closes out the connection.
|
||||
|
@ -1749,7 +1772,7 @@ public class RpcServer implements RpcServerInterface {
|
|||
if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) {
|
||||
final Call callTooBig =
|
||||
new Call(id, this.service, null, null, null, null, this,
|
||||
responder, totalRequestSize, null);
|
||||
responder, totalRequestSize, null, null);
|
||||
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
|
||||
setupResponse(responseBuffer, callTooBig, new CallQueueTooBigException(),
|
||||
"Call queue is full on " + getListenerAddress() +
|
||||
|
@ -1794,7 +1817,7 @@ public class RpcServer implements RpcServerInterface {
|
|||
|
||||
final Call readParamsFailedCall =
|
||||
new Call(id, this.service, null, null, null, null, this,
|
||||
responder, totalRequestSize, null);
|
||||
responder, totalRequestSize, null, null);
|
||||
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
|
||||
setupResponse(responseBuffer, readParamsFailedCall, t,
|
||||
msg + "; " + t.getMessage());
|
||||
|
@ -1806,9 +1829,8 @@ public class RpcServer implements RpcServerInterface {
|
|||
? new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId())
|
||||
: null;
|
||||
Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
|
||||
totalRequestSize,
|
||||
traceInfo);
|
||||
scheduler.dispatch(new CallRunner(RpcServer.this, call, userProvider));
|
||||
totalRequestSize, traceInfo, RpcServer.getRemoteIp());
|
||||
scheduler.dispatch(new CallRunner(RpcServer.this, call));
|
||||
}
|
||||
|
||||
private boolean authorizeConnection() throws IOException {
|
||||
|
@ -2345,6 +2367,33 @@ public class RpcServer implements RpcServerInterface {
|
|||
return CurCall.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the user credentials associated with the current RPC request or
|
||||
* <code>null</code> if no credentials were provided.
|
||||
* @return A User
|
||||
*/
|
||||
public static User getRequestUser() {
|
||||
RpcCallContext ctx = getCurrentCall();
|
||||
return ctx == null? null: ctx.getRequestUser();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the username for any user associated with the current RPC
|
||||
* request or <code>null</code> if no user is set.
|
||||
*/
|
||||
public static String getRequestUserName() {
|
||||
User user = getRequestUser();
|
||||
return user == null? null: user.getShortName();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Address of remote client if a request is ongoing, else null
|
||||
*/
|
||||
public static InetAddress getRemoteAddress() {
|
||||
RpcCallContext ctx = getCurrentCall();
|
||||
return ctx == null? null: ctx.getRemoteAddress();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param serviceName Some arbitrary string that represents a 'service'.
|
||||
* @param services Available service instances
|
||||
|
|
|
@ -81,7 +81,6 @@ import org.apache.hadoop.hbase.client.TableState;
|
|||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||
import org.apache.hadoop.hbase.executor.ExecutorType;
|
||||
import org.apache.hadoop.hbase.ipc.RequestContext;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
|
||||
import org.apache.hadoop.hbase.master.MasterRpcServices.BalanceSwitchMode;
|
||||
|
@ -1192,8 +1191,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
|||
* @return Client info for use as prefix on an audit log string; who did an action
|
||||
*/
|
||||
String getClientIdAuditPrefix() {
|
||||
return "Client=" + RequestContext.getRequestUserName() + "/" +
|
||||
RequestContext.get().getRemoteAddress();
|
||||
return "Client=" + RpcServer.getRequestUserName() + "/" + RpcServer.getRemoteAddress();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -43,7 +43,7 @@ import org.apache.hadoop.hbase.client.RegionReplicaUtil;
|
|||
import org.apache.hadoop.hbase.client.TableState;
|
||||
import org.apache.hadoop.hbase.executor.EventHandler;
|
||||
import org.apache.hadoop.hbase.executor.EventType;
|
||||
import org.apache.hadoop.hbase.ipc.RequestContext;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||
import org.apache.hadoop.hbase.master.AssignmentManager;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
|
||||
|
@ -104,9 +104,8 @@ public class CreateTableHandler extends EventHandler {
|
|||
// If we are creating the table in service to an RPC request, record the
|
||||
// active user for later, so proper permissions will be applied to the
|
||||
// new table by the AccessController if it is active
|
||||
if (RequestContext.isInRequestContext()) {
|
||||
this.activeUser = RequestContext.getRequestUser();
|
||||
} else {
|
||||
this.activeUser = RpcServer.getRequestUser();
|
||||
if (this.activeUser == null) {
|
||||
this.activeUser = UserProvider.instantiate(conf).getCurrent();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
|
|
|
@ -47,7 +47,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
|
|||
import org.apache.hadoop.hbase.client.TableState;
|
||||
import org.apache.hadoop.hbase.errorhandling.ForeignException;
|
||||
import org.apache.hadoop.hbase.executor.ExecutorService;
|
||||
import org.apache.hadoop.hbase.ipc.RequestContext;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||
import org.apache.hadoop.hbase.master.AssignmentManager;
|
||||
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
|
||||
import org.apache.hadoop.hbase.master.MasterFileSystem;
|
||||
|
@ -567,7 +567,7 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
|
|||
if (!snapshot.hasVersion()) {
|
||||
builder.setVersion(SnapshotDescriptionUtils.SNAPSHOT_LAYOUT_VERSION);
|
||||
}
|
||||
User user = RequestContext.getRequestUser();
|
||||
User user = RpcServer.getRequestUser();
|
||||
if (User.isHBaseSecurityEnabled(master.getConfiguration()) && user != null) {
|
||||
builder.setOwner(user.getShortName());
|
||||
}
|
||||
|
|
|
@ -27,7 +27,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.ipc.RpcScheduler;
|
||||
import org.apache.hadoop.hbase.ipc.RequestContext;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
|
||||
import org.apache.hadoop.hbase.regionserver.Region;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||
|
@ -176,9 +176,10 @@ public class RegionServerQuotaManager {
|
|||
private OperationQuota checkQuota(final Region region,
|
||||
final int numWrites, final int numReads, final int numScans)
|
||||
throws IOException, ThrottlingException {
|
||||
User user = RpcServer.getRequestUser();
|
||||
UserGroupInformation ugi;
|
||||
if (RequestContext.isInRequestContext()) {
|
||||
ugi = RequestContext.getRequestUser().getUGI();
|
||||
if (user != null) {
|
||||
ugi = user.getUGI();
|
||||
} else {
|
||||
ugi = User.getCurrent().getUGI();
|
||||
}
|
||||
|
|
|
@ -75,7 +75,7 @@ import org.apache.hadoop.hbase.filter.CompareFilter;
|
|||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
import org.apache.hadoop.hbase.filter.FilterList;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||
import org.apache.hadoop.hbase.ipc.RequestContext;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
|
||||
|
@ -372,11 +372,7 @@ public class AccessController extends BaseMasterAndRegionObserver
|
|||
|
||||
private void logResult(AuthResult result) {
|
||||
if (AUDITLOG.isTraceEnabled()) {
|
||||
RequestContext ctx = RequestContext.get();
|
||||
InetAddress remoteAddr = null;
|
||||
if (ctx != null) {
|
||||
remoteAddr = ctx.getRemoteAddress();
|
||||
}
|
||||
InetAddress remoteAddr = RpcServer.getRemoteAddress();
|
||||
AUDITLOG.trace("Access " + (result.isAllowed() ? "allowed" : "denied") +
|
||||
" for user " + (result.getUser() != null ? result.getUser().getShortName() : "UNKNOWN") +
|
||||
"; reason: " + result.getReason() +
|
||||
|
@ -392,8 +388,8 @@ public class AccessController extends BaseMasterAndRegionObserver
|
|||
* otherwise the currently logged in user is used.
|
||||
*/
|
||||
private User getActiveUser() throws IOException {
|
||||
User user = RequestContext.getRequestUser();
|
||||
if (!RequestContext.isInRequestContext()) {
|
||||
User user = RpcServer.getRequestUser();
|
||||
if (user == null) {
|
||||
// for non-rpc handling, fallback to system user
|
||||
user = userProvider.getCurrent();
|
||||
}
|
||||
|
@ -2024,14 +2020,11 @@ public class AccessController extends BaseMasterAndRegionObserver
|
|||
* If so, we assume that access control is correctly enforced based on
|
||||
* the checks performed in preScannerOpen()
|
||||
*/
|
||||
private void requireScannerOwner(InternalScanner s)
|
||||
throws AccessDeniedException {
|
||||
if (RequestContext.isInRequestContext()) {
|
||||
String requestUserName = RequestContext.getRequestUserName();
|
||||
String owner = scannerOwners.get(s);
|
||||
if (owner != null && !owner.equals(requestUserName)) {
|
||||
throw new AccessDeniedException("User '"+ requestUserName +"' is not the scanner owner!");
|
||||
}
|
||||
private void requireScannerOwner(InternalScanner s) throws AccessDeniedException {
|
||||
String requestUserName = RpcServer.getRequestUserName();
|
||||
String owner = scannerOwners.get(s);
|
||||
if (owner != null && !owner.equals(requestUserName)) {
|
||||
throw new AccessDeniedException("User '"+ requestUserName +"' is not the scanner owner!");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -39,7 +39,7 @@ import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
|
|||
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
|
||||
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.ipc.RequestContext;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
|
||||
|
@ -336,8 +336,8 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
|
|||
}
|
||||
|
||||
private User getActiveUser() {
|
||||
User user = RequestContext.getRequestUser();
|
||||
if (!RequestContext.isInRequestContext()) {
|
||||
User user = RpcServer.getRequestUser();
|
||||
if (user == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
|
|
@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.CoprocessorEnvironment;
|
|||
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||
import org.apache.hadoop.hbase.ipc.RequestContext;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
|
||||
|
@ -111,7 +110,7 @@ public class TokenProvider implements AuthenticationProtos.AuthenticationService
|
|||
"No secret manager configured for token authentication");
|
||||
}
|
||||
|
||||
User currentUser = RequestContext.getRequestUser();
|
||||
User currentUser = RpcServer.getRequestUser();
|
||||
UserGroupInformation ugi = null;
|
||||
if (currentUser != null) {
|
||||
ugi = currentUser.getUGI();
|
||||
|
@ -137,7 +136,7 @@ public class TokenProvider implements AuthenticationProtos.AuthenticationService
|
|||
@Override
|
||||
public void whoAmI(RpcController controller, AuthenticationProtos.WhoAmIRequest request,
|
||||
RpcCallback<AuthenticationProtos.WhoAmIResponse> done) {
|
||||
User requestUser = RequestContext.getRequestUser();
|
||||
User requestUser = RpcServer.getRequestUser();
|
||||
AuthenticationProtos.WhoAmIResponse.Builder response =
|
||||
AuthenticationProtos.WhoAmIResponse.newBuilder();
|
||||
if (requestUser != null) {
|
||||
|
|
|
@ -73,7 +73,7 @@ import org.apache.hadoop.hbase.filter.Filter;
|
|||
import org.apache.hadoop.hbase.filter.FilterBase;
|
||||
import org.apache.hadoop.hbase.filter.FilterList;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||
import org.apache.hadoop.hbase.ipc.RequestContext;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult;
|
||||
|
@ -596,12 +596,11 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
|
|||
* access control is correctly enforced based on the checks performed in preScannerOpen()
|
||||
*/
|
||||
private void requireScannerOwner(InternalScanner s) throws AccessDeniedException {
|
||||
if (RequestContext.isInRequestContext()) {
|
||||
String requestUName = RequestContext.getRequestUserName();
|
||||
String owner = scannerOwners.get(s);
|
||||
if (owner != null && !owner.equals(requestUName)) {
|
||||
throw new AccessDeniedException("User '" + requestUName + "' is not the scanner owner!");
|
||||
}
|
||||
// This is duplicated code!
|
||||
String requestUName = RpcServer.getRequestUserName();
|
||||
String owner = scannerOwners.get(s);
|
||||
if (owner != null && !owner.equals(requestUName)) {
|
||||
throw new AccessDeniedException("User '" + requestUName + "' is not the scanner owner!");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -825,12 +824,8 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
|
|||
private void logResult(boolean isAllowed, String request, String reason, byte[] user,
|
||||
List<byte[]> labelAuths, String regex) {
|
||||
if (AUDITLOG.isTraceEnabled()) {
|
||||
RequestContext ctx = RequestContext.get();
|
||||
InetAddress remoteAddr = null;
|
||||
if (ctx != null) {
|
||||
remoteAddr = ctx.getRemoteAddress();
|
||||
}
|
||||
|
||||
// This is more duplicated code!
|
||||
InetAddress remoteAddr = RpcServer.getRemoteAddress();
|
||||
List<String> labelAuthsStr = new ArrayList<>();
|
||||
if (labelAuths != null) {
|
||||
int labelAuthsSize = labelAuths.size();
|
||||
|
|
|
@ -44,7 +44,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
import org.apache.hadoop.hbase.io.util.StreamUtils;
|
||||
import org.apache.hadoop.hbase.ipc.RequestContext;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.MultiUserAuthorizations;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.UserAuthorizations;
|
||||
|
@ -326,8 +326,8 @@ public class VisibilityUtils {
|
|||
* @throws IOException When there is IOE in getting the system user (During non-RPC handling).
|
||||
*/
|
||||
public static User getActiveUser() throws IOException {
|
||||
User user = RequestContext.getRequestUser();
|
||||
if (!RequestContext.isInRequestContext()) {
|
||||
User user = RpcServer.getRequestUser();
|
||||
if (user == null) {
|
||||
user = User.getCurrent();
|
||||
}
|
||||
if (LOG.isTraceEnabled()) {
|
||||
|
|
|
@ -35,7 +35,7 @@ public class TestCallRunner {
|
|||
Mockito.when(mockRpcServer.isStarted()).thenReturn(true);
|
||||
RpcServer.Call mockCall = Mockito.mock(RpcServer.Call.class);
|
||||
mockCall.connection = Mockito.mock(RpcServer.Connection.class);
|
||||
CallRunner cr = new CallRunner(mockRpcServer, mockCall, new UserProvider());
|
||||
CallRunner cr = new CallRunner(mockRpcServer, mockCall);
|
||||
cr.run();
|
||||
}
|
||||
}
|
|
@ -51,7 +51,6 @@ import org.apache.hadoop.hbase.client.HTableInterface;
|
|||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
|
||||
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
|
||||
import org.apache.hadoop.hbase.ipc.RequestContext;
|
||||
import org.apache.hadoop.hbase.ipc.RpcClient;
|
||||
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||
|
@ -295,7 +294,7 @@ public class TestTokenAuthentication {
|
|||
public AuthenticationProtos.GetAuthenticationTokenResponse getAuthenticationToken(
|
||||
RpcController controller, AuthenticationProtos.GetAuthenticationTokenRequest request)
|
||||
throws ServiceException {
|
||||
LOG.debug("Authentication token request from "+RequestContext.getRequestUserName());
|
||||
LOG.debug("Authentication token request from " + RpcServer.getRequestUserName());
|
||||
// ignore passed in controller -- it's always null
|
||||
ServerRpcController serverController = new ServerRpcController();
|
||||
BlockingRpcCallback<AuthenticationProtos.GetAuthenticationTokenResponse> callback =
|
||||
|
@ -313,7 +312,7 @@ public class TestTokenAuthentication {
|
|||
public AuthenticationProtos.WhoAmIResponse whoAmI(
|
||||
RpcController controller, AuthenticationProtos.WhoAmIRequest request)
|
||||
throws ServiceException {
|
||||
LOG.debug("whoAmI() request from "+RequestContext.getRequestUserName());
|
||||
LOG.debug("whoAmI() request from " + RpcServer.getRequestUserName());
|
||||
// ignore passed in controller -- it's always null
|
||||
ServerRpcController serverController = new ServerRpcController();
|
||||
BlockingRpcCallback<AuthenticationProtos.WhoAmIResponse> callback =
|
||||
|
|
Loading…
Reference in New Issue