HBASE-9890 MR jobs are not working if started by a delegated user
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1540242 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ecb4b1017c
commit
a12caeebf8
|
@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.util.Methods;
|
|||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.mapreduce.Job;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
|
||||
/**
|
||||
* Wrapper to abstract out usage of user and group information in HBase.
|
||||
|
@ -111,6 +112,25 @@ public abstract class User {
|
|||
public abstract void obtainAuthTokenForJob(JobConf job)
|
||||
throws IOException, InterruptedException;
|
||||
|
||||
/**
|
||||
* Returns the Token of the specified kind associated with this user,
|
||||
* or null if the Token is not present.
|
||||
*
|
||||
* @param kind the kind of token
|
||||
* @param service service on which the token is supposed to be used
|
||||
* @return the token of the specified kind.
|
||||
*/
|
||||
public Token<?> getToken(String kind, String service) throws IOException {
|
||||
for (Token<?> token: ugi.getTokens()) {
|
||||
if (token.getKind().toString().equals(kind) &&
|
||||
(service != null && token.getService().toString().equals(service)))
|
||||
{
|
||||
return token;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.mapred;
|
|||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
|
@ -28,8 +29,13 @@ import org.apache.hadoop.hbase.client.Put;
|
|||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||
import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
|
||||
import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
|
||||
import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
|
||||
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.security.UserProvider;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.mapred.FileInputFormat;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.mapred.InputFormat;
|
||||
|
@ -37,6 +43,8 @@ import org.apache.hadoop.mapred.OutputFormat;
|
|||
import org.apache.hadoop.mapred.TextInputFormat;
|
||||
import org.apache.hadoop.mapred.TextOutputFormat;
|
||||
import org.apache.hadoop.mapred.jobcontrol.Job;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
||||
/**
|
||||
* Utility for {@link TableMap} and {@link TableReduce}
|
||||
|
@ -178,10 +186,23 @@ public class TableMapReduceUtil {
|
|||
|
||||
public static void initCredentials(JobConf job) throws IOException {
|
||||
UserProvider userProvider = UserProvider.instantiate(job);
|
||||
// login the server principal (if using secure Hadoop)
|
||||
if (userProvider.isHadoopSecurityEnabled()) {
|
||||
// propagate delegation related props from launcher job to MR job
|
||||
if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
|
||||
job.set("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
|
||||
}
|
||||
}
|
||||
|
||||
if (userProvider.isHBaseSecurityEnabled()) {
|
||||
try {
|
||||
userProvider.getCurrent().obtainAuthTokenForJob(job);
|
||||
// login the server principal (if using secure Hadoop)
|
||||
User user = userProvider.getCurrent();
|
||||
Token<AuthenticationTokenIdentifier> authToken = getAuthToken(job, user);
|
||||
if (authToken == null) {
|
||||
user.obtainAuthTokenForJob(job);
|
||||
} else {
|
||||
job.getCredentials().addToken(authToken.getService(), authToken);
|
||||
}
|
||||
} catch (InterruptedException ie) {
|
||||
ie.printStackTrace();
|
||||
Thread.interrupted();
|
||||
|
@ -189,6 +210,23 @@ public class TableMapReduceUtil {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the authentication token of the user for the cluster specified in the configuration
|
||||
* @return null if the user does not have the token, otherwise the auth token for the cluster.
|
||||
*/
|
||||
private static Token<AuthenticationTokenIdentifier> getAuthToken(Configuration conf, User user)
|
||||
throws IOException, InterruptedException {
|
||||
ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "mr-init-credentials", null);
|
||||
try {
|
||||
String clusterId = ZKClusterId.readClusterIdZNode(zkw);
|
||||
return new AuthenticationTokenSelector().selectToken(new Text(clusterId), user.getUGI().getTokens());
|
||||
} catch (KeeperException e) {
|
||||
throw new IOException(e);
|
||||
} finally {
|
||||
zkw.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensures that the given number of reduce tasks for the given job
|
||||
* configuration does not exceed the number of regions for the given table.
|
||||
|
|
|
@ -106,6 +106,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
|||
private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
|
||||
private boolean assignSeqIds;
|
||||
|
||||
private boolean hasForwardedToken;
|
||||
private Token<?> userToken;
|
||||
private String bulkToken;
|
||||
private UserProvider userProvider;
|
||||
|
@ -254,7 +255,15 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
|||
//This condition is here for unit testing
|
||||
//Since delegation token doesn't work in mini cluster
|
||||
if (userProvider.isHadoopSecurityEnabled()) {
|
||||
userToken = userProvider.getCurrent().getToken("HDFS_DELEGATION_TOKEN",
|
||||
fs.getCanonicalServiceName());
|
||||
if (userToken == null) {
|
||||
hasForwardedToken = false;
|
||||
userToken = fs.getDelegationToken("renewer");
|
||||
} else {
|
||||
hasForwardedToken = true;
|
||||
LOG.info("Use the existing token: " + userToken);
|
||||
}
|
||||
}
|
||||
bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(table.getName());
|
||||
}
|
||||
|
@ -288,7 +297,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
|||
|
||||
} finally {
|
||||
if (userProvider.isHBaseSecurityEnabled()) {
|
||||
if(userToken != null) {
|
||||
if (userToken != null && !hasForwardedToken) {
|
||||
try {
|
||||
userToken.cancel(cfg);
|
||||
} catch (Exception e) {
|
||||
|
|
|
@ -50,14 +50,21 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
|||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.security.UserProvider;
|
||||
import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
|
||||
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
|
||||
import org.apache.hadoop.hbase.util.Base64;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.apache.hadoop.io.WritableComparable;
|
||||
import org.apache.hadoop.mapreduce.InputFormat;
|
||||
import org.apache.hadoop.mapreduce.Job;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
||||
import com.google.protobuf.InvalidProtocolBufferException;
|
||||
|
||||
|
@ -295,16 +302,26 @@ public class TableMapReduceUtil {
|
|||
|
||||
public static void initCredentials(Job job) throws IOException {
|
||||
UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
|
||||
if (userProvider.isHadoopSecurityEnabled()) {
|
||||
// propagate delegation related props from launcher job to MR job
|
||||
if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
|
||||
job.getConfiguration().set("mapreduce.job.credentials.binary",
|
||||
System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
|
||||
}
|
||||
}
|
||||
|
||||
if (userProvider.isHBaseSecurityEnabled()) {
|
||||
try {
|
||||
// init credentials for remote cluster
|
||||
String quorumAddress = job.getConfiguration().get(TableOutputFormat.QUORUM_ADDRESS);
|
||||
User user = userProvider.getCurrent();
|
||||
if (quorumAddress != null) {
|
||||
Configuration peerConf = HBaseConfiguration.create(job.getConfiguration());
|
||||
ZKUtil.applyClusterKeyToConf(peerConf, quorumAddress);
|
||||
userProvider.getCurrent().obtainAuthTokenForJob(peerConf, job);
|
||||
obtainAuthTokenForJob(job, peerConf, user);
|
||||
}
|
||||
userProvider.getCurrent().obtainAuthTokenForJob(job.getConfiguration(), job);
|
||||
|
||||
obtainAuthTokenForJob(job, job.getConfiguration(), user);
|
||||
} catch (InterruptedException ie) {
|
||||
LOG.info("Interrupted obtaining user authentication token");
|
||||
Thread.interrupted();
|
||||
|
@ -312,6 +329,33 @@ public class TableMapReduceUtil {
|
|||
}
|
||||
}
|
||||
|
||||
private static void obtainAuthTokenForJob(Job job, Configuration conf, User user)
|
||||
throws IOException, InterruptedException {
|
||||
Token<AuthenticationTokenIdentifier> authToken = getAuthToken(conf, user);
|
||||
if (authToken == null) {
|
||||
user.obtainAuthTokenForJob(conf, job);
|
||||
} else {
|
||||
job.getCredentials().addToken(authToken.getService(), authToken);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the authentication token of the user for the cluster specified in the configuration
|
||||
* @return null if the user does not have the token, otherwise the auth token for the cluster.
|
||||
*/
|
||||
private static Token<AuthenticationTokenIdentifier> getAuthToken(Configuration conf, User user)
|
||||
throws IOException, InterruptedException {
|
||||
ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "mr-init-credentials", null);
|
||||
try {
|
||||
String clusterId = ZKClusterId.readClusterIdZNode(zkw);
|
||||
return new AuthenticationTokenSelector().selectToken(new Text(clusterId), user.getUGI().getTokens());
|
||||
} catch (KeeperException e) {
|
||||
throw new IOException(e);
|
||||
} finally {
|
||||
zkw.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes the given scan into a Base64 encoded string.
|
||||
*
|
||||
|
|
Loading…
Reference in New Issue