From 6c4863f355db7c46b50680f9560876d00e902fba Mon Sep 17 00:00:00 2001 From: Gary Helmling Date: Sat, 6 Dec 2014 02:10:31 -0800 Subject: [PATCH] HBASE-12493 Make User and TokenUtil public --- .../hbase/security/token/TokenUtil.java | 374 ++++++++++++++++++ .../apache/hadoop/hbase/security/User.java | 37 +- .../hbase/mapred/TableMapReduceUtil.java | 30 +- .../hbase/mapreduce/TableMapReduceUtil.java | 51 +-- .../hbase/security/token/TokenUtil.java | 200 ---------- .../token/TestTokenAuthentication.java | 37 +- 6 files changed, 468 insertions(+), 261 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java new file mode 100644 index 00000000000..3c71215ac4b --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java @@ -0,0 +1,374 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.security.token; + +import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.security.PrivilegedExceptionAction; + +import com.google.protobuf.ServiceException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.zookeeper.ZKClusterId; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.zookeeper.KeeperException; + +/** + * Utility methods for obtaining authentication tokens. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class TokenUtil { + // This class is referenced indirectly by User out in common; instances are created by reflection + private static Log LOG = LogFactory.getLog(TokenUtil.class); + + /** + * Obtain and return an authentication token for the current user. + * @param conf the configuration for connecting to the cluster + * @return the authentication token instance + * @deprecated Replaced by {@link #obtainToken(Connection)} + */ + @Deprecated + public static Token obtainToken( + Configuration conf) throws IOException { + try (Connection connection = ConnectionFactory.createConnection(conf)) { + return obtainToken(connection); + } + } + + /** + * Obtain and return an authentication token for the current user. + * @param conn The HBase cluster connection + * @return the authentication token instance + */ + public static Token obtainToken( + Connection conn) throws IOException { + Table meta = null; + try { + meta = conn.getTable(TableName.META_TABLE_NAME); + CoprocessorRpcChannel rpcChannel = meta.coprocessorService(HConstants.EMPTY_START_ROW); + AuthenticationProtos.AuthenticationService.BlockingInterface service = + AuthenticationProtos.AuthenticationService.newBlockingStub(rpcChannel); + AuthenticationProtos.GetAuthenticationTokenResponse response = service.getAuthenticationToken(null, + AuthenticationProtos.GetAuthenticationTokenRequest.getDefaultInstance()); + + return ProtobufUtil.toToken(response.getToken()); + } catch (ServiceException se) { + ProtobufUtil.toIOException(se); + } finally { + if (meta != null) { + meta.close(); + } + } + // dummy return for ServiceException block + return null; + } + + /** + * Obtain and return an authentication token for the current user. + * @param conn The HBase cluster connection + * @return the authentication token instance + */ + public static Token obtainToken( + final Connection conn, User user) throws IOException, InterruptedException { + return user.runAs(new PrivilegedExceptionAction>() { + @Override + public Token run() throws Exception { + return obtainToken(conn); + } + }); + } + + + private static Text getClusterId(Token token) + throws IOException { + return token.getService() != null + ? token.getService() : new Text("default"); + } + + /** + * Obtain an authentication token for the given user and add it to the + * user's credentials. + * @param conf The configuration for connecting to the cluster + * @param user The user for whom to obtain the token + * @throws IOException If making a remote call to the authentication service fails + * @throws InterruptedException If executing as the given user is interrupted + * @deprecated Replaced by {@link #obtainAndCacheToken(Connection,User)} + */ + @Deprecated + public static void obtainAndCacheToken(final Configuration conf, + UserGroupInformation user) + throws IOException, InterruptedException { + Connection conn = ConnectionFactory.createConnection(conf); + try { + UserProvider userProvider = UserProvider.instantiate(conf); + obtainAndCacheToken(conn, userProvider.create(user)); + } finally { + conn.close(); + } + } + + /** + * Obtain an authentication token for the given user and add it to the + * user's credentials. + * @param conn The HBase cluster connection + * @param user The user for whom to obtain the token + * @throws IOException If making a remote call to the authentication service fails + * @throws InterruptedException If executing as the given user is interrupted + */ + public static void obtainAndCacheToken(final Connection conn, + User user) + throws IOException, InterruptedException { + try { + Token token = obtainToken(conn, user); + + if (token == null) { + throw new IOException("No token returned for user " + user.getName()); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Obtained token " + token.getKind().toString() + " for user " + + user.getName()); + } + user.addToken(token); + } catch (IOException ioe) { + throw ioe; + } catch (InterruptedException ie) { + throw ie; + } catch (RuntimeException re) { + throw re; + } catch (Exception e) { + throw new UndeclaredThrowableException(e, + "Unexpected exception obtaining token for user " + user.getName()); + } + } + + /** + * Obtain an authentication token on behalf of the given user and add it to + * the credentials for the given map reduce job. + * @param conf The configuration for connecting to the cluster + * @param user The user for whom to obtain the token + * @param job The job instance in which the token should be stored + * @throws IOException If making a remote call to the authentication service fails + * @throws InterruptedException If executing as the given user is interrupted + * @deprecated Replaced by {@link #obtainTokenForJob(Connection,User,Job)} + */ + @Deprecated + public static void obtainTokenForJob(final Configuration conf, + UserGroupInformation user, Job job) + throws IOException, InterruptedException { + Connection conn = ConnectionFactory.createConnection(conf); + try { + UserProvider userProvider = UserProvider.instantiate(conf); + obtainTokenForJob(conn, userProvider.create(user), job); + } finally { + conn.close(); + } + } + + /** + * Obtain an authentication token on behalf of the given user and add it to + * the credentials for the given map reduce job. + * @param conn The HBase cluster connection + * @param user The user for whom to obtain the token + * @param job The job instance in which the token should be stored + * @throws IOException If making a remote call to the authentication service fails + * @throws InterruptedException If executing as the given user is interrupted + */ + public static void obtainTokenForJob(final Connection conn, + User user, Job job) + throws IOException, InterruptedException { + try { + Token token = obtainToken(conn, user); + + if (token == null) { + throw new IOException("No token returned for user " + user.getName()); + } + Text clusterId = getClusterId(token); + if (LOG.isDebugEnabled()) { + LOG.debug("Obtained token " + token.getKind().toString() + " for user " + + user.getName() + " on cluster " + clusterId.toString()); + } + job.getCredentials().addToken(clusterId, token); + } catch (IOException ioe) { + throw ioe; + } catch (InterruptedException ie) { + throw ie; + } catch (RuntimeException re) { + throw re; + } catch (Exception e) { + throw new UndeclaredThrowableException(e, + "Unexpected exception obtaining token for user " + user.getName()); + } + } + + /** + * Obtain an authentication token on behalf of the given user and add it to + * the credentials for the given map reduce job. + * @param user The user for whom to obtain the token + * @param job The job configuration in which the token should be stored + * @throws IOException If making a remote call to the authentication service fails + * @throws InterruptedException If executing as the given user is interrupted + * @deprecated Replaced by {@link #obtainTokenForJob(Connection,JobConf,User)} + */ + @Deprecated + public static void obtainTokenForJob(final JobConf job, + UserGroupInformation user) + throws IOException, InterruptedException { + Connection conn = ConnectionFactory.createConnection(job); + try { + UserProvider userProvider = UserProvider.instantiate(job); + obtainTokenForJob(conn, job, userProvider.create(user)); + } finally { + conn.close(); + } + } + + /** + * Obtain an authentication token on behalf of the given user and add it to + * the credentials for the given map reduce job. + * @param conn The HBase cluster connection + * @param user The user for whom to obtain the token + * @param job The job configuration in which the token should be stored + * @throws IOException If making a remote call to the authentication service fails + * @throws InterruptedException If executing as the given user is interrupted + */ + public static void obtainTokenForJob(final Connection conn, final JobConf job, User user) + throws IOException, InterruptedException { + try { + Token token = obtainToken(conn, user); + + if (token == null) { + throw new IOException("No token returned for user " + user.getName()); + } + Text clusterId = getClusterId(token); + if (LOG.isDebugEnabled()) { + LOG.debug("Obtained token " + token.getKind().toString() + " for user " + + user.getName() + " on cluster " + clusterId.toString()); + } + job.getCredentials().addToken(clusterId, token); + } catch (IOException ioe) { + throw ioe; + } catch (InterruptedException ie) { + throw ie; + } catch (RuntimeException re) { + throw re; + } catch (Exception e) { + throw new UndeclaredThrowableException(e, + "Unexpected exception obtaining token for user "+user.getName()); + } + } + + /** + * Checks for an authentication token for the given user, obtaining a new token if necessary, + * and adds it to the credentials for the given map reduce job. + * + * @param conn The HBase cluster connection + * @param user The user for whom to obtain the token + * @param job The job configuration in which the token should be stored + * @throws IOException If making a remote call to the authentication service fails + * @throws InterruptedException If executing as the given user is interrupted + */ + public static void addTokenForJob(final Connection conn, final JobConf job, User user) + throws IOException, InterruptedException { + + Token token = getAuthToken(conn.getConfiguration(), user); + if (token == null) { + token = obtainToken(conn, user); + } + job.getCredentials().addToken(token.getService(), token); + } + + /** + * Checks for an authentication token for the given user, obtaining a new token if necessary, + * and adds it to the credentials for the given map reduce job. + * + * @param conn The HBase cluster connection + * @param user The user for whom to obtain the token + * @param job The job instance in which the token should be stored + * @throws IOException If making a remote call to the authentication service fails + * @throws InterruptedException If executing as the given user is interrupted + */ + public static void addTokenForJob(final Connection conn, User user, Job job) + throws IOException, InterruptedException { + Token token = getAuthToken(conn.getConfiguration(), user); + if (token == null) { + token = obtainToken(conn, user); + } + job.getCredentials().addToken(token.getService(), token); + } + + /** + * Checks if an authentication tokens exists for the connected cluster, + * obtaining one if needed and adding it to the user's credentials. + * + * @param conn The HBase cluster connection + * @param user The user for whom to obtain the token + * @throws IOException If making a remote call to the authentication service fails + * @throws InterruptedException If executing as the given user is interrupted + * @return true if the token was added, false if it already existed + */ + public static boolean addTokenIfMissing(Connection conn, User user) + throws IOException, InterruptedException { + Token token = getAuthToken(conn.getConfiguration(), user); + if (token == null) { + token = obtainToken(conn, user); + user.getUGI().addToken(token.getService(), token); + return true; + } + return false; + } + + /** + * Get the authentication token of the user for the cluster specified in the configuration + * @return null if the user does not have the token, otherwise the auth token for the cluster. + */ + private static Token getAuthToken(Configuration conf, User user) + throws IOException, InterruptedException { + ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "TokenUtil-getAuthToken", null); + try { + String clusterId = ZKClusterId.readClusterIdZNode(zkw); + if (clusterId == null) { + throw new IOException("Failed to get cluster ID"); + } + return new AuthenticationTokenSelector().selectToken(new Text(clusterId), user.getTokens()); + } catch (KeeperException e) { + throw new IOException(e); + } finally { + zkw.close(); + } + } +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/security/User.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/security/User.java index 901c57c3e26..a5ac51a78dc 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/security/User.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/security/User.java @@ -23,15 +23,18 @@ import java.io.IOException; import java.lang.reflect.UndeclaredThrowableException; import java.security.PrivilegedAction; import java.security.PrivilegedExceptionAction; +import java.util.Collection; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.util.Methods; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; /** * Wrapper to abstract out usage of user and group information in HBase. @@ -44,7 +47,8 @@ import org.apache.hadoop.security.token.Token; * HBase, but can be extended as needs change. *

*/ -@InterfaceAudience.Private +@InterfaceAudience.Public +@InterfaceStability.Stable public abstract class User { public static final String HBASE_SECURITY_CONF_KEY = "hbase.security.authentication"; @@ -58,6 +62,7 @@ public abstract class User { /** * Returns the full user name. For Kerberos principals this will include * the host and realm portions of the principal name. + * * @return User full name. */ public String getName() { @@ -76,6 +81,7 @@ public abstract class User { /** * Returns the shortened version of the user name -- the portion that maps * to an operating system user name. + * * @return Short name */ public abstract String getShortName(); @@ -96,7 +102,10 @@ public abstract class User { * user's credentials. * * @throws IOException + * @deprecated Use {@code TokenUtil.obtainAuthTokenForJob(Connection,User,Job)} + * instead. */ + @Deprecated public abstract void obtainAuthTokenForJob(Configuration conf, Job job) throws IOException, InterruptedException; @@ -105,7 +114,10 @@ public abstract class User { * user's credentials. * * @throws IOException + * @deprecated Use {@code TokenUtil.obtainAuthTokenForJob(Connection,JobConf,User)} + * instead. */ + @Deprecated public abstract void obtainAuthTokenForJob(JobConf job) throws IOException, InterruptedException; @@ -118,16 +130,31 @@ public abstract class User { * @return the token of the specified kind. */ public Token getToken(String kind, String service) throws IOException { - for (Token token: ugi.getTokens()) { + for (Token token : ugi.getTokens()) { if (token.getKind().toString().equals(kind) && - (service != null && token.getService().toString().equals(service))) - { + (service != null && token.getService().toString().equals(service))) { return token; } } return null; } + /** + * Returns all the tokens stored in the user's credentials. + */ + public Collection> getTokens() { + return ugi.getTokens(); + } + + /** + * Adds the given Token to the user's credentials. + * + * @param token the token to add + */ + public void addToken(Token token) { + ugi.addToken(token); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java index 1afb9d6f9cd..b5fefbb7413 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java @@ -27,6 +27,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.MutationSerialization; @@ -35,6 +37,7 @@ import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier; import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.security.token.TokenUtil; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.io.Text; @@ -236,39 +239,20 @@ public class TableMapReduceUtil { } if (userProvider.isHBaseSecurityEnabled()) { + Connection conn = ConnectionFactory.createConnection(job); try { // login the server principal (if using secure Hadoop) User user = userProvider.getCurrent(); - Token authToken = getAuthToken(job, user); - if (authToken == null) { - user.obtainAuthTokenForJob(job); - } else { - job.getCredentials().addToken(authToken.getService(), authToken); - } + TokenUtil.addTokenForJob(conn, job, user); } catch (InterruptedException ie) { ie.printStackTrace(); Thread.currentThread().interrupt(); + } finally { + conn.close(); } } } - /** - * Get the authentication token of the user for the cluster specified in the configuration - * @return null if the user does not have the token, otherwise the auth token for the cluster. - */ - private static Token getAuthToken(Configuration conf, User user) - throws IOException, InterruptedException { - ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "mr-init-credentials", null); - try { - String clusterId = ZKClusterId.readClusterIdZNode(zkw); - return new AuthenticationTokenSelector().selectToken(new Text(clusterId), user.getUGI().getTokens()); - } catch (KeeperException e) { - throw new IOException(e); - } finally { - zkw.close(); - } - } - /** * Ensures that the given number of reduce tasks for the given job * configuration does not exceed the number of regions for the given table. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java index b5149415a5e..ab99e1adf2e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java @@ -43,6 +43,8 @@ import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; @@ -52,6 +54,7 @@ import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier; import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector; +import org.apache.hadoop.hbase.security.token.TokenUtil; import org.apache.hadoop.hbase.util.Base64; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; @@ -452,10 +455,20 @@ public class TableMapReduceUtil { if (quorumAddress != null) { Configuration peerConf = HBaseConfiguration.create(job.getConfiguration()); ZKUtil.applyClusterKeyToConf(peerConf, quorumAddress); - obtainAuthTokenForJob(job, peerConf, user); + Connection peerConn = ConnectionFactory.createConnection(peerConf); + try { + TokenUtil.addTokenForJob(peerConn, user, job); + } finally { + peerConn.close(); + } } - obtainAuthTokenForJob(job, job.getConfiguration(), user); + Connection conn = ConnectionFactory.createConnection(job.getConfiguration()); + try { + TokenUtil.addTokenForJob(conn, user, job); + } finally { + conn.close(); + } } catch (InterruptedException ie) { LOG.info("Interrupted obtaining user authentication token"); Thread.currentThread().interrupt(); @@ -481,7 +494,12 @@ public class TableMapReduceUtil { try { Configuration peerConf = HBaseConfiguration.create(job.getConfiguration()); ZKUtil.applyClusterKeyToConf(peerConf, quorumAddress); - obtainAuthTokenForJob(job, peerConf, userProvider.getCurrent()); + Connection peerConn = ConnectionFactory.createConnection(peerConf); + try { + TokenUtil.addTokenForJob(peerConn, userProvider.getCurrent(), job); + } finally { + peerConn.close(); + } } catch (InterruptedException e) { LOG.info("Interrupted obtaining user authentication token"); Thread.interrupted(); @@ -489,33 +507,6 @@ public class TableMapReduceUtil { } } - private static void obtainAuthTokenForJob(Job job, Configuration conf, User user) - throws IOException, InterruptedException { - Token authToken = getAuthToken(conf, user); - if (authToken == null) { - user.obtainAuthTokenForJob(conf, job); - } else { - job.getCredentials().addToken(authToken.getService(), authToken); - } - } - - /** - * Get the authentication token of the user for the cluster specified in the configuration - * @return null if the user does not have the token, otherwise the auth token for the cluster. - */ - private static Token getAuthToken(Configuration conf, User user) - throws IOException, InterruptedException { - ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "mr-init-credentials", null); - try { - String clusterId = ZKClusterId.readClusterIdZNode(zkw); - return new AuthenticationTokenSelector().selectToken(new Text(clusterId), user.getUGI().getTokens()); - } catch (KeeperException e) { - throw new IOException(e); - } finally { - zkw.close(); - } - } - /** * Writes the given scan into a Base64 encoded string. * diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java deleted file mode 100644 index e3c4f5326e4..00000000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java +++ /dev/null @@ -1,200 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.security.token; - -import java.io.IOException; -import java.lang.reflect.UndeclaredThrowableException; -import java.security.PrivilegedExceptionAction; - -import com.google.protobuf.ServiceException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; - -/** - * Utility methods for obtaining authentication tokens. - */ -@InterfaceAudience.Private -public class TokenUtil { - // This class is referenced indirectly by User out in common; instances are created by reflection - private static Log LOG = LogFactory.getLog(TokenUtil.class); - - /** - * Obtain and return an authentication token for the current user. - * @param conf The configuration for connecting to the cluster - * @return the authentication token instance - */ - public static Token obtainToken( - Configuration conf) throws IOException { - // TODO: Pass in a Connection to used. Will this even work? - try (Connection connection = ConnectionFactory.createConnection(conf)) { - try (Table meta = connection.getTable(TableName.META_TABLE_NAME)) { - CoprocessorRpcChannel rpcChannel = meta.coprocessorService(HConstants.EMPTY_START_ROW); - AuthenticationProtos.AuthenticationService.BlockingInterface service = - AuthenticationProtos.AuthenticationService.newBlockingStub(rpcChannel); - AuthenticationProtos.GetAuthenticationTokenResponse response = - service.getAuthenticationToken(null, - AuthenticationProtos.GetAuthenticationTokenRequest.getDefaultInstance()); - - return ProtobufUtil.toToken(response.getToken()); - } catch (ServiceException se) { - ProtobufUtil.toIOException(se); - } - } - // dummy return for ServiceException catch block - return null; - } - - private static Text getClusterId(Token token) - throws IOException { - return token.getService() != null - ? token.getService() : new Text("default"); - } - - /** - * Obtain an authentication token for the given user and add it to the - * user's credentials. - * @param conf The configuration for connecting to the cluster - * @param user The user for whom to obtain the token - * @throws IOException If making a remote call to the {@link TokenProvider} fails - * @throws InterruptedException If executing as the given user is interrupted - */ - public static void obtainAndCacheToken(final Configuration conf, - UserGroupInformation user) - throws IOException, InterruptedException { - try { - Token token = - user.doAs(new PrivilegedExceptionAction>() { - public Token run() throws Exception { - return obtainToken(conf); - } - }); - - if (token == null) { - throw new IOException("No token returned for user "+user.getUserName()); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Obtained token "+token.getKind().toString()+" for user "+ - user.getUserName()); - } - user.addToken(token); - } catch (IOException ioe) { - throw ioe; - } catch (InterruptedException ie) { - throw ie; - } catch (RuntimeException re) { - throw re; - } catch (Exception e) { - throw new UndeclaredThrowableException(e, - "Unexpected exception obtaining token for user "+user.getUserName()); - } - } - - /** - * Obtain an authentication token on behalf of the given user and add it to - * the credentials for the given map reduce job. - * @param conf The configuration for connecting to the cluster - * @param user The user for whom to obtain the token - * @param job The job instance in which the token should be stored - * @throws IOException If making a remote call to the {@link TokenProvider} fails - * @throws InterruptedException If executing as the given user is interrupted - */ - public static void obtainTokenForJob(final Configuration conf, - UserGroupInformation user, Job job) - throws IOException, InterruptedException { - try { - Token token = - user.doAs(new PrivilegedExceptionAction>() { - public Token run() throws Exception { - return obtainToken(conf); - } - }); - - if (token == null) { - throw new IOException("No token returned for user "+user.getUserName()); - } - Text clusterId = getClusterId(token); - LOG.info("Obtained token "+token.getKind().toString()+" for user "+ - user.getUserName() + " on cluster "+clusterId.toString()); - job.getCredentials().addToken(clusterId, token); - } catch (IOException ioe) { - throw ioe; - } catch (InterruptedException ie) { - throw ie; - } catch (RuntimeException re) { - throw re; - } catch (Exception e) { - throw new UndeclaredThrowableException(e, - "Unexpected exception obtaining token for user "+user.getUserName()); - } - } - - /** - * Obtain an authentication token on behalf of the given user and add it to - * the credentials for the given map reduce job. - * @param user The user for whom to obtain the token - * @param job The job configuration in which the token should be stored - * @throws IOException If making a remote call to the {@link TokenProvider} fails - * @throws InterruptedException If executing as the given user is interrupted - */ - public static void obtainTokenForJob(final JobConf job, - UserGroupInformation user) - throws IOException, InterruptedException { - try { - Token token = - user.doAs(new PrivilegedExceptionAction>() { - public Token run() throws Exception { - return obtainToken(job); - } - }); - - if (token == null) { - throw new IOException("No token returned for user "+user.getUserName()); - } - Text clusterId = getClusterId(token); - LOG.info("Obtained token "+token.getKind().toString()+" for user "+ - user.getUserName()+" on cluster "+clusterId.toString()); - job.getCredentials().addToken(clusterId, token); - } catch (IOException ioe) { - throw ioe; - } catch (InterruptedException ie) { - throw ie; - } catch (RuntimeException re) { - throw re; - } catch (Exception e) { - throw new UndeclaredThrowableException(e, - "Unexpected exception obtaining token for user "+user.getUserName()); - } - } -} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java index 970ab4874e5..73156600db1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.security.token; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import java.io.IOException; @@ -43,6 +45,8 @@ import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; @@ -402,11 +406,11 @@ public class TestTokenAuthentication { System.currentTimeMillis()); try { BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, - User.getCurrent(), HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + User.getCurrent(), HConstants.DEFAULT_HBASE_RPC_TIMEOUT); AuthenticationProtos.AuthenticationService.BlockingInterface stub = - AuthenticationProtos.AuthenticationService.newBlockingStub(channel); + AuthenticationProtos.AuthenticationService.newBlockingStub(channel); AuthenticationProtos.WhoAmIResponse response = - stub.whoAmI(null, AuthenticationProtos.WhoAmIRequest.getDefaultInstance()); + stub.whoAmI(null, AuthenticationProtos.WhoAmIRequest.getDefaultInstance()); String myname = response.getUsername(); assertEquals("testuser", myname); String authMethod = response.getAuthMethod(); @@ -418,4 +422,31 @@ public class TestTokenAuthentication { } }); } + + @Test + public void testUseExistingToken() throws Exception { + User user = User.createUserForTesting(TEST_UTIL.getConfiguration(), "testuser2", + new String[]{"testgroup"}); + Token token = + secretManager.generateToken(user.getName()); + assertNotNull(token); + user.addToken(token); + + // make sure we got a token + Token firstToken = + new AuthenticationTokenSelector().selectToken(token.getService(), user.getTokens()); + assertNotNull(firstToken); + assertEquals(token, firstToken); + + Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()); + try { + assertFalse(TokenUtil.addTokenIfMissing(conn, user)); + // make sure we still have the same token + Token secondToken = + new AuthenticationTokenSelector().selectToken(token.getService(), user.getTokens()); + assertEquals(firstToken, secondToken); + } finally { + conn.close(); + } + } }