Merge -r 1177486:1177487 from trunk to branch-0.23 to fix MAPREDUCE-2791.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1177490 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2011-09-30 03:10:57 +00:00
parent 3513e3d673
commit f99d1f7fde
12 changed files with 86 additions and 24 deletions

View File

@ -1446,6 +1446,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-3114. Fixed invalid ApplicationURL on RM WebUI. (Subroto Sanyal MAPREDUCE-3114. Fixed invalid ApplicationURL on RM WebUI. (Subroto Sanyal
via vinodkv) via vinodkv)
MAPREDUCE-2791. Added missing info on 'job -status' output. (Devaraj K via
acmurthy)
Release 0.22.0 - Unreleased Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -590,12 +590,12 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
if (getState() == JobState.NEW) { if (getState() == JobState.NEW) {
return MRBuilderUtils.newJobReport(jobId, jobName, username, state, return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
startTime, finishTime, setupProgress, 0.0f, startTime, finishTime, setupProgress, 0.0f,
0.0f, cleanupProgress); 0.0f, cleanupProgress, remoteJobConfFile.toString());
} }
return MRBuilderUtils.newJobReport(jobId, jobName, username, state, return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
startTime, finishTime, setupProgress, computeProgress(mapTasks), startTime, finishTime, setupProgress, computeProgress(mapTasks),
computeProgress(reduceTasks), cleanupProgress); computeProgress(reduceTasks), cleanupProgress, remoteJobConfFile.toString());
} finally { } finally {
readLock.unlock(); readLock.unlock();
} }

View File

@ -116,7 +116,7 @@ public class TestRMContainerAllocator {
Job mockJob = mock(Job.class); Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn( when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING,
0, 0, 0, 0, 0, 0)); 0, 0, 0, 0, 0, 0, "jobfile"));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob); appAttemptId, mockJob);
@ -193,7 +193,7 @@ public class TestRMContainerAllocator {
Job mockJob = mock(Job.class); Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn( when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING,
0, 0, 0, 0, 0, 0)); 0, 0, 0, 0, 0, 0, "jobfile"));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob); appAttemptId, mockJob);
@ -259,7 +259,7 @@ public class TestRMContainerAllocator {
Job mockJob = mock(Job.class); Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn( when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING,
0, 0, 0, 0, 0, 0)); 0, 0, 0, 0, 0, 0, "jobfile"));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob); appAttemptId, mockJob);
@ -373,7 +373,7 @@ public class TestRMContainerAllocator {
public JobReport getReport() { public JobReport getReport() {
return MRBuilderUtils.newJobReport(this.jobId, "job", "user", return MRBuilderUtils.newJobReport(this.jobId, "job", "user",
JobState.RUNNING, 0, 0, this.setupProgress, this.mapProgress, JobState.RUNNING, 0, 0, this.setupProgress, this.mapProgress,
this.reduceProgress, this.cleanupProgress); this.reduceProgress, this.cleanupProgress, "jobfile");
} }
} }

View File

@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobPriority; import org.apache.hadoop.mapred.JobPriority;
import org.apache.hadoop.mapred.TaskCompletionEvent; import org.apache.hadoop.mapred.TaskCompletionEvent;
@ -280,16 +281,14 @@ public class TypeConverter {
return yCntrs; return yCntrs;
} }
public static org.apache.hadoop.mapred.JobStatus fromYarn( public static JobStatus fromYarn(JobReport jobreport, String trackingUrl) {
JobReport jobreport, String jobFile) {
JobPriority jobPriority = JobPriority.NORMAL; JobPriority jobPriority = JobPriority.NORMAL;
org.apache.hadoop.mapred.JobStatus jobStatus = JobStatus jobStatus = new org.apache.hadoop.mapred.JobStatus(
new org.apache.hadoop.mapred.JobStatus(fromYarn(jobreport.getJobId()), fromYarn(jobreport.getJobId()), jobreport.getSetupProgress(), jobreport
jobreport.getSetupProgress(), jobreport.getMapProgress(), .getMapProgress(), jobreport.getReduceProgress(), jobreport
jobreport.getReduceProgress(), jobreport.getCleanupProgress(), .getCleanupProgress(), fromYarn(jobreport.getJobState()),
fromYarn(jobreport.getJobState()), jobPriority, jobreport.getUser(), jobreport.getJobName(), jobreport
jobPriority, jobreport.getUser(), jobreport.getJobName(), .getJobFile(), trackingUrl);
jobFile, jobreport.getTrackingUrl());
jobStatus.setFailureInfo(jobreport.getDiagnostics()); jobStatus.setFailureInfo(jobreport.getDiagnostics());
return jobStatus; return jobStatus;
} }

