HBASE-9221: Provide interface for getting a User in the client

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1536937 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
jyates 2013-10-30 00:02:52 +00:00
parent b73e8b3b5a
commit e5e5dac9a7
28 changed files with 284 additions and 73 deletions

View File

@ -226,6 +226,10 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-annotations</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
</dependency>
</dependencies>
</profile>

View File

@ -27,6 +27,7 @@ import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
/**
* Denotes a unique key to an {@link HConnection} instance.
@ -64,7 +65,8 @@ class HConnectionKey {
this.properties = Collections.unmodifiableMap(m);
try {
User currentUser = User.getCurrent();
UserProvider provider = UserProvider.instantiate(conf);
User currentUser = provider.getCurrent();
if (currentUser != null) {
username = currentUser.getName();
}

View File

@ -112,6 +112,7 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.*;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.SoftValueSortedMap;
@ -277,7 +278,8 @@ public class HConnectionManager {
*/
public static HConnection createConnection(Configuration conf)
throws IOException {
return createConnection(conf, false, null, User.getCurrent());
UserProvider provider = UserProvider.instantiate(conf);
return createConnection(conf, false, null, provider.getCurrent());
}
/**
@ -302,7 +304,8 @@ public class HConnectionManager {
*/
public static HConnection createConnection(Configuration conf, ExecutorService pool)
throws IOException {
return createConnection(conf, false, pool, User.getCurrent());
UserProvider provider = UserProvider.instantiate(conf);
return createConnection(conf, false, pool, provider.getCurrent());
}
/**
@ -359,7 +362,8 @@ public class HConnectionManager {
@Deprecated
static HConnection createConnection(final Configuration conf, final boolean managed)
throws IOException {
return createConnection(conf, managed, null, User.getCurrent());
UserProvider provider = UserProvider.instantiate(conf);
return createConnection(conf, managed, null, provider.getCurrent());
}
@Deprecated

View File

@ -73,6 +73,7 @@ import org.apache.hadoop.hbase.security.AuthMethod;
import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
import org.apache.hadoop.hbase.security.SecurityInfo;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
@ -132,6 +133,7 @@ public class RpcClient {
protected final SocketAddress localAddr;
private final boolean fallbackAllowed;
private UserProvider userProvider;
final private static String PING_INTERVAL_NAME = "ipc.ping.interval";
final private static String SOCKET_TIMEOUT = "ipc.socket.timeout";
@ -385,7 +387,7 @@ public class RpcClient {
UserGroupInformation ticket = remoteId.getTicket().getUGI();
SecurityInfo securityInfo = SecurityInfo.getInfo(remoteId.getServiceName());
this.useSasl = User.isHBaseSecurityEnabled(conf);
this.useSasl = userProvider.isHBaseSecurityEnabled();
if (useSasl && securityInfo != null) {
AuthenticationProtos.TokenIdentifier.Kind tokenKind = securityInfo.getTokenKind();
if (tokenKind != null) {
@ -1258,6 +1260,8 @@ public class RpcClient {
this.fallbackAllowed = conf.getBoolean(IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
this.localAddr = localAddr;
this.userProvider = UserProvider.instantiate(conf);
// login the server principal (if using secure Hadoop)
if (LOG.isDebugEnabled()) {
LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor +
", tcpKeepAlive=" + this.tcpKeepAlive +
@ -1406,8 +1410,8 @@ public class RpcClient {
* @param addr
* @param returnType
* @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
* {@link User#getCurrent()} makes a new instance of User each time so will be a new Connection
* each time.
* {@link UserProvider#getCurrent()} makes a new instance of User each time so will be a
* new Connection each time.
* @param rpcTimeout
* @return A pair with the Message response and the Cell data (if any).
* @throws InterruptedException
@ -1614,8 +1618,8 @@ public class RpcClient {
rpcTimeout.remove();
}
/** Make a blocking call.
* Throws exceptions if there are network problems or if the remote code
/**
* Make a blocking call. Throws exceptions if there are network problems or if the remote code
* threw an exception.
* @param md
* @param controller
@ -1623,8 +1627,8 @@ public class RpcClient {
* @param returnType
* @param isa
* @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
* {@link User#getCurrent()} makes a new instance of User each time so will be a new Connection
* each time.
* {@link UserProvider#getCurrent()} makes a new instance of User each time so will be a
* new Connection each time.
* @param rpcTimeout
* @return A pair with the Message response and the Cell data (if any).
* @throws InterruptedException

View File

@ -285,6 +285,10 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
</dependency>
</dependencies>
<build>
<plugins>

View File

@ -0,0 +1,116 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.security;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.BaseConfigurable;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
/**
 * Provide an instance of a user. Allows custom {@link User} creation.
 * <p>
 * The concrete implementation is resolved via {@link #instantiate(Configuration)} from the
 * {@code hbase.client.userprovider.class} configuration key, so deployments (and tests) can
 * substitute their own user-resolution logic without touching callers.
 */
