Merge -r 1177530:1177531 from trunk to branch-0.23 to fix MAPREDUCE-2996.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1177532 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
f99d1f7fde
commit
be51b71dee
|
@ -1449,6 +1449,9 @@ Release 0.23.0 - Unreleased
|
|||
MAPREDUCE-2791. Added missing info on 'job -status' output. (Devaraj K via
|
||||
acmurthy)
|
||||
|
||||
MAPREDUCE-2996. Add uber-ness information to JobHistory. (Jonathan Eagles
|
||||
via acmurthy)
|
||||
|
||||
Release 0.22.0 - Unreleased
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -1135,7 +1135,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
|||
new JobInitedEvent(job.oldJobId,
|
||||
job.startTime,
|
||||
job.numMapTasks, job.numReduceTasks,
|
||||
job.getState().toString()); //Will transition to state running. Currently in INITED
|
||||
job.getState().toString(),
|
||||
job.isUber()); //Will transition to state running. Currently in INITED
|
||||
job.eventHandler.handle(new JobHistoryEvent(job.jobId, jie));
|
||||
JobInfoChangeEvent jice = new JobInfoChangeEvent(job.oldJobId,
|
||||
job.submitTime, job.startTime);
|
||||
|
|
|
@ -64,7 +64,8 @@
|
|||
{"name": "launchTime", "type": "long"},
|
||||
{"name": "totalMaps", "type": "int"},
|
||||
{"name": "totalReduces", "type": "int"},
|
||||
{"name": "jobStatus", "type": "string"}
|
||||
{"name": "jobStatus", "type": "string"},
|
||||
{"name": "uberized", "type": "boolean"}
|
||||
]
|
||||
},
|
||||
|
||||
|
|
|
@ -302,6 +302,7 @@ public class JobHistoryParser {
|
|||
info.launchTime = event.getLaunchTime();
|
||||
info.totalMaps = event.getTotalMaps();
|
||||
info.totalReduces = event.getTotalReduces();
|
||||
info.uberized = event.getUberized();
|
||||
}
|
||||
|
||||
private void handleJobInfoChangeEvent(JobInfoChangeEvent event) {
|
||||
|
@ -346,6 +347,7 @@ public class JobHistoryParser {
|
|||
Map<JobACL, AccessControlList> jobACLs;
|
||||
|
||||
Map<TaskID, TaskInfo> tasksMap;
|
||||
boolean uberized;
|
||||
|
||||
/** Create a job info object where job information will be stored
|
||||
* after a parse
|
||||
|
@ -373,7 +375,8 @@ public class JobHistoryParser {
|
|||
System.out.println("MAP_COUNTERS:" + mapCounters.toString());
|
||||
System.out.println("REDUCE_COUNTERS:" + reduceCounters.toString());
|
||||
System.out.println("TOTAL_COUNTERS: " + totalCounters.toString());
|
||||
|
||||
System.out.println("UBERIZED: " + uberized);
|
||||
|
||||
for (TaskInfo ti: tasksMap.values()) {
|
||||
ti.printAll();
|
||||
}
|
||||
|
@ -421,6 +424,8 @@ public class JobHistoryParser {
|
|||
/** @return the priority of this job */
|
||||
public String getPriority() { return priority.toString(); }
|
||||
public Map<JobACL, AccessControlList> getJobACLs() { return jobACLs; }
|
||||
/** @return the uberized status of this job */
|
||||
public boolean getUberized() { return uberized; }
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -42,14 +42,16 @@ public class JobInitedEvent implements HistoryEvent {
|
|||
* @param totalMaps
|
||||
* @param totalReduces
|
||||
* @param jobStatus
|
||||
* @param uberized True if the job's map and reduce stages were combined
|
||||
*/
|
||||
public JobInitedEvent(JobID id, long launchTime, int totalMaps,
|
||||
int totalReduces, String jobStatus) {
|
||||
int totalReduces, String jobStatus, boolean uberized) {
|
||||
datum.jobid = new Utf8(id.toString());
|
||||
datum.launchTime = launchTime;
|
||||
datum.totalMaps = totalMaps;
|
||||
datum.totalReduces = totalReduces;
|
||||
datum.jobStatus = new Utf8(jobStatus);
|
||||
datum.uberized = uberized;
|
||||
}
|
||||
|
||||
JobInitedEvent() { }
|
||||
|
@ -67,9 +69,10 @@ public class JobInitedEvent implements HistoryEvent {
|
|||
public int getTotalReduces() { return datum.totalReduces; }
|
||||
/** Get the status */
|
||||
public String getStatus() { return datum.jobStatus.toString(); }
|
||||
/** Get the event type */
|
||||
/** Get the event type */
|
||||
public EventType getEventType() {
|
||||
return EventType.JOB_INITED;
|
||||
}
|
||||
|
||||
/** Get whether the job's map and reduce stages were combined */
|
||||
public boolean getUberized() { return datum.uberized; }
|
||||
}
|
||||
|
|
|
@ -290,8 +290,7 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job
|
|||
|
||||
@Override
|
||||
public boolean isUber() {
|
||||
LOG.warn("isUber is not yet implemented");
|
||||
return false;
|
||||
return jobInfo.getUberized();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -95,6 +95,8 @@ public class TestJobHistoryParsing {
|
|||
2, jobInfo.getFinishedMaps());
|
||||
Assert.assertEquals("incorrect finishedReduces ",
|
||||
1, jobInfo.getFinishedReduces());
|
||||
Assert.assertEquals("incorrect uberized ",
|
||||
job.isUber(), jobInfo.getUberized());
|
||||
int totalTasks = jobInfo.getAllTasks().size();
|
||||
Assert.assertEquals("total number of tasks is incorrect ", 3, totalTasks);
|
||||
|
||||
|
|
|
@ -697,7 +697,8 @@ public class JobInProgress {
|
|||
JobInitedEvent jie = new JobInitedEvent(
|
||||
profile.getJobID(), this.launchTime,
|
||||
numMapTasks, numReduceTasks,
|
||||
JobStatus.getJobRunState(JobStatus.PREP));
|
||||
JobStatus.getJobRunState(JobStatus.PREP),
|
||||
false);
|
||||
|
||||
jobHistory.logEvent(jie, jobId);
|
||||
|
||||
|
|
|
@ -125,10 +125,12 @@ public class Job20LineHistoryEventEmitter extends HistoryEventEmitter {
|
|||
String status = line.get("JOB_STATUS");
|
||||
String totalMaps = line.get("TOTAL_MAPS");
|
||||
String totalReduces = line.get("TOTAL_REDUCES");
|
||||
String uberized = line.get("UBERIZED");
|
||||
|
||||
if (launchTime != null && totalMaps != null && totalReduces != null) {
|
||||
return new JobInitedEvent(jobID, Long.parseLong(launchTime), Integer
|
||||
.parseInt(totalMaps), Integer.parseInt(totalReduces), status);
|
||||
.parseInt(totalMaps), Integer.parseInt(totalReduces), status,
|
||||
Boolean.parseBoolean(uberized));
|
||||
}
|
||||
|
||||
return null;
|
||||
|
|
Loading…
Reference in New Issue