View File

@ -31,6 +31,7 @@ public interface JobReport {
public abstract String getJobName(); public abstract String getJobName();
public abstract String getTrackingUrl(); public abstract String getTrackingUrl();
public abstract String getDiagnostics(); public abstract String getDiagnostics();
public abstract String getJobFile();
public abstract void setJobId(JobId jobId); public abstract void setJobId(JobId jobId);
public abstract void setJobState(JobState jobState); public abstract void setJobState(JobState jobState);
@ -44,4 +45,5 @@ public interface JobReport {
public abstract void setJobName(String jobName); public abstract void setJobName(String jobName);
public abstract void setTrackingUrl(String trackingUrl); public abstract void setTrackingUrl(String trackingUrl);
public abstract void setDiagnostics(String diagnostics); public abstract void setDiagnostics(String diagnostics);
public abstract void setJobFile(String jobFile);
} }

View File

@ -229,7 +229,19 @@ public class JobReportPBImpl extends ProtoBase<JobReportProto> implements JobRep
maybeInitBuilder(); maybeInitBuilder();
builder.setDiagnostics(diagnostics); builder.setDiagnostics(diagnostics);
} }
@Override
public String getJobFile() {
JobReportProtoOrBuilder p = viaProto ? proto : builder;
return p.getJobFile();
}
@Override
public void setJobFile(String jobFile) {
maybeInitBuilder();
builder.setJobFile(jobFile);
}
private JobIdPBImpl convertFromProtoFormat(JobIdProto p) { private JobIdPBImpl convertFromProtoFormat(JobIdProto p) {
return new JobIdPBImpl(p); return new JobIdPBImpl(p);
} }

View File

@ -55,7 +55,7 @@ public class MRBuilderUtils {
public static JobReport newJobReport(JobId jobId, String jobName, public static JobReport newJobReport(JobId jobId, String jobName,
String userName, JobState state, long startTime, long finishTime, String userName, JobState state, long startTime, long finishTime,
float setupProgress, float mapProgress, float reduceProgress, float setupProgress, float mapProgress, float reduceProgress,
float cleanupProgress) { float cleanupProgress, String jobFile) {
JobReport report = Records.newRecord(JobReport.class); JobReport report = Records.newRecord(JobReport.class);
report.setJobId(jobId); report.setJobId(jobId);
report.setJobName(jobName); report.setJobName(jobName);
@ -67,6 +67,7 @@ public class MRBuilderUtils {
report.setCleanupProgress(cleanupProgress); report.setCleanupProgress(cleanupProgress);
report.setMapProgress(mapProgress); report.setMapProgress(mapProgress);
report.setReduceProgress(reduceProgress); report.setReduceProgress(reduceProgress);
report.setJobFile(jobFile);
return report; return report;
} }
} }

View File