public class UserProvider extends BaseConfigurable {

  // Configuration key naming the UserProvider implementation class to instantiate.
  private static final String USER_PROVIDER_CONF_KEY = "hbase.client.userprovider.class";

  /**
   * Instantiate the {@link UserProvider} specified in the configuration and set the passed
   * configuration via {@link UserProvider#setConf(Configuration)}
   * @param conf to read and set on the created {@link UserProvider}
   * @return a {@link UserProvider} ready for use.
   */
  public static UserProvider instantiate(Configuration conf) {
    // Falls back to this base class when the key is unset.
    Class<? extends UserProvider> clazz =
        conf.getClass(USER_PROVIDER_CONF_KEY, UserProvider.class, UserProvider.class);
    // ReflectionUtils.newInstance also injects conf into the Configurable instance.
    return ReflectionUtils.newInstance(clazz, conf);
  }

  /**
   * Set the {@link UserProvider} in the given configuration that should be instantiated
   * @param conf to update
   * @param provider class of the provider to set
   */
  public static void setUserProviderForTesting(Configuration conf,
      Class<? extends UserProvider> provider) {
    conf.set(USER_PROVIDER_CONF_KEY, provider.getName());
  }

  /**
   * @return the userName for the current logged-in user, or {@code null} if no user can be
   *         resolved.
   * @throws IOException if the underlying user cannot be obtained
   */
  public String getCurrentUserName() throws IOException {
    User user = getCurrent();
    return user == null ? null : user.getName();
  }

  /**
   * @return <tt>true</tt> if security is enabled, <tt>false</tt> otherwise
   */
  public boolean isHBaseSecurityEnabled() {
    // Delegates to User with this provider's configuration (set at instantiation).
    return User.isHBaseSecurityEnabled(this.getConf());
  }

  /**
   * @return whether or not Kerberos authentication is configured for Hadoop. For non-secure Hadoop,
   *         this always returns <code>false</code>. For secure Hadoop, it will return the value
   *         from {@code UserGroupInformation.isSecurityEnabled()}.
   */
  public boolean isHadoopSecurityEnabled() {
    return User.isSecurityEnabled();
  }

  /**
   * @return the current user within the current execution context
   * @throws IOException if the user cannot be loaded
   */
  public User getCurrent() throws IOException {
    return User.getCurrent();
  }

  /**
   * Wraps an underlying {@code UserGroupInformation} instance.
   * @param ugi The base Hadoop user
   * @return User
   */
  public User create(UserGroupInformation ugi) {
    return User.create(ugi);
  }

  /**
   * Log in the current process using the given configuration keys for the credential file and login
   * principal. The {@link Configuration} used is the one set on this provider at instantiation —
   * this method takes no configuration argument.
   * <p>
   * <strong>This is only applicable when running on secure Hadoop</strong> -- see
   * org.apache.hadoop.security.SecurityUtil#login(Configuration,String,String,String). On regular
   * Hadoop (without security features), this will safely be ignored.
   * </p>
   * @param fileConfKey Property key used to configure path to the credential file
   * @param principalConfKey Property key used to configure login principal
   * @param localhost Current hostname to use in any credentials
   * @throws IOException underlying exception from SecurityUtil.login() call
   */
  public void login(String fileConfKey, String principalConfKey, String localhost)
      throws IOException {
    User.login(getConf(), fileConfKey, principalConfKey, localhost);
  }
}

View File

