From cb8d102f9748febf764c512d5116cb73fd6c113b Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Sat, 3 Sep 2011 06:46:45 +0000 Subject: [PATCH] MAPREDUCE-2716. svn merge -c r1164805 --ignore-ancestry ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1164807 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../mapreduce/v2/app/job/impl/JobImpl.java | 2 + .../hadoop/mapreduce/TypeConverter.java | 44 +++++++------ .../mapreduce/v2/api/records/JobReport.java | 4 ++ .../api/records/impl/pb/JobReportPBImpl.java | 24 +++++++ .../hadoop/mapreduce/v2/util/MRApps.java | 9 ++- .../src/main/proto/mr_protos.proto | 2 + .../hadoop/mapreduce/TestTypeConverter.java | 30 ++++++++- .../hadoop/mapreduce/v2/util/TestMRApps.java | 13 ++++ .../hadoop/mapreduce/v2/hs/CompletedJob.java | 4 +- .../hadoop/mapred/ClientServiceDelegate.java | 63 ++++++++++++------- .../apache/hadoop/mapred/NotRunningJob.java | 9 ++- .../hadoop/mapred/ResourceMgrDelegate.java | 10 +-- .../hadoop/mapred/TestClientRedirect.java | 6 ++ 14 files changed, 172 insertions(+), 51 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index b9bf700516a..dc0eccb2be4 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -1188,6 +1188,9 @@ Release 0.23.0 - Unreleased MAPREDUCE-2756. Better error handling in JobControl for failed jobs. (Robert Evans via acmurthy) + MAPREDUCE-2716. MRReliabilityTest job fails because of missing + job-file. (Jeffrey Naisbitt via vinodkv) + Release 0.22.0 - Unreleased INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index 302fd937518..c2a397502f4 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -599,6 +599,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, report.setCleanupProgress(cleanupProgress); report.setMapProgress(computeProgress(mapTasks)); report.setReduceProgress(computeProgress(reduceTasks)); + report.setJobName(jobName); + report.setUser(username); return report; } finally { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java index bb9d9d131df..a678e4660e7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.JobPriority; import org.apache.hadoop.mapred.TaskCompletionEvent; import org.apache.hadoop.mapreduce.JobStatus.State; @@ -39,6 +40,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskState; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; +import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; @@ -279,13 +281,13 @@ public class TypeConverter { public static org.apache.hadoop.mapred.JobStatus fromYarn( JobReport jobreport, String jobFile, String trackingUrl) { - String user = null, jobName = null; JobPriority jobPriority = JobPriority.NORMAL; return new org.apache.hadoop.mapred.JobStatus(fromYarn(jobreport.getJobId()), jobreport.getSetupProgress(), jobreport.getMapProgress(), jobreport.getReduceProgress(), jobreport.getCleanupProgress(), fromYarn(jobreport.getJobState()), - jobPriority, user, jobName, jobFile, trackingUrl); + jobPriority, jobreport.getUser(), jobreport.getJobName(), + jobFile, trackingUrl); } public static int fromYarn(JobState state) { @@ -395,45 +397,51 @@ public class TypeConverter { return taskTrackers.toArray(new TaskTrackerInfo[nodes.size()]); } - public static JobStatus fromYarn(ApplicationReport application) { + public static JobStatus fromYarn(ApplicationReport application, + String jobFile) { String trackingUrl = application.getTrackingUrl(); trackingUrl = trackingUrl == null ? "" : trackingUrl; - - JobStatus jobStatus = + JobStatus jobStatus = new JobStatus( - TypeConverter.fromYarn(application.getApplicationId()), - 0.0f, 0.0f, 0.0f, 0.0f, - TypeConverter.fromYarn(application.getState()), - org.apache.hadoop.mapreduce.JobPriority.NORMAL, - application.getUser(), application.getName(), - application.getQueue(), "", trackingUrl - ); + TypeConverter.fromYarn(application.getApplicationId()), + 0.0f, 0.0f, 0.0f, 0.0f, + TypeConverter.fromYarn(application.getState()), + org.apache.hadoop.mapreduce.JobPriority.NORMAL, + application.getUser(), application.getName(), + application.getQueue(), jobFile, trackingUrl + ); jobStatus.setSchedulingInfo(trackingUrl); // Set AM tracking url jobStatus.setStartTime(application.getStartTime()); return jobStatus; } - public static JobStatus[] fromYarnApps(List applications) { + public static JobStatus[] fromYarnApps(List applications, + Configuration conf) { List jobStatuses = new ArrayList(); for (ApplicationReport application : applications) { - jobStatuses.add(TypeConverter.fromYarn(application)); + // each applicationReport has its own jobFile + org.apache.hadoop.mapreduce.JobID jobId = + TypeConverter.fromYarn(application.getApplicationId()); + jobStatuses.add(TypeConverter.fromYarn(application, + MRApps.getJobFile(conf, application.getUser(), jobId))); } return jobStatuses.toArray(new JobStatus[jobStatuses.size()]); } public static QueueInfo fromYarn(org.apache.hadoop.yarn.api.records.QueueInfo - queueInfo) { + queueInfo, Configuration conf) { return new QueueInfo(queueInfo.getQueueName(), queueInfo.toString(), QueueState.RUNNING, - TypeConverter.fromYarnApps(queueInfo.getApplications())); + TypeConverter.fromYarnApps(queueInfo.getApplications(), conf)); } public static QueueInfo[] fromYarnQueueInfo( - List queues) { + List queues, + Configuration conf) { List queueInfos = new ArrayList(queues.size()); for (org.apache.hadoop.yarn.api.records.QueueInfo queue : queues) { - queueInfos.add(TypeConverter.fromYarn(queue)); + queueInfos.add(TypeConverter.fromYarn(queue, conf)); } return queueInfos.toArray(new QueueInfo[queueInfos.size()]); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobReport.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobReport.java index 72b3a66cefa..fb585e8dd27 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobReport.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobReport.java @@ -27,6 +27,8 @@ public interface JobReport { public abstract float getSetupProgress(); public abstract long getStartTime(); public abstract long getFinishTime(); + public abstract String getUser(); + public abstract String getJobName(); public abstract void setJobId(JobId jobId); public abstract void setJobState(JobState jobState); @@ -36,4 +38,6 @@ public interface JobReport { public abstract void setSetupProgress(float progress); public abstract void setStartTime(long startTime); public abstract void setFinishTime(long finishTime); + public abstract void setUser(String user); + public abstract void setJobName(String jobName); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/JobReportPBImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/JobReportPBImpl.java index 5e4c2e5b275..a4033e695f2 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/JobReportPBImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/JobReportPBImpl.java @@ -182,6 +182,30 @@ public class JobReportPBImpl extends ProtoBase implements JobRep builder.setFinishTime((finishTime)); } + @Override + public String getUser() { + JobReportProtoOrBuilder p = viaProto ? proto : builder; + return (p.getUser()); + } + + @Override + public void setUser(String user) { + maybeInitBuilder(); + builder.setUser((user)); + } + + @Override + public String getJobName() { + JobReportProtoOrBuilder p = viaProto ? proto : builder; + return (p.getJobName()); + } + + @Override + public void setJobName(String jobName) { + maybeInitBuilder(); + builder.setJobName((jobName)); + } + private JobIdPBImpl convertFromProtoFormat(JobIdProto p) { return new JobIdPBImpl(p); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java index 988a502458e..a6a5eddaf4f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java @@ -218,7 +218,14 @@ public class MRApps extends Apps { private static final String STAGING_CONSTANT = ".staging"; public static Path getStagingAreaDir(Configuration conf, String user) { return new Path( - conf.get(MRConstants.APPS_STAGING_DIR_KEY) + + conf.get(MRConstants.APPS_STAGING_DIR_KEY) + Path.SEPARATOR + user + Path.SEPARATOR + STAGING_CONSTANT); } + + public static String getJobFile(Configuration conf, String user, + org.apache.hadoop.mapreduce.JobID jobId) { + Path jobFile = new Path(MRApps.getStagingAreaDir(conf, user), + jobId.toString() + Path.SEPARATOR + MRConstants.JOB_CONF_FILE); + return jobFile.toString(); + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto index 7383f54ea3d..046d30d8ac6 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto @@ -139,6 +139,8 @@ message JobReportProto { optional float setup_progress = 6; optional int64 start_time = 7; optional int64 finish_time = 8; + optional string user = 9; + optional string jobName = 10; } enum TaskAttemptCompletionEventStatusProto { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java index 16c3e0d1b21..bda7fb9d658 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java @@ -21,8 +21,11 @@ import junit.framework.Assert; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationState; +import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationReportPBImpl; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import org.junit.Test; public class TestTypeConverter { @@ -35,8 +38,33 @@ public class TestTypeConverter { applicationReport.setApplicationId(applicationId); applicationReport.setState(state); applicationReport.setStartTime(appStartTime); - JobStatus jobStatus = TypeConverter.fromYarn(applicationReport); + applicationReport.setUser("TestTypeConverter-user"); + JobStatus jobStatus = TypeConverter.fromYarn(applicationReport, "dummy-jobfile"); Assert.assertEquals(appStartTime, jobStatus.getStartTime()); Assert.assertEquals(state.toString(), jobStatus.getState().toString()); } + + @Test + public void testFromYarnApplicationReport() { + ApplicationId mockAppId = mock(ApplicationId.class); + when(mockAppId.getClusterTimestamp()).thenReturn(12345L); + when(mockAppId.getId()).thenReturn(6789); + + ApplicationReport mockReport = mock(ApplicationReport.class); + when(mockReport.getTrackingUrl()).thenReturn("dummy-tracking-url"); + when(mockReport.getApplicationId()).thenReturn(mockAppId); + when(mockReport.getState()).thenReturn(ApplicationState.KILLED); + when(mockReport.getUser()).thenReturn("dummy-user"); + when(mockReport.getQueue()).thenReturn("dummy-queue"); + String jobFile = "dummy-path/job.xml"; + JobStatus status = TypeConverter.fromYarn(mockReport, jobFile); + Assert.assertNotNull("fromYarn returned null status", status); + Assert.assertEquals("jobFile set incorrectly", "dummy-path/job.xml", status.getJobFile()); + Assert.assertEquals("queue set incorrectly", "dummy-queue", status.getQueue()); + Assert.assertEquals("trackingUrl set incorrectly", "dummy-tracking-url", status.getTrackingUrl()); + Assert.assertEquals("user set incorrectly", "dummy-user", status.getUsername()); + Assert.assertEquals("schedulingInfo set incorrectly", "dummy-tracking-url", status.getSchedulingInfo()); + Assert.assertEquals("jobId set incorrectly", 6789, status.getJobID().getId()); + Assert.assertEquals("state set incorrectly", JobStatus.State.KILLED, status.getState()); + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java index 2c9a701ed0b..77fa446d58c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java @@ -18,10 +18,13 @@ package org.apache.hadoop.mapreduce.v2.util; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; +import org.apache.hadoop.mapreduce.v2.MRConstants; import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -107,4 +110,14 @@ public class TestMRApps { @Test(expected=YarnException.class) public void testTaskAttemptIDShort() { MRApps.toTaskAttemptID("attempt_0_0_0_m_0"); } + + @Test public void testGetJobFileWithUser() { + Configuration conf = new Configuration(); + conf.set(MRConstants.APPS_STAGING_DIR_KEY, "/my/path/to/staging"); + String jobFile = MRApps.getJobFile(conf, "dummy-user", new JobID("dummy-job", 12345)); + assertNotNull("getJobFile results in null.", jobFile); + assertEquals("jobFile with specified user is not as expected.", + "/my/path/to/staging/dummy-user/.staging/job_dummy-job_12345/job.xml", jobFile); + } + } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java index 0f1b08547bd..d29139f2df2 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java @@ -90,7 +90,9 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job report.setJobState(JobState.valueOf(jobInfo.getJobStatus())); report.setStartTime(jobInfo.getLaunchTime()); report.setFinishTime(jobInfo.getFinishTime()); - //TOODO Possibly populate job progress. Never used. + report.setJobName(jobInfo.getJobname()); + report.setUser(jobInfo.getUsername()); + //TODO Possibly populate job progress. Never used. //report.setMapProgress(progress) //report.setReduceProgress(progress) } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java index 707217356ec..4e865a39a4a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java @@ -21,6 +21,7 @@ package org.apache.hadoop.mapred; import java.io.IOException; import java.lang.reflect.Method; import java.security.PrivilegedAction; +import java.util.HashMap; import java.util.List; import org.apache.commons.logging.Log; @@ -50,7 +51,7 @@ import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptRequest import org.apache.hadoop.mapreduce.v2.api.records.Counters; import org.apache.hadoop.mapreduce.v2.api.records.JobReport; import org.apache.hadoop.mapreduce.v2.api.records.JobState; -import org.apache.hadoop.mapreduce.v2.jobhistory.JHConfig; +import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SecurityInfo; import org.apache.hadoop.security.UserGroupInformation; @@ -61,24 +62,20 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier; import org.apache.hadoop.yarn.security.SchedulerSecurityInfo; -import org.apache.hadoop.yarn.security.client.ClientRMSecurityInfo; class ClientServiceDelegate { private static final Log LOG = LogFactory.getLog(ClientServiceDelegate.class); - private static final NotRunningJob NOTSTARTEDJOB = - new NotRunningJob(JobState.NEW); - - private static final NotRunningJob FAILEDJOB = - new NotRunningJob(JobState.FAILED); - - private static final NotRunningJob KILLEDJOB = - new NotRunningJob(JobState.KILLED); + + // Caches for per-user NotRunningJobs + private static HashMap> notRunningJobs = + new HashMap>(); private final Configuration conf; private final JobID jobId; @@ -101,6 +98,24 @@ class ClientServiceDelegate { this.appId = TypeConverter.toYarn(jobId).getAppId(); } + // Get the instance of the NotRunningJob corresponding to the specified + // user and state + private NotRunningJob getNotRunningJob(String user, JobState state) { + synchronized (notRunningJobs) { + HashMap map = notRunningJobs.get(state); + if (map == null) { + map = new HashMap(); + notRunningJobs.put(state, map); + } + NotRunningJob notRunningJob = map.get(user); + if (notRunningJob == null) { + notRunningJob = new NotRunningJob(user, state); + map.put(user, notRunningJob); + } + return notRunningJob; + } + } + private MRClientProtocol getProxy() throws YarnRemoteException { if (!forceRefresh && realProxy != null) { return realProxy; @@ -149,26 +164,30 @@ class ClientServiceDelegate { } } - /** we just want to return if its allocating, so that we dont + /** we just want to return if its allocating, so that we don't * block on it. This is to be able to return job status - * on a allocating Application. + * on an allocating Application. */ + String user = application.getUser(); + if (user == null) { + throw new YarnRemoteExceptionPBImpl("User is not set in the application report"); + } if (application.getState() == ApplicationState.NEW || application.getState() == ApplicationState.SUBMITTED) { realProxy = null; - return NOTSTARTEDJOB; + return getNotRunningJob(user, JobState.NEW); } if (application.getState() == ApplicationState.FAILED) { realProxy = null; - return FAILEDJOB; + return getNotRunningJob(user, JobState.FAILED); } if (application.getState() == ApplicationState.KILLED) { - realProxy = null; - return KILLEDJOB; - } + realProxy = null; + return getNotRunningJob(user, JobState.KILLED); + } //History server can serve a job only if application //succeeded. @@ -270,17 +289,15 @@ class ClientServiceDelegate { return result; } - JobStatus getJobStatus(JobID oldJobID) throws YarnRemoteException, - YarnRemoteException { + JobStatus getJobStatus(JobID oldJobID) throws YarnRemoteException { org.apache.hadoop.mapreduce.v2.api.records.JobId jobId = TypeConverter.toYarn(oldJobID); - String stagingDir = conf.get("yarn.apps.stagingDir"); - String jobFile = stagingDir + "/" + jobId.toString(); - MRClientProtocol protocol; GetJobReportRequest request = recordFactory.newRecordInstance(GetJobReportRequest.class); request.setJobId(jobId); - JobReport report = ((GetJobReportResponse) invoke("getJobReport", + JobReport report = ((GetJobReportResponse) invoke("getJobReport", GetJobReportRequest.class, request)).getJobReport(); + String jobFile = MRApps.getJobFile(conf, report.getUser(), oldJobID); + //TODO: add tracking url in JobReport return TypeConverter.fromYarn(report, jobFile, ""); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java index 3fa01eb5f4b..a40fcedda39 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java @@ -63,8 +63,10 @@ public class NotRunningJob implements MRClientProtocol { RecordFactoryProvider.getRecordFactory(null); private final JobState jobState; + private final String user; - NotRunningJob(JobState jobState) { + NotRunningJob(String username, JobState jobState) { + this.user = username; this.jobState = jobState; } @@ -104,7 +106,10 @@ public class NotRunningJob implements MRClientProtocol { JobReport jobReport = recordFactory.newRecordInstance(JobReport.class); jobReport.setJobId(request.getJobId()); - jobReport.setJobState(jobState); + jobReport.setJobState(this.jobState); + + jobReport.setUser(this.user); + // TODO: Add jobName & other job information that is available resp.setJobReport(jobReport); return resp; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java index 0459009aec0..ac606c03305 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java @@ -120,7 +120,7 @@ public class ResourceMgrDelegate { recordFactory.newRecordInstance(GetAllApplicationsRequest.class); GetAllApplicationsResponse response = applicationsManager.getAllApplications(request); - return TypeConverter.fromYarnApps(response.getApplicationList()); + return TypeConverter.fromYarnApps(response.getApplicationList(), this.conf); } @@ -182,7 +182,7 @@ public class ResourceMgrDelegate { getQueueInfoRequest(queueName, true, false, false); recordFactory.newRecordInstance(GetQueueInfoRequest.class); return TypeConverter.fromYarn( - applicationsManager.getQueueInfo(request).getQueueInfo()); + applicationsManager.getQueueInfo(request).getQueueInfo(), this.conf); } private void getChildQueues(org.apache.hadoop.yarn.api.records.QueueInfo parent, @@ -216,7 +216,7 @@ public class ResourceMgrDelegate { getQueueInfoRequest(ROOT, false, true, true)).getQueueInfo(); getChildQueues(rootQueue, queues); - return TypeConverter.fromYarnQueueInfo(queues); + return TypeConverter.fromYarnQueueInfo(queues, this.conf); } @@ -229,7 +229,7 @@ public class ResourceMgrDelegate { getQueueInfoRequest(ROOT, false, true, false)).getQueueInfo(); getChildQueues(rootQueue, queues); - return TypeConverter.fromYarnQueueInfo(queues); + return TypeConverter.fromYarnQueueInfo(queues, this.conf); } public QueueInfo[] getChildQueues(String parent) throws IOException, @@ -242,7 +242,7 @@ public class ResourceMgrDelegate { getQueueInfoRequest(parent, false, true, false)).getQueueInfo(); getChildQueues(parentQueue, queues); - return TypeConverter.fromYarnQueueInfo(queues); + return TypeConverter.fromYarnQueueInfo(queues, this.conf); } public String getStagingAreaDir() throws IOException, InterruptedException { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java index 8c97ccc8554..24df9be59ba 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java @@ -268,6 +268,7 @@ public class TestClientRedirect { String[] split = AMHOSTADDRESS.split(":"); application.setHost(split[0]); application.setRpcPort(Integer.parseInt(split[1])); + application.setUser("TestClientRedirect-user"); GetApplicationReportResponse response = recordFactory .newRecordInstance(GetApplicationReportResponse.class); response.setApplicationReport(application); @@ -397,6 +398,11 @@ public class TestClientRedirect { JobReport jobReport = recordFactory.newRecordInstance(JobReport.class); jobReport.setJobId(request.getJobId()); jobReport.setJobState(JobState.RUNNING); + jobReport.setJobName("TestClientRedirect-jobname"); + jobReport.setUser("TestClientRedirect-user"); + jobReport.setStartTime(0L); + jobReport.setFinishTime(1L); + GetJobReportResponse response = recordFactory .newRecordInstance(GetJobReportResponse.class); response.setJobReport(jobReport);