@ -145,6 +145,7 @@ message JobReportProto {
optional string jobName = 10; optional string jobName = 10;
optional string trackingUrl = 11; optional string trackingUrl = 11;
optional string diagnostics = 12; optional string diagnostics = 12;
optional string jobFile = 13;
} }
enum TaskAttemptCompletionEventStatusProto { enum TaskAttemptCompletionEventStatusProto {

View File

@ -462,8 +462,6 @@ public class Job extends JobContextImpl implements JobContext {
sb.append(status.getReduceProgress()).append("\n"); sb.append(status.getReduceProgress()).append("\n");
sb.append("Job state: "); sb.append("Job state: ");
sb.append(status.getState()).append("\n"); sb.append(status.getState()).append("\n");
sb.append("history URL: ");
sb.append(status.getHistoryFile()).append("\n");
sb.append("retired: ").append(status.isRetired()).append("\n"); sb.append("retired: ").append(status.isRetired()).append("\n");
sb.append("reason for failure: ").append(reasonforFailure); sb.append("reason for failure: ").append(reasonforFailure);
return sb.toString(); return sb.toString();

View File

@ -48,6 +48,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.YarnException;
@ -96,9 +97,11 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job
report.setFinishTime(jobInfo.getFinishTime()); report.setFinishTime(jobInfo.getFinishTime());
report.setJobName(jobInfo.getJobname()); report.setJobName(jobInfo.getJobname());
report.setUser(jobInfo.getUsername()); report.setUser(jobInfo.getUsername());
//TODO Possibly populate job progress. Never used. report.setMapProgress((float) getCompletedMaps() / getTotalMaps());
//report.setMapProgress(progress) report.setReduceProgress((float) getCompletedReduces() / getTotalReduces());
//report.setReduceProgress(progress) report.setJobFile(confFile.toString());
report.setTrackingUrl(JobHistoryUtils.getHistoryUrl(conf, TypeConverter
.toYarn(TypeConverter.fromYarn(jobId)).getAppId()));
} }
@Override @Override

View File

@ -25,6 +25,7 @@ import java.security.PrivilegedAction;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -86,6 +87,7 @@ public class ClientServiceDelegate {
private MRClientProtocol realProxy = null; private MRClientProtocol realProxy = null;
private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private static String UNKNOWN_USER = "Unknown User"; private static String UNKNOWN_USER = "Unknown User";
private String trackingUrl;
public ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm, public ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm,
JobID jobId, MRClientProtocol historyServerProxy) { JobID jobId, MRClientProtocol historyServerProxy) {
@ -129,6 +131,9 @@ public class ClientServiceDelegate {
// Possibly allow nulls through the PB tunnel, otherwise deal with an exception // Possibly allow nulls through the PB tunnel, otherwise deal with an exception
// and redirect to the history server. // and redirect to the history server.
ApplicationReport application = rm.getApplicationReport(appId); ApplicationReport application = rm.getApplicationReport(appId);
if (application != null) {
trackingUrl = application.getTrackingUrl();
}
String serviceAddr = null; String serviceAddr = null;
while (application == null || ApplicationState.RUNNING.equals(application.getState())) { while (application == null || ApplicationState.RUNNING.equals(application.getState())) {
if (application == null) { if (application == null) {
@ -334,9 +339,14 @@ public class ClientServiceDelegate {
request.setJobId(jobId); request.setJobId(jobId);
JobReport report = ((GetJobReportResponse) invoke("getJobReport", JobReport report = ((GetJobReportResponse) invoke("getJobReport",
GetJobReportRequest.class, request)).getJobReport(); GetJobReportRequest.class, request)).getJobReport();
String jobFile = MRApps.getJobFile(conf, report.getUser(), oldJobID); if (StringUtils.isEmpty(report.getJobFile())) {
String jobFile = MRApps.getJobFile(conf, report.getUser(), oldJobID);
return TypeConverter.fromYarn(report, jobFile); report.setJobFile(jobFile);
}
String historyTrackingUrl = report.getTrackingUrl();
return TypeConverter.fromYarn(report, "http://"
+ (StringUtils.isNotEmpty(historyTrackingUrl) ? historyTrackingUrl
: trackingUrl));
} }
public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(JobID oldJobID, TaskType taskType) public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(JobID oldJobID, TaskType taskType)

View File

@ -124,6 +124,26 @@ public class TestClientServiceDelegate {
Assert.assertEquals(JobStatus.State.SUCCEEDED, jobStatus.getState()); Assert.assertEquals(JobStatus.State.SUCCEEDED, jobStatus.getState());
} }
@Test
public void testJobReportFromHistoryServer() throws Exception {
MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
when(historyServerProxy.getJobReport(getJobReportRequest())).thenReturn(
getJobReportResponseFromHistoryServer());
ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId()))
.thenReturn(null);
ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate(
historyServerProxy, rm);
JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
Assert.assertNotNull(jobStatus);
Assert.assertEquals("TestJobFilePath", jobStatus.getJobFile());
Assert.assertEquals("http://TestTrackingUrl", jobStatus.getTrackingUrl());
Assert.assertEquals(1.0f, jobStatus.getMapProgress());
Assert.assertEquals(1.0f, jobStatus.getReduceProgress());
}
private GetJobReportRequest getJobReportRequest() { private GetJobReportRequest getJobReportRequest() {
GetJobReportRequest request = Records.newRecord(GetJobReportRequest.class); GetJobReportRequest request = Records.newRecord(GetJobReportRequest.class);
request.setJobId(jobId); request.setJobId(jobId);
@ -170,4 +190,17 @@ public class TestClientServiceDelegate {
return clientServiceDelegate; return clientServiceDelegate;
} }
private GetJobReportResponse getJobReportResponseFromHistoryServer() {
GetJobReportResponse jobReportResponse = Records
.newRecord(GetJobReportResponse.class);
JobReport jobReport = Records.newRecord(JobReport.class);
jobReport.setJobId(jobId);
jobReport.setJobState(JobState.SUCCEEDED);
jobReport.setMapProgress(1.0f);
jobReport.setReduceProgress(1.0f);
jobReport.setJobFile("TestJobFilePath");
jobReport.setTrackingUrl("TestTrackingUrl");
jobReportResponse.setJobReport(jobReport);
return jobReportResponse;
}
} }