@ -23,7 +23,7 @@ import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.ipc.RpcServer.Call;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
@ -42,6 +42,7 @@ public class CallRunner {
private final Call call;
private final RpcServerInterface rpcServer;
private final MonitoredRPCHandler status;
private UserProvider userProvider;
/**
* On construction, adds the size of this call to the running count of outstanding call sizes.
@ -51,12 +52,13 @@ public class CallRunner {
* @param rpcServer
*/
// The constructor is shutdown so only RpcServer in this class can make one of these.
CallRunner(final RpcServerInterface rpcServer, final Call call) {
CallRunner(final RpcServerInterface rpcServer, final Call call, UserProvider userProvider) {
this.call = call;
this.rpcServer = rpcServer;
// Add size of the call to queue size.
this.rpcServer.addCallSize(call.getSize());
this.status = getStatus();
this.userProvider = userProvider;
}
public Call getCall() {
@ -84,7 +86,7 @@ public class CallRunner {
if (call.tinfo != null) {
traceScope = Trace.startSpan(call.toTraceString(), call.tinfo);
}
RequestContext.set(User.create(call.connection.user), RpcServer.getRemoteIp(),
RequestContext.set(userProvider.create(call.connection.user), RpcServer.getRemoteIp(),
call.connection.service);
// make the call
resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner,

View File

@ -84,6 +84,7 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.security.AuthMethod;
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
import org.apache.hadoop.hbase.security.SaslStatus;
@ -258,6 +259,8 @@ public class RpcServer implements RpcServerInterface {
private final RpcScheduler scheduler;
private UserProvider userProvider;
/**
* Datastructure that holds all necessary to a method invocation and then afterward, carries
* the result.
@ -1704,7 +1707,7 @@ public class RpcServer implements RpcServerInterface {
Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
totalRequestSize,
traceInfo);
scheduler.dispatch(new CallRunner(RpcServer.this, call));
scheduler.dispatch(new CallRunner(RpcServer.this, call, userProvider));
}
private boolean authorizeConnection() throws IOException {
@ -1842,7 +1845,8 @@ public class RpcServer implements RpcServerInterface {
// Create the responder here
responder = new Responder();
this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
this.isSecurityEnabled = User.isHBaseSecurityEnabled(this.conf);
this.userProvider = UserProvider.instantiate(conf);
this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
if (isSecurityEnabled) {
HBaseSaslRpcServer.init(conf);
}

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.InputFormat;
@ -176,9 +177,11 @@ public class TableMapReduceUtil {
}
public static void initCredentials(JobConf job) throws IOException {
if (User.isHBaseSecurityEnabled(job)) {
UserProvider userProvider = UserProvider.instantiate(job);
// login the server principal (if using secure Hadoop)
if (userProvider.isHBaseSecurityEnabled()) {
try {
User.getCurrent().obtainAuthTokenForJob(job);
userProvider.getCurrent().obtainAuthTokenForJob(job);
} catch (InterruptedException ie) {
ie.printStackTrace();
Thread.interrupted();

View File

@ -78,6 +78,7 @@ import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.security.token.Token;
@ -105,21 +106,15 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
private boolean assignSeqIds;
private boolean useSecure;
private Token<?> userToken;
private String bulkToken;
private UserProvider userProvider;
//package private for testing
LoadIncrementalHFiles(Configuration conf, Boolean useSecure) throws Exception {
public LoadIncrementalHFiles(Configuration conf) throws Exception {
super(conf);
this.cfg = conf;
this.hbAdmin = new HBaseAdmin(conf);
//added simple for testing
this.useSecure = useSecure != null ? useSecure : User.isHBaseSecurityEnabled(conf);
}
public LoadIncrementalHFiles(Configuration conf) throws Exception {
this(conf, null);
this.userProvider = UserProvider.instantiate(conf);
assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
}
@ -254,11 +249,11 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
//If using secure bulk load
//prepare staging directory and token
if(useSecure) {
if (userProvider.isHBaseSecurityEnabled()) {
FileSystem fs = FileSystem.get(cfg);
//This condition is here for unit testing
//Since delegation token doesn't work in mini cluster
if(User.isSecurityEnabled()) {
if (userProvider.isHadoopSecurityEnabled()) {
userToken = fs.getDelegationToken("renewer");
}
bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(table.getName());
@ -292,7 +287,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
}
} finally {
if(useSecure) {
if (userProvider.isHBaseSecurityEnabled()) {
if(userToken != null) {
try {
userToken.cancel(cfg);
@ -560,7 +555,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
LOG.debug("Going to connect to server " + getLocation() + " for row "
+ Bytes.toStringBinary(getRow()) + " with hfile group " + famPaths);
byte[] regionName = getLocation().getRegionInfo().getRegionName();
if(!useSecure) {
if(!userProvider.isHBaseSecurityEnabled()) {
success = ProtobufUtil.bulkLoadHFile(getStub(), famPaths, regionName, assignSeqIds);
} else {
HTable table = new HTable(conn.getConfiguration(), getTableName());

View File

@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.mapreduce.hadoopbackport.JarFinder;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@ -293,16 +294,17 @@ public class TableMapReduceUtil {
}
public static void initCredentials(Job job) throws IOException {
if (User.isHBaseSecurityEnabled(job.getConfiguration())) {
UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
if (userProvider.isHBaseSecurityEnabled()) {
try {
// init credentials for remote cluster
String quorumAddress = job.getConfiguration().get(TableOutputFormat.QUORUM_ADDRESS);
if (quorumAddress != null) {
Configuration peerConf = HBaseConfiguration.create(job.getConfiguration());
ZKUtil.applyClusterKeyToConf(peerConf, quorumAddress);
User.getCurrent().obtainAuthTokenForJob(peerConf, job);
userProvider.getCurrent().obtainAuthTokenForJob(peerConf, job);
}
User.getCurrent().obtainAuthTokenForJob(job.getConfiguration(), job);
userProvider.getCurrent().obtainAuthTokenForJob(job.getConfiguration(), job);
} catch (InterruptedException ie) {
LOG.info("Interrupted obtaining user authentication token");
Thread.interrupted();

View File

@ -203,6 +203,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Repor
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorResponse;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.trace.SpanReceiverHost;
@ -449,7 +450,8 @@ MasterServices, Server {
"hbase.zookeeper.client.kerberos.principal", this.isa.getHostName());
// initialize server principal (if using secure Hadoop)
User.login(conf, "hbase.master.keytab.file",
UserProvider provider = UserProvider.instantiate(conf);
provider.login("hbase.master.keytab.file",
"hbase.master.kerberos.principal", this.isa.getHostName());
LOG.info("hbase.rootdir=" + FSUtils.getRootDir(this.conf) +

View File

@ -198,6 +198,7 @@ import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.trace.SpanReceiverHost;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CompressionTest;
@ -489,6 +490,8 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
// Table level lock manager for locking for region operations
private TableLockManager tableLockManager;
private UserProvider userProvider;
/**
* Starts a HRegionServer at the default location
*
@ -502,6 +505,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
this.conf = conf;
this.isOnline = false;
checkCodecs(this.conf);
this.userProvider = UserProvider.instantiate(conf);
// do we use checksum verification in the hbase? If hbase checksum verification
// is enabled, then we automatically switch off hdfs checksum verification.
@ -589,7 +593,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
"hbase.zookeeper.client.kerberos.principal", this.isa.getHostName());
// login the server principal (if using secure Hadoop)
User.login(this.conf, "hbase.regionserver.keytab.file",
userProvider.login("hbase.regionserver.keytab.file",
"hbase.regionserver.kerberos.principal", this.isa.getHostName());
regionServerAccounting = new RegionServerAccounting();
cacheConfig = new CacheConfig(conf);
@ -1884,8 +1888,8 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
new InetSocketAddress(sn.getHostname(), sn.getPort());
try {
BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(sn,
User.getCurrent(), this.rpcTimeout);
BlockingRpcChannel channel =
this.rpcClient.createBlockingRpcChannel(sn, userProvider.getCurrent(), this.rpcTimeout);
intf = RegionServerStatusService.newBlockingStub(channel);
break;
} catch (IOException e) {

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.rest.filter.AuthFilter;
import org.apache.hadoop.hbase.rest.filter.GzipFilter;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.InfoServer;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.hbase.util.VersionInfo;
@ -87,9 +88,9 @@ public class RESTServer implements Constants {
FilterHolder authFilter = null;
Configuration conf = HBaseConfiguration.create();
Class<? extends ServletContainer> containerClass = ServletContainer.class;
UserProvider userProvider = UserProvider.instantiate(conf);
// login the server principal (if using secure Hadoop)
if (User.isSecurityEnabled() && User.isHBaseSecurityEnabled(conf)) {
if (userProvider.isHadoopSecurityEnabled() && userProvider.isHBaseSecurityEnabled()) {
String machineName = Strings.domainNamePointerToHostName(
DNS.getDefaultHost(conf.get(REST_DNS_INTERFACE, "default"),
conf.get(REST_DNS_NAMESERVER, "default")));
@ -99,7 +100,7 @@ public class RESTServer implements Constants {
String principalConfig = conf.get(REST_KERBEROS_PRINCIPAL);
Preconditions.checkArgument(principalConfig != null && !principalConfig.isEmpty(),
REST_KERBEROS_PRINCIPAL + " should be set if security is enabled");
User.login(conf, REST_KEYTAB_FILE, REST_KERBEROS_PRINCIPAL, machineName);
userProvider.login(REST_KEYTAB_FILE, REST_KERBEROS_PRINCIPAL, machineName);
if (conf.get(REST_AUTHENTICATION_TYPE) != null) {
containerClass = RESTServletContainer.class;
authFilter = new FilterHolder();
@ -108,7 +109,7 @@ public class RESTServer implements Constants {
}
}
UserGroupInformation realUser = User.getCurrent().getUGI();
UserGroupInformation realUser = userProvider.getCurrent().getUGI();
RESTServlet servlet = RESTServlet.getInstance(conf, realUser);
Options options = new Options();

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.KeyLocker;
import org.apache.hadoop.hbase.util.Threads;
@ -66,6 +67,7 @@ public class RESTServlet implements Constants {
// A chore to clean up idle connections.
private final Chore connectionCleaner;
private final Stoppable stoppable;
private UserProvider userProvider;
class ConnectionInfo {
final HConnection connection;
@ -167,6 +169,7 @@ public class RESTServlet implements Constants {
*/
RESTServlet(final Configuration conf,
final UserGroupInformation realUser) {
this.userProvider = UserProvider.instantiate(conf);
stoppable = new Stoppable() {
private volatile boolean isStopped = false;
@Override public void stop(String why) { isStopped = true;}
@ -242,7 +245,7 @@ public class RESTServlet implements Constants {
if (!userName.equals(NULL_USERNAME)) {
ugi = UserGroupInformation.createProxyUser(userName, realUser);
}
User user = User.create(ugi);
User user = userProvider.create(ugi);
HConnection conn = HConnectionManager.createConnection(conf, user);
connInfo = new ConnectionInfo(conn, userName);
connections.put(userName, connInfo);

View File

@ -72,6 +72,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.access.Permission.Action;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@ -139,9 +140,10 @@ public class AccessController extends BaseRegionObserver
private Map<InternalScanner,String> scannerOwners =
new MapMaker().weakKeys().makeMap();
private UserProvider userProvider;
void initialize(RegionCoprocessorEnvironment e) throws IOException {
final HRegion region = e.getRegion();
Map<byte[], ListMultimap<String,TablePermission>> tables =
AccessControlLists.loadAll(region);
// For each table, write out the table's permissions to the respective
@ -319,7 +321,7 @@ public class AccessController extends BaseRegionObserver
User user = RequestContext.getRequestUser();
if (!RequestContext.isInRequestContext()) {
// for non-rpc handling, fallback to system user
user = User.getCurrent();
user = userProvider.getCurrent();
}
return user;
}
@ -518,6 +520,9 @@ public class AccessController extends BaseRegionObserver
zk = regionEnv.getRegionServerServices().getZooKeeper();
}
// set the user-provider.
this.userProvider = UserProvider.instantiate(env.getConfiguration());
// If zk is null or IOException while obtaining auth manager,
// throw RuntimeException so that the coprocessor is unloaded.
if (zk != null) {
@ -1439,7 +1444,7 @@ public class AccessController extends BaseRegionObserver
}
private void isSystemOrSuperUser(Configuration conf) throws IOException {
User user = User.getCurrent();
User user = userProvider.getCurrent();
if (user == null) {
throw new IOException("Unable to obtain the current user, " +
"authorization checks for internal operations will not work correctly!");

View File

@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBul
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.security.SecureBulkLoadUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Methods;
import org.apache.hadoop.hbase.util.Pair;
@ -114,6 +115,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
private RegionCoprocessorEnvironment env;
private UserProvider userProvider;
@Override
public void start(CoprocessorEnvironment env) {
@ -121,6 +123,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
random = new SecureRandom();
conf = env.getConfiguration();
baseStagingDir = SecureBulkLoadUtil.getBaseStagingDir(conf);
this.userProvider = UserProvider.instantiate(conf);
try {
fs = FileSystem.get(conf);
@ -196,7 +199,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
final UserGroupInformation ugi = user.getUGI();
if(userToken != null) {
ugi.addToken(userToken);
} else if(User.isSecurityEnabled()) {
} else if (userProvider.isHadoopSecurityEnabled()) {
//we allow this to pass through in "simple" security mode
//for mini cluster testing
ResponseConverter.setControllerException(controller,

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
@ -122,7 +123,8 @@ public class TableAuthManager {
* from the {@code hbase.superuser} configuration key.
*/
private PermissionCache<Permission> initGlobal(Configuration conf) throws IOException {
User user = User.getCurrent();
UserProvider userProvider = UserProvider.instantiate(conf);
User user = userProvider.getCurrent();
if (user == null) {
throw new IOException("Unable to obtain the current user, " +
"authorization checks for internal operations will not work correctly!");

View File

@ -94,6 +94,7 @@ import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter.ERROR_CODE;
import org.apache.hadoop.hbase.util.hbck.HFileCorruptionChecker;
import org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandler;
@ -1452,7 +1453,8 @@ public class HBaseFsck extends Configured implements Tool {
Path hbaseDir = FSUtils.getRootDir(getConf());
FileSystem fs = hbaseDir.getFileSystem(getConf());
UserGroupInformation ugi = User.getCurrent().getUGI();
UserProvider userProvider = UserProvider.instantiate(getConf());
UserGroupInformation ugi = userProvider.getCurrent().getUGI();
FileStatus[] files = fs.listStatus(hbaseDir);
for (FileStatus file : files) {
try {

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.ipc;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.security.UserProvider;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
@ -33,7 +34,7 @@ public class TestCallRunner {
Mockito.when(mockRpcServer.isStarted()).thenReturn(true);
RpcServer.Call mockCall = Mockito.mock(RpcServer.Call.class);
mockCall.connection = Mockito.mock(RpcServer.Connection.class);
CallRunner cr = new CallRunner(mockRpcServer, mockCall);
CallRunner cr = new CallRunner(mockRpcServer, mockCall, new UserProvider());
cr.run();
}
}

View File

@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import org.apache.hadoop.hbase.security.UserProvider;
/**
* A {@link UserProvider} that always says hadoop security is enabled, regardless of the underlying
* configuration. HBase security is <i>not enabled</i> as this is used to determine if SASL is used
* to do the authentication, which requires a Kerberos ticket (which we currently don't have in
* tests).
* <p>
* This should only be used for <b>TESTING</b>.
*/
public class HadoopSecurityEnabledUserProviderForTesting extends UserProvider {
// Always report HBase security as disabled, ignoring the underlying
// configuration, so test code does not need a Kerberos ticket.
@Override
public boolean isHBaseSecurityEnabled() {
return false;
}
// Always report Hadoop security as enabled, ignoring the underlying
// configuration, to exercise the secure (SASL) code paths in tests.
@Override
public boolean isHadoopSecurityEnabled() {
return true;
}
}

View File

@ -72,8 +72,6 @@ public class TestLoadIncrementalHFiles {
Compression.Algorithm.NONE;
static HBaseTestingUtility util = new HBaseTestingUtility();
//used by secure subclass
static boolean useSecure = false;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
@ -158,7 +156,7 @@ public class TestLoadIncrementalHFiles {
familyDesc.setBloomFilterType(bloomType);
htd.addFamily(familyDesc);
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration(), useSecure);
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
String [] args= {dir.toString(),"mytable_"+testName};
loader.run(args);
HTable table = new HTable(util.getConfiguration(), TABLE);
@ -203,7 +201,7 @@ public class TestLoadIncrementalHFiles {
HTable table = new HTable(util.getConfiguration(), TABLE);
util.waitTableEnabled(TABLE);
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration(), false);
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
try {
loader.doBulkLoad(dir, table);
assertTrue("Loading into table with non-existent family should have failed", false);

View File

@ -142,7 +142,7 @@ public class TestLoadIncrementalHFilesSplitRecovery {
*/
private void populateTable(String table, int value) throws Exception {
// create HFiles for different column families
LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration(), useSecure);
LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration());
Path bulk1 = buildBulkFiles(table, value);
HTable t = new HTable(util.getConfiguration(), Bytes.toBytes(table));
lih.doBulkLoad(bulk1, t);
@ -237,7 +237,7 @@ public class TestLoadIncrementalHFilesSplitRecovery {
final AtomicInteger attmptedCalls = new AtomicInteger();
final AtomicInteger failedCalls = new AtomicInteger();
LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
util.getConfiguration(), useSecure) {
util.getConfiguration()) {
protected List<LoadQueueItem> tryAtomicRegionLoad(final HConnection conn,
TableName tableName, final byte[] first, Collection<LoadQueueItem> lqis)
@ -306,7 +306,7 @@ public class TestLoadIncrementalHFilesSplitRecovery {
// files to fail when attempt to atomically import. This is recoverable.
final AtomicInteger attemptedCalls = new AtomicInteger();
LoadIncrementalHFiles lih2 = new LoadIncrementalHFiles(
util.getConfiguration(), useSecure) {
util.getConfiguration()) {
protected void bulkLoadPhase(final HTable htable, final HConnection conn,
ExecutorService pool, Deque<LoadQueueItem> queue,
@ -347,7 +347,7 @@ public class TestLoadIncrementalHFilesSplitRecovery {
final AtomicInteger countedLqis= new AtomicInteger();
LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
util.getConfiguration(), useSecure) {
util.getConfiguration()) {
protected List<LoadQueueItem> groupOrSplit(
Multimap<ByteBuffer, LoadQueueItem> regionGroups,
final LoadQueueItem item, final HTable htable,
@ -379,7 +379,7 @@ public class TestLoadIncrementalHFilesSplitRecovery {
setupTable(table, 10);
LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
util.getConfiguration(), useSecure) {
util.getConfiguration()) {
int i = 0;
protected List<LoadQueueItem> groupOrSplit(

View File

@ -20,6 +20,7 @@
package org.apache.hadoop.hbase.mapreduce;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.access.AccessControlLists;
import org.apache.hadoop.hbase.security.access.SecureTestUtil;
@ -42,7 +43,9 @@ public class TestSecureLoadIncrementalHFiles extends TestLoadIncrementalHFiles{
@BeforeClass
public static void setUpBeforeClass() throws Exception {
useSecure = true;
// set the always on security provider
UserProvider.setUserProviderForTesting(util.getConfiguration(),
HadoopSecurityEnabledUserProviderForTesting.class);
// setup configuration
SecureTestUtil.enableSecurity(util.getConfiguration());

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.mapreduce;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.access.AccessControlLists;
import org.apache.hadoop.hbase.security.access.SecureTestUtil;
@ -46,8 +47,10 @@ public class TestSecureLoadIncrementalHFilesSplitRecovery extends TestLoadIncrem
//make sure they are in sync
@BeforeClass
public static void setupCluster() throws Exception {
useSecure = true;
util = new HBaseTestingUtility();
// set the always on security provider
UserProvider.setUserProviderForTesting(util.getConfiguration(),
HadoopSecurityEnabledUserProviderForTesting.class);
// setup configuration
SecureTestUtil.enableSecurity(util.getConfiguration());

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.thrift.ThriftServerRunner.ImplType;
import org.apache.hadoop.hbase.util.InfoServer;
import org.apache.hadoop.hbase.util.Strings;
@ -92,13 +93,15 @@ public class ThriftServer {
void doMain(final String[] args) throws Exception {
processOptions(args);
UserProvider userProvider = UserProvider.instantiate(conf);
// login the server principal (if using secure Hadoop)
if (User.isSecurityEnabled() && User.isHBaseSecurityEnabled(conf)) {
String machineName = Strings.domainNamePointerToHostName(
DNS.getDefaultHost(conf.get("hbase.thrift.dns.interface", "default"),
if (userProvider.isHadoopSecurityEnabled() && userProvider.isHBaseSecurityEnabled()) {
String machineName =
Strings.domainNamePointerToHostName(DNS.getDefaultHost(
conf.get("hbase.thrift.dns.interface", "default"),
conf.get("hbase.thrift.dns.nameserver", "default")));
User.login(conf, "hbase.thrift.keytab.file",
"hbase.thrift.kerberos.principal", machineName);
userProvider
.login("hbase.thrift.keytab.file", "hbase.thrift.kerberos.principal", machineName);
}
serverRunner = new ThriftServerRunner(conf);