MAPREDUCE-2716. svn merge -c r1164805 --ignore-ancestry ../../trunk/

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1164807 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2011-09-03 06:46:45 +00:00
parent 326cdada1f
commit cb8d102f97
14 changed files with 172 additions and 51 deletions

View File

@ -1188,6 +1188,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-2756. Better error handling in JobControl for failed jobs.
(Robert Evans via acmurthy)
MAPREDUCE-2716. MRReliabilityTest job fails because of missing
job-file. (Jeffrey Naisbitt via vinodkv)
Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES

View File

@ -599,6 +599,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
report.setCleanupProgress(cleanupProgress);
report.setMapProgress(computeProgress(mapTasks));
report.setReduceProgress(computeProgress(reduceTasks));
report.setJobName(jobName);
report.setUser(username);
return report;
} finally {

View File

@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobPriority;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.JobStatus.State;
@ -39,6 +40,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
@ -279,13 +281,13 @@ public class TypeConverter {
public static org.apache.hadoop.mapred.JobStatus fromYarn(
JobReport jobreport, String jobFile, String trackingUrl) {
String user = null, jobName = null;
JobPriority jobPriority = JobPriority.NORMAL;
return new org.apache.hadoop.mapred.JobStatus(fromYarn(jobreport.getJobId()),
jobreport.getSetupProgress(), jobreport.getMapProgress(),
jobreport.getReduceProgress(), jobreport.getCleanupProgress(),
fromYarn(jobreport.getJobState()),
jobPriority, user, jobName, jobFile, trackingUrl);
jobPriority, jobreport.getUser(), jobreport.getJobName(),
jobFile, trackingUrl);
}
public static int fromYarn(JobState state) {
@ -395,45 +397,51 @@ public class TypeConverter {
return taskTrackers.toArray(new TaskTrackerInfo[nodes.size()]);
}
public static JobStatus fromYarn(ApplicationReport application) {
public static JobStatus fromYarn(ApplicationReport application,
String jobFile) {
String trackingUrl = application.getTrackingUrl();
trackingUrl = trackingUrl == null ? "" : trackingUrl;
JobStatus jobStatus =
JobStatus jobStatus =
new JobStatus(
TypeConverter.fromYarn(application.getApplicationId()),
0.0f, 0.0f, 0.0f, 0.0f,
TypeConverter.fromYarn(application.getState()),
org.apache.hadoop.mapreduce.JobPriority.NORMAL,
application.getUser(), application.getName(),
application.getQueue(), "", trackingUrl
);
TypeConverter.fromYarn(application.getApplicationId()),
0.0f, 0.0f, 0.0f, 0.0f,
TypeConverter.fromYarn(application.getState()),
org.apache.hadoop.mapreduce.JobPriority.NORMAL,
application.getUser(), application.getName(),
application.getQueue(), jobFile, trackingUrl
);
jobStatus.setSchedulingInfo(trackingUrl); // Set AM tracking url
jobStatus.setStartTime(application.getStartTime());
return jobStatus;
}
public static JobStatus[] fromYarnApps(List<ApplicationReport> applications) {
public static JobStatus[] fromYarnApps(List<ApplicationReport> applications,
Configuration conf) {
List<JobStatus> jobStatuses = new ArrayList<JobStatus>();
for (ApplicationReport application : applications) {
jobStatuses.add(TypeConverter.fromYarn(application));
// each applicationReport has its own jobFile
org.apache.hadoop.mapreduce.JobID jobId =
TypeConverter.fromYarn(application.getApplicationId());
jobStatuses.add(TypeConverter.fromYarn(application,
MRApps.getJobFile(conf, application.getUser(), jobId)));
}
return jobStatuses.toArray(new JobStatus[jobStatuses.size()]);
}
public static QueueInfo fromYarn(org.apache.hadoop.yarn.api.records.QueueInfo
queueInfo) {
queueInfo, Configuration conf) {
return new QueueInfo(queueInfo.getQueueName(),
queueInfo.toString(), QueueState.RUNNING,
TypeConverter.fromYarnApps(queueInfo.getApplications()));
TypeConverter.fromYarnApps(queueInfo.getApplications(), conf));
}
public static QueueInfo[] fromYarnQueueInfo(
List<org.apache.hadoop.yarn.api.records.QueueInfo> queues) {
List<org.apache.hadoop.yarn.api.records.QueueInfo> queues,
Configuration conf) {
List<QueueInfo> queueInfos = new ArrayList<QueueInfo>(queues.size());
for (org.apache.hadoop.yarn.api.records.QueueInfo queue : queues) {
queueInfos.add(TypeConverter.fromYarn(queue));
queueInfos.add(TypeConverter.fromYarn(queue, conf));
}
return queueInfos.toArray(new QueueInfo[queueInfos.size()]);
}

View File

@ -27,6 +27,8 @@ public interface JobReport {
public abstract float getSetupProgress();
public abstract long getStartTime();
public abstract long getFinishTime();
public abstract String getUser();
public abstract String getJobName();
public abstract void setJobId(JobId jobId);
public abstract void setJobState(JobState jobState);
@ -36,4 +38,6 @@ public interface JobReport {
public abstract void setSetupProgress(float progress);
public abstract void setStartTime(long startTime);
public abstract void setFinishTime(long finishTime);
public abstract void setUser(String user);
public abstract void setJobName(String jobName);
}

View File

@ -182,6 +182,30 @@ public class JobReportPBImpl extends ProtoBase<JobReportProto> implements JobRep
builder.setFinishTime((finishTime));
}
@Override
public String getUser() {
JobReportProtoOrBuilder p = viaProto ? proto : builder;
return (p.getUser());
}
@Override
public void setUser(String user) {
maybeInitBuilder();
builder.setUser((user));
}
@Override
public String getJobName() {
JobReportProtoOrBuilder p = viaProto ? proto : builder;
return (p.getJobName());
}
@Override
public void setJobName(String jobName) {
maybeInitBuilder();
builder.setJobName((jobName));
}
private JobIdPBImpl convertFromProtoFormat(JobIdProto p) {
return new JobIdPBImpl(p);
}

View File

@ -218,7 +218,14 @@ public class MRApps extends Apps {
private static final String STAGING_CONSTANT = ".staging";
public static Path getStagingAreaDir(Configuration conf, String user) {
return new Path(
conf.get(MRConstants.APPS_STAGING_DIR_KEY) +
conf.get(MRConstants.APPS_STAGING_DIR_KEY) +
Path.SEPARATOR + user + Path.SEPARATOR + STAGING_CONSTANT);
}
public static String getJobFile(Configuration conf, String user,
org.apache.hadoop.mapreduce.JobID jobId) {
Path jobFile = new Path(MRApps.getStagingAreaDir(conf, user),
jobId.toString() + Path.SEPARATOR + MRConstants.JOB_CONF_FILE);
return jobFile.toString();
}
}

View File

@ -139,6 +139,8 @@ message JobReportProto {
optional float setup_progress = 6;
optional int64 start_time = 7;
optional int64 finish_time = 8;
optional string user = 9;
optional string jobName = 10;
}
enum TaskAttemptCompletionEventStatusProto {

View File

@ -21,8 +21,11 @@ import junit.framework.Assert;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationState;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationReportPBImpl;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.junit.Test;
public class TestTypeConverter {
@ -35,8 +38,33 @@ public class TestTypeConverter {
applicationReport.setApplicationId(applicationId);
applicationReport.setState(state);
applicationReport.setStartTime(appStartTime);
JobStatus jobStatus = TypeConverter.fromYarn(applicationReport);
applicationReport.setUser("TestTypeConverter-user");
JobStatus jobStatus = TypeConverter.fromYarn(applicationReport, "dummy-jobfile");
Assert.assertEquals(appStartTime, jobStatus.getStartTime());
Assert.assertEquals(state.toString(), jobStatus.getState().toString());
}
@Test
public void testFromYarnApplicationReport() {
ApplicationId mockAppId = mock(ApplicationId.class);
when(mockAppId.getClusterTimestamp()).thenReturn(12345L);
when(mockAppId.getId()).thenReturn(6789);
ApplicationReport mockReport = mock(ApplicationReport.class);
when(mockReport.getTrackingUrl()).thenReturn("dummy-tracking-url");
when(mockReport.getApplicationId()).thenReturn(mockAppId);
when(mockReport.getState()).thenReturn(ApplicationState.KILLED);
when(mockReport.getUser()).thenReturn("dummy-user");
when(mockReport.getQueue()).thenReturn("dummy-queue");
String jobFile = "dummy-path/job.xml";
JobStatus status = TypeConverter.fromYarn(mockReport, jobFile);
Assert.assertNotNull("fromYarn returned null status", status);
Assert.assertEquals("jobFile set incorrectly", "dummy-path/job.xml", status.getJobFile());
Assert.assertEquals("queue set incorrectly", "dummy-queue", status.getQueue());
Assert.assertEquals("trackingUrl set incorrectly", "dummy-tracking-url", status.getTrackingUrl());
Assert.assertEquals("user set incorrectly", "dummy-user", status.getUsername());
Assert.assertEquals("schedulingInfo set incorrectly", "dummy-tracking-url", status.getSchedulingInfo());
Assert.assertEquals("jobId set incorrectly", 6789, status.getJobID().getId());
Assert.assertEquals("state set incorrectly", JobStatus.State.KILLED, status.getState());
}
}

View File

@ -18,10 +18,13 @@
package org.apache.hadoop.mapreduce.v2.util;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.MRConstants;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -107,4 +110,14 @@ public class TestMRApps {
@Test(expected=YarnException.class) public void testTaskAttemptIDShort() {
MRApps.toTaskAttemptID("attempt_0_0_0_m_0");
}
@Test public void testGetJobFileWithUser() {
Configuration conf = new Configuration();
conf.set(MRConstants.APPS_STAGING_DIR_KEY, "/my/path/to/staging");
String jobFile = MRApps.getJobFile(conf, "dummy-user", new JobID("dummy-job", 12345));
assertNotNull("getJobFile results in null.", jobFile);
assertEquals("jobFile with specified user is not as expected.",
"/my/path/to/staging/dummy-user/.staging/job_dummy-job_12345/job.xml", jobFile);
}
}

View File

@ -90,7 +90,9 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job
report.setJobState(JobState.valueOf(jobInfo.getJobStatus()));
report.setStartTime(jobInfo.getLaunchTime());
report.setFinishTime(jobInfo.getFinishTime());
//TOODO Possibly populate job progress. Never used.
report.setJobName(jobInfo.getJobname());
report.setUser(jobInfo.getUsername());
//TODO Possibly populate job progress. Never used.
//report.setMapProgress(progress)
//report.setReduceProgress(progress)
}

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.mapred;
import java.io.IOException;
import java.lang.reflect.Method;
import java.security.PrivilegedAction;
import java.util.HashMap;
import java.util.List;
import org.apache.commons.logging.Log;
@ -50,7 +51,7 @@ import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptRequest
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHConfig;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityInfo;
import org.apache.hadoop.security.UserGroupInformation;
@ -61,24 +62,20 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
import org.apache.hadoop.yarn.security.SchedulerSecurityInfo;
import org.apache.hadoop.yarn.security.client.ClientRMSecurityInfo;
class ClientServiceDelegate {
private static final Log LOG = LogFactory.getLog(ClientServiceDelegate.class);
private static final NotRunningJob NOTSTARTEDJOB =
new NotRunningJob(JobState.NEW);
private static final NotRunningJob FAILEDJOB =
new NotRunningJob(JobState.FAILED);
private static final NotRunningJob KILLEDJOB =
new NotRunningJob(JobState.KILLED);
// Caches for per-user NotRunningJobs
private static HashMap<JobState, HashMap<String, NotRunningJob>> notRunningJobs =
new HashMap<JobState, HashMap<String, NotRunningJob>>();
private final Configuration conf;
private final JobID jobId;
@ -101,6 +98,24 @@ class ClientServiceDelegate {
this.appId = TypeConverter.toYarn(jobId).getAppId();
}
// Get the instance of the NotRunningJob corresponding to the specified
// user and state
private NotRunningJob getNotRunningJob(String user, JobState state) {
synchronized (notRunningJobs) {
HashMap<String, NotRunningJob> map = notRunningJobs.get(state);
if (map == null) {
map = new HashMap<String, NotRunningJob>();
notRunningJobs.put(state, map);
}
NotRunningJob notRunningJob = map.get(user);
if (notRunningJob == null) {
notRunningJob = new NotRunningJob(user, state);
map.put(user, notRunningJob);
}
return notRunningJob;
}
}
private MRClientProtocol getProxy() throws YarnRemoteException {
if (!forceRefresh && realProxy != null) {
return realProxy;
@ -149,26 +164,30 @@ class ClientServiceDelegate {
}
}
/** we just want to return if its allocating, so that we dont
/** we just want to return if its allocating, so that we don't
* block on it. This is to be able to return job status
* on a allocating Application.
* on an allocating Application.
*/
String user = application.getUser();
if (user == null) {
throw new YarnRemoteExceptionPBImpl("User is not set in the application report");
}
if (application.getState() == ApplicationState.NEW ||
application.getState() == ApplicationState.SUBMITTED) {
realProxy = null;
return NOTSTARTEDJOB;
return getNotRunningJob(user, JobState.NEW);
}
if (application.getState() == ApplicationState.FAILED) {
realProxy = null;
return FAILEDJOB;
return getNotRunningJob(user, JobState.FAILED);
}
if (application.getState() == ApplicationState.KILLED) {
realProxy = null;
return KILLEDJOB;
}
realProxy = null;
return getNotRunningJob(user, JobState.KILLED);
}
//History server can serve a job only if application
//succeeded.
@ -270,17 +289,15 @@ class ClientServiceDelegate {
return result;
}
JobStatus getJobStatus(JobID oldJobID) throws YarnRemoteException,
YarnRemoteException {
JobStatus getJobStatus(JobID oldJobID) throws YarnRemoteException {
org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
TypeConverter.toYarn(oldJobID);
String stagingDir = conf.get("yarn.apps.stagingDir");
String jobFile = stagingDir + "/" + jobId.toString();
MRClientProtocol protocol;
GetJobReportRequest request = recordFactory.newRecordInstance(GetJobReportRequest.class);
request.setJobId(jobId);
JobReport report = ((GetJobReportResponse) invoke("getJobReport",
JobReport report = ((GetJobReportResponse) invoke("getJobReport",
GetJobReportRequest.class, request)).getJobReport();
String jobFile = MRApps.getJobFile(conf, report.getUser(), oldJobID);
//TODO: add tracking url in JobReport
return TypeConverter.fromYarn(report, jobFile, "");
}

View File

@ -63,8 +63,10 @@ public class NotRunningJob implements MRClientProtocol {
RecordFactoryProvider.getRecordFactory(null);
private final JobState jobState;
private final String user;
NotRunningJob(JobState jobState) {
NotRunningJob(String username, JobState jobState) {
this.user = username;
this.jobState = jobState;
}
@ -104,7 +106,10 @@ public class NotRunningJob implements MRClientProtocol {
JobReport jobReport =
recordFactory.newRecordInstance(JobReport.class);
jobReport.setJobId(request.getJobId());
jobReport.setJobState(jobState);
jobReport.setJobState(this.jobState);
jobReport.setUser(this.user);
// TODO: Add jobName & other job information that is available
resp.setJobReport(jobReport);
return resp;
}

View File

@ -120,7 +120,7 @@ public class ResourceMgrDelegate {
recordFactory.newRecordInstance(GetAllApplicationsRequest.class);
GetAllApplicationsResponse response =
applicationsManager.getAllApplications(request);
return TypeConverter.fromYarnApps(response.getApplicationList());
return TypeConverter.fromYarnApps(response.getApplicationList(), this.conf);
}
@ -182,7 +182,7 @@ public class ResourceMgrDelegate {
getQueueInfoRequest(queueName, true, false, false);
recordFactory.newRecordInstance(GetQueueInfoRequest.class);
return TypeConverter.fromYarn(
applicationsManager.getQueueInfo(request).getQueueInfo());
applicationsManager.getQueueInfo(request).getQueueInfo(), this.conf);
}
private void getChildQueues(org.apache.hadoop.yarn.api.records.QueueInfo parent,
@ -216,7 +216,7 @@ public class ResourceMgrDelegate {
getQueueInfoRequest(ROOT, false, true, true)).getQueueInfo();
getChildQueues(rootQueue, queues);
return TypeConverter.fromYarnQueueInfo(queues);
return TypeConverter.fromYarnQueueInfo(queues, this.conf);
}
@ -229,7 +229,7 @@ public class ResourceMgrDelegate {
getQueueInfoRequest(ROOT, false, true, false)).getQueueInfo();
getChildQueues(rootQueue, queues);
return TypeConverter.fromYarnQueueInfo(queues);
return TypeConverter.fromYarnQueueInfo(queues, this.conf);
}
public QueueInfo[] getChildQueues(String parent) throws IOException,
@ -242,7 +242,7 @@ public class ResourceMgrDelegate {
getQueueInfoRequest(parent, false, true, false)).getQueueInfo();
getChildQueues(parentQueue, queues);
return TypeConverter.fromYarnQueueInfo(queues);
return TypeConverter.fromYarnQueueInfo(queues, this.conf);
}
public String getStagingAreaDir() throws IOException, InterruptedException {

View File

@ -268,6 +268,7 @@ public class TestClientRedirect {
String[] split = AMHOSTADDRESS.split(":");
application.setHost(split[0]);
application.setRpcPort(Integer.parseInt(split[1]));
application.setUser("TestClientRedirect-user");
GetApplicationReportResponse response = recordFactory
.newRecordInstance(GetApplicationReportResponse.class);
response.setApplicationReport(application);
@ -397,6 +398,11 @@ public class TestClientRedirect {
JobReport jobReport = recordFactory.newRecordInstance(JobReport.class);
jobReport.setJobId(request.getJobId());
jobReport.setJobState(JobState.RUNNING);
jobReport.setJobName("TestClientRedirect-jobname");
jobReport.setUser("TestClientRedirect-user");
jobReport.setStartTime(0L);
jobReport.setFinishTime(1L);
GetJobReportResponse response = recordFactory
.newRecordInstance(GetJobReportResponse.class);
response.setJobReport(jobReport);