MAPREDUCE-3443. JobClient and Job should function in the context of the UGI which created them. (Contributed by Mahadev Konar)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1209231 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Siddharth Seth 2011-12-01 20:39:37 +00:00
parent 6a9624dcec
commit 102541b12b
4 changed files with 219 additions and 74 deletions

View File

@ -211,6 +211,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3452. fifoscheduler web ui page always shows 0% used for the queue.
(Jonathan Eagles via mahadev)
MAPREDUCE-3443. JobClient and Job should function in the context of the
UGI which created them. (Mahadev Konar via sseth)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@ -21,6 +21,7 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@ -42,6 +43,7 @@ import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
@ -421,6 +423,11 @@ public class JobClient extends CLI {
}
Cluster cluster;
/**
* Ugi of the client. We store this ugi when the client is created and
* then make sure that the same ugi is used to run the various protocols.
*/
UserGroupInformation clientUgi;
/**
* Create a job client.
@ -458,6 +465,7 @@ public class JobClient extends CLI {
public void init(JobConf conf) throws IOException {
setConf(conf);
cluster = new Cluster(conf);
clientUgi = UserGroupInformation.getCurrentUser();
}
@InterfaceAudience.Private
@ -487,8 +495,7 @@ public class JobClient extends CLI {
@Override
public boolean isManaged(Token<?> token) throws IOException {
return true;
}
}
}
/**
@ -500,6 +507,7 @@ public class JobClient extends CLI {
public JobClient(InetSocketAddress jobTrackAddr,
Configuration conf) throws IOException {
cluster = new Cluster(jobTrackAddr, conf);
clientUgi = UserGroupInformation.getCurrentUser();
}
/**
@ -562,21 +570,34 @@ public class JobClient extends CLI {
* @throws FileNotFoundException
* @throws IOException
*/
public RunningJob submitJob(JobConf conf) throws FileNotFoundException,
public RunningJob submitJob(final JobConf conf) throws FileNotFoundException,
IOException {
try {
conf.setBooleanIfUnset("mapred.mapper.new-api", false);
conf.setBooleanIfUnset("mapred.reducer.new-api", false);
Job job = Job.getInstance(conf);
job.submit();
Job job = clientUgi.doAs(new PrivilegedExceptionAction<Job> () {
@Override
public Job run() throws IOException, ClassNotFoundException,
InterruptedException {
Job job = Job.getInstance(conf);
job.submit();
return job;
}
});
return new NetworkedJob(job);
} catch (InterruptedException ie) {
throw new IOException("interrupted", ie);
} catch (ClassNotFoundException cnfe) {
throw new IOException("class not found", cnfe);
}
}
private Job getJobUsingCluster(final JobID jobid) throws IOException,
InterruptedException {
return clientUgi.doAs(new PrivilegedExceptionAction<Job>() {
public Job run() throws IOException, InterruptedException {
return cluster.getJob(jobid);
}
});
}
/**
* Get an {@link RunningJob} object to track an ongoing job. Returns
* null if the id does not correspond to any known job.
@ -586,9 +607,10 @@ public class JobClient extends CLI {
* <code>jobid</code> doesn't correspond to any known job.
* @throws IOException
*/
public RunningJob getJob(JobID jobid) throws IOException {
public RunningJob getJob(final JobID jobid) throws IOException {
try {
Job job = cluster.getJob(jobid);
Job job = getJobUsingCluster(jobid);
if (job != null) {
JobStatus status = JobStatus.downgrade(job.getStatus());
if (status != null) {
@ -621,9 +643,10 @@ public class JobClient extends CLI {
return getTaskReports(jobId, TaskType.MAP);
}
private TaskReport[] getTaskReports(JobID jobId, TaskType type) throws IOException {
private TaskReport[] getTaskReports(final JobID jobId, TaskType type) throws
IOException {
try {
Job j = cluster.getJob(jobId);
Job j = getJobUsingCluster(jobId);
if(j == null) {
return EMPTY_TASK_REPORTS;
}
@ -688,10 +711,11 @@ public class JobClient extends CLI {
* @param state the state of the task
* (pending/running/completed/failed/killed)
*/
public void displayTasks(JobID jobId, String type, String state)
public void displayTasks(final JobID jobId, String type, String state)
throws IOException {
try {
super.displayTasks(cluster.getJob(jobId), type, state);
Job job = getJobUsingCluster(jobId);
super.displayTasks(job, type, state);
} catch (InterruptedException ie) {
throw new IOException(ie);
}
@ -706,15 +730,20 @@ public class JobClient extends CLI {
*/
public ClusterStatus getClusterStatus() throws IOException {
try {
ClusterMetrics metrics = cluster.getClusterStatus();
return new ClusterStatus(metrics.getTaskTrackerCount(),
metrics.getBlackListedTaskTrackerCount(), cluster.getTaskTrackerExpiryInterval(),
metrics.getOccupiedMapSlots(),
metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
metrics.getReduceSlotCapacity(),
cluster.getJobTrackerStatus(),
metrics.getDecommissionedTaskTrackerCount());
} catch (InterruptedException ie) {
return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() {
public ClusterStatus run() throws IOException, InterruptedException {
ClusterMetrics metrics = cluster.getClusterStatus();
return new ClusterStatus(metrics.getTaskTrackerCount(),
metrics.getBlackListedTaskTrackerCount(), cluster.getTaskTrackerExpiryInterval(),
metrics.getOccupiedMapSlots(),
metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
metrics.getReduceSlotCapacity(),
cluster.getJobTrackerStatus(),
metrics.getDecommissionedTaskTrackerCount());
}
});
}
catch (InterruptedException ie) {
throw new IOException(ie);
}
}
@ -750,13 +779,17 @@ public class JobClient extends CLI {
*/
public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
try {
ClusterMetrics metrics = cluster.getClusterStatus();
return new ClusterStatus(arrayToStringList(cluster.getActiveTaskTrackers()),
arrayToBlackListInfo(cluster.getBlackListedTaskTrackers()),
cluster.getTaskTrackerExpiryInterval(), metrics.getOccupiedMapSlots(),
metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
metrics.getReduceSlotCapacity(),
cluster.getJobTrackerStatus());
return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() {
public ClusterStatus run() throws IOException, InterruptedException {
ClusterMetrics metrics = cluster.getClusterStatus();
return new ClusterStatus(arrayToStringList(cluster.getActiveTaskTrackers()),
arrayToBlackListInfo(cluster.getBlackListedTaskTrackers()),
cluster.getTaskTrackerExpiryInterval(), metrics.getOccupiedMapSlots(),
metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
metrics.getReduceSlotCapacity(),
cluster.getJobTrackerStatus());
}
});
} catch (InterruptedException ie) {
throw new IOException(ie);
}
@ -787,7 +820,14 @@ public class JobClient extends CLI {
*/
public JobStatus[] getAllJobs() throws IOException {
try {
org.apache.hadoop.mapreduce.JobStatus[] jobs = cluster.getAllJobStatuses();
org.apache.hadoop.mapreduce.JobStatus[] jobs =
clientUgi.doAs(new PrivilegedExceptionAction<
org.apache.hadoop.mapreduce.JobStatus[]> () {
public org.apache.hadoop.mapreduce.JobStatus[] run()
throws IOException, InterruptedException {
return cluster.getAllJobStatuses();
}
});
JobStatus[] stats = new JobStatus[jobs.length];
for (int i = 0; i < jobs.length; i++) {
stats[i] = JobStatus.downgrade(jobs[i]);
@ -909,7 +949,12 @@ public class JobClient extends CLI {
*/
public int getDefaultMaps() throws IOException {
try {
return cluster.getClusterStatus().getMapSlotCapacity();
return clientUgi.doAs(new PrivilegedExceptionAction<Integer>() {
@Override
public Integer run() throws IOException, InterruptedException {
return cluster.getClusterStatus().getMapSlotCapacity();
}
});
} catch (InterruptedException ie) {
throw new IOException(ie);
}
@ -923,7 +968,12 @@ public class JobClient extends CLI {
*/
public int getDefaultReduces() throws IOException {
try {
return cluster.getClusterStatus().getReduceSlotCapacity();
return clientUgi.doAs(new PrivilegedExceptionAction<Integer>() {
@Override
public Integer run() throws IOException, InterruptedException {
return cluster.getClusterStatus().getReduceSlotCapacity();
}
});
} catch (InterruptedException ie) {
throw new IOException(ie);
}
@ -936,8 +986,13 @@ public class JobClient extends CLI {
*/
public Path getSystemDir() {
try {
return cluster.getSystemDir();
} catch (IOException ioe) {
return clientUgi.doAs(new PrivilegedExceptionAction<Path>() {
@Override
public Path run() throws IOException, InterruptedException {
return cluster.getSystemDir();
}
});
} catch (IOException ioe) {
return null;
} catch (InterruptedException ie) {
return null;
@ -962,7 +1017,11 @@ public class JobClient extends CLI {
*/
public JobQueueInfo[] getRootQueues() throws IOException {
try {
return getJobQueueInfoArray(cluster.getRootQueues());
return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
public JobQueueInfo[] run() throws IOException, InterruptedException {
return getJobQueueInfoArray(cluster.getRootQueues());
}
});
} catch (InterruptedException ie) {
throw new IOException(ie);
}
@ -976,9 +1035,13 @@ public class JobClient extends CLI {
* @return the array of immediate children JobQueueInfo objects
* @throws IOException
*/
public JobQueueInfo[] getChildQueues(String queueName) throws IOException {
public JobQueueInfo[] getChildQueues(final String queueName) throws IOException {
try {
return getJobQueueInfoArray(cluster.getChildQueues(queueName));
return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
public JobQueueInfo[] run() throws IOException, InterruptedException {
return getJobQueueInfoArray(cluster.getChildQueues(queueName));
}
});
} catch (InterruptedException ie) {
throw new IOException(ie);
}
@ -993,7 +1056,11 @@ public class JobClient extends CLI {
*/
public JobQueueInfo[] getQueues() throws IOException {
try {
return getJobQueueInfoArray(cluster.getQueues());
return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
public JobQueueInfo[] run() throws IOException, InterruptedException {
return getJobQueueInfoArray(cluster.getQueues());
}
});
} catch (InterruptedException ie) {
throw new IOException(ie);
}
@ -1007,9 +1074,14 @@ public class JobClient extends CLI {
* @throws IOException
*/
public JobStatus[] getJobsFromQueue(String queueName) throws IOException {
public JobStatus[] getJobsFromQueue(final String queueName) throws IOException {
try {
QueueInfo queue = cluster.getQueue(queueName);
QueueInfo queue = clientUgi.doAs(new PrivilegedExceptionAction<QueueInfo>() {
@Override
public QueueInfo run() throws IOException, InterruptedException {
return cluster.getQueue(queueName);
}
});
if (queue == null) {
return null;
}
@ -1032,9 +1104,14 @@ public class JobClient extends CLI {
* @return Queue information associated to particular queue.
* @throws IOException
*/
public JobQueueInfo getQueueInfo(String queueName) throws IOException {
public JobQueueInfo getQueueInfo(final String queueName) throws IOException {
try {
QueueInfo queueInfo = cluster.getQueue(queueName);
QueueInfo queueInfo = clientUgi.doAs(new
PrivilegedExceptionAction<QueueInfo>() {
public QueueInfo run() throws IOException, InterruptedException {
return cluster.getQueue(queueName);
}
});
if (queueInfo != null) {
return new JobQueueInfo(queueInfo);
}
@ -1052,7 +1129,14 @@ public class JobClient extends CLI {
public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException {
try {
org.apache.hadoop.mapreduce.QueueAclsInfo[] acls =
cluster.getQueueAclsForCurrentUser();
clientUgi.doAs(new
PrivilegedExceptionAction
<org.apache.hadoop.mapreduce.QueueAclsInfo[]>() {
public org.apache.hadoop.mapreduce.QueueAclsInfo[] run()
throws IOException, InterruptedException {
return cluster.getQueueAclsForCurrentUser();
}
});
QueueAclsInfo[] ret = new QueueAclsInfo[acls.length];
for (int i = 0 ; i < acls.length; i++ ) {
ret[i] = QueueAclsInfo.downgrade(acls[i]);
@ -1070,8 +1154,14 @@ public class JobClient extends CLI {
* @throws IOException
*/
public Token<DelegationTokenIdentifier>
getDelegationToken(Text renewer) throws IOException, InterruptedException {
return cluster.getDelegationToken(renewer);
getDelegationToken(final Text renewer) throws IOException, InterruptedException {
return clientUgi.doAs(new
PrivilegedExceptionAction<Token<DelegationTokenIdentifier>>() {
public Token<DelegationTokenIdentifier> run() throws IOException,
InterruptedException {
return cluster.getDelegationToken(renewer);
}
});
}
/**

View File

@ -30,6 +30,7 @@ import java.net.URL;
import java.net.URLConnection;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -315,7 +316,12 @@ public class Job extends JobContextImpl implements JobContext {
* @throws IOException
*/
synchronized void updateStatus() throws IOException, InterruptedException {
this.status = cluster.getClient().getJobStatus(status.getJobID());
this.status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
@Override
public JobStatus run() throws IOException, InterruptedException {
return cluster.getClient().getJobStatus(status.getJobID());
}
});
if (this.status == null) {
throw new IOException("Job status not available ");
}
@ -476,8 +482,16 @@ public class Job extends JobContextImpl implements JobContext {
InterruptedException {
int failCount = 1;
TaskCompletionEvent lastEvent = null;
for (TaskCompletionEvent event : cluster.getClient().getTaskCompletionEvents(
status.getJobID(), 0, 10)) {
TaskCompletionEvent[] events = ugi.doAs(new
PrivilegedExceptionAction<TaskCompletionEvent[]>() {
@Override
public TaskCompletionEvent[] run() throws IOException,
InterruptedException {
return cluster.getClient().getTaskCompletionEvents(
status.getJobID(), 0, 10);
}
});
for (TaskCompletionEvent event : events) {
if (event.getStatus().equals(TaskCompletionEvent.Status.FAILED)) {
failCount++;
lastEvent = event;
@ -500,7 +514,12 @@ public class Job extends JobContextImpl implements JobContext {
public TaskReport[] getTaskReports(TaskType type)
throws IOException, InterruptedException {
ensureState(JobState.RUNNING);
return cluster.getClient().getTaskReports(getJobID(), type);
final TaskType tmpType = type;
return ugi.doAs(new PrivilegedExceptionAction<TaskReport[]>() {
public TaskReport[] run() throws IOException, InterruptedException {
return cluster.getClient().getTaskReports(getJobID(), tmpType);
}
});
}
/**
@ -603,7 +622,14 @@ public class Job extends JobContextImpl implements JobContext {
org.apache.hadoop.mapred.JobPriority.valueOf(priority.name()));
} else {
ensureState(JobState.RUNNING);
cluster.getClient().setJobPriority(getJobID(), priority.toString());
final JobPriority tmpPriority = priority;
ugi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws IOException, InterruptedException {
cluster.getClient().setJobPriority(getJobID(), tmpPriority.toString());
return null;
}
});
}
}
@ -615,12 +641,17 @@ public class Job extends JobContextImpl implements JobContext {
* @return an array of {@link TaskCompletionEvent}s
* @throws IOException
*/
public TaskCompletionEvent[] getTaskCompletionEvents(int startFrom,
int numEvents) throws IOException, InterruptedException {
public TaskCompletionEvent[] getTaskCompletionEvents(final int startFrom,
final int numEvents) throws IOException, InterruptedException {
ensureState(JobState.RUNNING);
return cluster.getClient().getTaskCompletionEvents(getJobID(),
startFrom, numEvents);
}
return ugi.doAs(new PrivilegedExceptionAction<TaskCompletionEvent[]>() {
@Override
public TaskCompletionEvent[] run() throws IOException, InterruptedException {
return cluster.getClient().getTaskCompletionEvents(getJobID(),
startFrom, numEvents);
}
});
}
/**
* Kill indicated task attempt.
@ -628,10 +659,14 @@ public class Job extends JobContextImpl implements JobContext {
* @param taskId the id of the task to be terminated.
* @throws IOException
*/
public boolean killTask(TaskAttemptID taskId)
public boolean killTask(final TaskAttemptID taskId)
throws IOException, InterruptedException {
ensureState(JobState.RUNNING);
return cluster.getClient().killTask(taskId, false);
return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
public Boolean run() throws IOException, InterruptedException {
return cluster.getClient().killTask(taskId, false);
}
});
}
/**
@ -640,10 +675,15 @@ public class Job extends JobContextImpl implements JobContext {
* @param taskId the id of the task to be terminated.
* @throws IOException
*/
public boolean failTask(TaskAttemptID taskId)
public boolean failTask(final TaskAttemptID taskId)
throws IOException, InterruptedException {
ensureState(JobState.RUNNING);
return cluster.getClient().killTask(taskId, true);
return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
@Override
public Boolean run() throws IOException, InterruptedException {
return cluster.getClient().killTask(taskId, true);
}
});
}
/**
@ -656,7 +696,12 @@ public class Job extends JobContextImpl implements JobContext {
public Counters getCounters()
throws IOException, InterruptedException {
ensureState(JobState.RUNNING);
return cluster.getClient().getJobCounters(getJobID());
return ugi.doAs(new PrivilegedExceptionAction<Counters>() {
@Override
public Counters run() throws IOException, InterruptedException {
return cluster.getClient().getJobCounters(getJobID());
}
});
}
/**
@ -665,10 +710,15 @@ public class Job extends JobContextImpl implements JobContext {
* @return the list of diagnostic messages for the task
* @throws IOException
*/
public String[] getTaskDiagnostics(TaskAttemptID taskid)
public String[] getTaskDiagnostics(final TaskAttemptID taskid)
throws IOException, InterruptedException {
ensureState(JobState.RUNNING);
return cluster.getClient().getTaskDiagnostics(taskid);
return ugi.doAs(new PrivilegedExceptionAction<String[]>() {
@Override
public String[] run() throws IOException, InterruptedException {
return cluster.getClient().getTaskDiagnostics(taskid);
}
});
}
/**

View File

@ -22,7 +22,7 @@ import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
@ -156,6 +156,8 @@ public class ClientServiceDelegate {
application = rm.getApplicationReport(appId);
continue;
}
UserGroupInformation newUgi = UserGroupInformation.createRemoteUser(
UserGroupInformation.getCurrentUser().getUserName());
serviceAddr = application.getHost() + ":" + application.getRpcPort();
if (UserGroupInformation.isSecurityEnabled()) {
String clientTokenEncoded = application.getClientToken();
@ -167,11 +169,17 @@ public class ClientServiceDelegate {
.getHost(), application.getRpcPort());
clientToken.setService(new Text(addr.getAddress().getHostAddress()
+ ":" + addr.getPort()));
UserGroupInformation.getCurrentUser().addToken(clientToken);
newUgi.addToken(clientToken);
}
LOG.info("The url to track the job: " + application.getTrackingUrl());
LOG.debug("Connecting to " + serviceAddr);
realProxy = instantiateAMProxy(serviceAddr);
final String tempStr = serviceAddr;
realProxy = newUgi.doAs(new PrivilegedExceptionAction<MRClientProtocol>() {
@Override
public MRClientProtocol run() throws IOException {
return instantiateAMProxy(tempStr);
}
});
return realProxy;
} catch (IOException e) {
//possibly the AM has crashed
@ -243,17 +251,11 @@ public class ClientServiceDelegate {
MRClientProtocol instantiateAMProxy(final String serviceAddr)
throws IOException {
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
LOG.trace("Connecting to ApplicationMaster at: " + serviceAddr);
MRClientProtocol proxy = currentUser
.doAs(new PrivilegedAction<MRClientProtocol>() {
@Override
public MRClientProtocol run() {
YarnRPC rpc = YarnRPC.create(conf);
return (MRClientProtocol) rpc.getProxy(MRClientProtocol.class,
YarnRPC rpc = YarnRPC.create(conf);
MRClientProtocol proxy =
(MRClientProtocol) rpc.getProxy(MRClientProtocol.class,
NetUtils.createSocketAddr(serviceAddr), conf);
}
});
LOG.trace("Connected to ApplicationMaster at: " + serviceAddr);
return proxy;
}