Merge -c 1182613 from trunk to branch-0.23 to fix MAPREDUCE-2666.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1182614 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2011-10-12 23:12:37 +00:00
parent a6d54ece55
commit b57ac1cf61
9 changed files with 44 additions and 11 deletions

View File

@ -1554,6 +1554,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-3125. Modified TaskImpl to consider only non-failed, non-killed MAPREDUCE-3125. Modified TaskImpl to consider only non-failed, non-killed
task-attempts for obtaining task's progress. (Hitesh Shah via vinodkv) task-attempts for obtaining task's progress. (Hitesh Shah via vinodkv)
MAPREDUCE-2666. Retrieve shuffle port number from JobHistory on MR AM
restart. (Jonathan Eagles via acmurthy)
Release 0.22.0 - Unreleased Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -1095,6 +1095,8 @@ public abstract class TaskAttemptImpl implements
//set the launch time //set the launch time
taskAttempt.launchTime = taskAttempt.clock.getTime(); taskAttempt.launchTime = taskAttempt.clock.getTime();
taskAttempt.shufflePort = event.getShufflePort();
// register it to TaskAttemptListener so that it start listening // register it to TaskAttemptListener so that it start listening
// for it // for it
taskAttempt.taskAttemptListener.register( taskAttempt.taskAttemptListener.register(
@ -1116,7 +1118,7 @@ public abstract class TaskAttemptImpl implements
new TaskAttemptStartedEvent(TypeConverter.fromYarn(taskAttempt.attemptId), new TaskAttemptStartedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()), TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
taskAttempt.launchTime, taskAttempt.launchTime,
nodeHttpInetAddr.getHostName(), nodeHttpInetAddr.getPort()); nodeHttpInetAddr.getHostName(), nodeHttpInetAddr.getPort(), taskAttempt.shufflePort);
taskAttempt.eventHandler.handle taskAttempt.eventHandler.handle
(new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), tase)); (new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), tase));
taskAttempt.eventHandler.handle taskAttempt.eventHandler.handle
@ -1125,7 +1127,6 @@ public abstract class TaskAttemptImpl implements
//make remoteTask reference as null as it is no more needed //make remoteTask reference as null as it is no more needed
//and free up the memory //and free up the memory
taskAttempt.remoteTask = null; taskAttempt.remoteTask = null;
taskAttempt.shufflePort = event.getShufflePort();
//tell the Task that attempt has started //tell the Task that attempt has started
taskAttempt.eventHandler.handle(new TaskTAttemptEvent( taskAttempt.eventHandler.handle(new TaskTAttemptEvent(

View File

@ -173,7 +173,8 @@
{"name": "attemptId", "type": "string"}, {"name": "attemptId", "type": "string"},
{"name": "startTime", "type": "long"}, {"name": "startTime", "type": "long"},
{"name": "trackerName", "type": "string"}, {"name": "trackerName", "type": "string"},
{"name": "httpPort", "type": "int"} {"name": "httpPort", "type": "int"},
{"name": "shufflePort", "type": "int"}
] ]
}, },

View File

@ -240,6 +240,7 @@ public class JobHistoryParser {
attemptInfo.httpPort = event.getHttpPort(); attemptInfo.httpPort = event.getHttpPort();
attemptInfo.trackerName = event.getTrackerName(); attemptInfo.trackerName = event.getTrackerName();
attemptInfo.taskType = event.getTaskType(); attemptInfo.taskType = event.getTaskType();
attemptInfo.shufflePort = event.getShufflePort();
taskInfo.attemptsMap.put(attemptId, attemptInfo); taskInfo.attemptsMap.put(attemptId, attemptInfo);
} }
@ -506,6 +507,7 @@ public class JobHistoryParser {
String trackerName; String trackerName;
Counters counters; Counters counters;
int httpPort; int httpPort;
int shufflePort;
String hostname; String hostname;
/** Create a Task Attempt Info which will store attempt level information /** Create a Task Attempt Info which will store attempt level information
@ -516,6 +518,7 @@ public class JobHistoryParser {
mapFinishTime = -1; mapFinishTime = -1;
error = state = trackerName = hostname = ""; error = state = trackerName = hostname = "";
httpPort = -1; httpPort = -1;
shufflePort = -1;
} }
/** /**
* Print all the information about this attempt. * Print all the information about this attempt.
@ -530,6 +533,7 @@ public class JobHistoryParser {
System.out.println("TASK_TYPE:" + taskType); System.out.println("TASK_TYPE:" + taskType);
System.out.println("TRACKER_NAME:" + trackerName); System.out.println("TRACKER_NAME:" + trackerName);
System.out.println("HTTP_PORT:" + httpPort); System.out.println("HTTP_PORT:" + httpPort);
System.out.println("SHUFFLE_PORT:" + shufflePort);
if (counters != null) { if (counters != null) {
System.out.println("COUNTERS:" + counters.toString()); System.out.println("COUNTERS:" + counters.toString());
} }
@ -563,5 +567,7 @@ public class JobHistoryParser {
public Counters getCounters() { return counters; } public Counters getCounters() { return counters; }
/** @return the HTTP port for the tracker */ /** @return the HTTP port for the tracker */
public int getHttpPort() { return httpPort; } public int getHttpPort() { return httpPort; }
/** @return the Shuffle port for the tracker */
public int getShufflePort() { return shufflePort; }
} }
} }

View File

@ -44,16 +44,18 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
* @param startTime Start time of the attempt * @param startTime Start time of the attempt
* @param trackerName Name of the Task Tracker where attempt is running * @param trackerName Name of the Task Tracker where attempt is running
* @param httpPort The port number of the tracker * @param httpPort The port number of the tracker
* @param shufflePort The shuffle port number of the container
*/ */
public TaskAttemptStartedEvent( TaskAttemptID attemptId, public TaskAttemptStartedEvent( TaskAttemptID attemptId,
TaskType taskType, long startTime, String trackerName, TaskType taskType, long startTime, String trackerName,
int httpPort) { int httpPort, int shufflePort) {
datum.attemptId = new Utf8(attemptId.toString()); datum.attemptId = new Utf8(attemptId.toString());
datum.taskid = new Utf8(attemptId.getTaskID().toString()); datum.taskid = new Utf8(attemptId.getTaskID().toString());
datum.startTime = startTime; datum.startTime = startTime;
datum.taskType = new Utf8(taskType.name()); datum.taskType = new Utf8(taskType.name());
datum.trackerName = new Utf8(trackerName); datum.trackerName = new Utf8(trackerName);
datum.httpPort = httpPort; datum.httpPort = httpPort;
datum.shufflePort = shufflePort;
} }
TaskAttemptStartedEvent() {} TaskAttemptStartedEvent() {}
@ -75,6 +77,8 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
} }
/** Get the HTTP port */ /** Get the HTTP port */
public int getHttpPort() { return datum.httpPort; } public int getHttpPort() { return datum.httpPort; }
/** Get the shuffle port */
public int getShufflePort() { return datum.shufflePort; }
/** Get the attempt id */ /** Get the attempt id */
public TaskAttemptID getTaskAttemptId() { public TaskAttemptID getTaskAttemptId() {
return TaskAttemptID.forName(datum.attemptId.toString()); return TaskAttemptID.forName(datum.attemptId.toString());

View File

@ -35,10 +35,13 @@ import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.MRApp; import org.apache.hadoop.mapreduce.v2.app.MRApp;
import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.hs.TestJobHistoryEvents.MRAppWithHistory; import org.apache.hadoop.mapreduce.v2.hs.TestJobHistoryEvents.MRAppWithHistory;
import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils; import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
@ -107,6 +110,21 @@ public class TestJobHistoryParsing {
1, taskAttemptCount); 1, taskAttemptCount);
} }
// Deep compare Job and JobInfo
for (Task task : job.getTasks().values()) {
TaskInfo taskInfo = jobInfo.getAllTasks().get(
TypeConverter.fromYarn(task.getID()));
Assert.assertNotNull("TaskInfo not found", taskInfo);
for (TaskAttempt taskAttempt : task.getAttempts().values()) {
TaskAttemptInfo taskAttemptInfo =
taskInfo.getAllTaskAttempts().get(
TypeConverter.fromYarn((taskAttempt.getID())));
Assert.assertNotNull("TaskAttemptInfo not found", taskAttemptInfo);
Assert.assertEquals("Incorrect shuffle port for task attempt",
taskAttempt.getShufflePort(), taskAttemptInfo.getShufflePort());
}
}
String summaryFileName = JobHistoryUtils String summaryFileName = JobHistoryUtils
.getIntermediateSummaryFileName(jobId); .getIntermediateSummaryFileName(jobId);
Path summaryFile = new Path(jobhistoryDir, summaryFileName); Path summaryFile = new Path(jobhistoryDir, summaryFileName);

View File

@ -2676,7 +2676,7 @@ public class JobInProgress {
TaskAttemptStartedEvent tse = new TaskAttemptStartedEvent( TaskAttemptStartedEvent tse = new TaskAttemptStartedEvent(
status.getTaskID(), taskType, status.getStartTime(), status.getTaskID(), taskType, status.getStartTime(),
status.getTaskTracker(), ttStatus.getHttpPort()); status.getTaskTracker(), ttStatus.getHttpPort(), -1);
jobHistory.logEvent(tse, status.getTaskID().getJobID()); jobHistory.logEvent(tse, status.getTaskID().getJobID());
TaskAttemptID statusAttemptID = status.getTaskID(); TaskAttemptID statusAttemptID = status.getTaskID();
@ -3197,7 +3197,7 @@ public class JobInProgress {
StringUtils.arrayToString(taskDiagnosticInfo.toArray(new String[0])); StringUtils.arrayToString(taskDiagnosticInfo.toArray(new String[0]));
TaskType taskType = getTaskType(tip); TaskType taskType = getTaskType(tip);
TaskAttemptStartedEvent tse = new TaskAttemptStartedEvent( TaskAttemptStartedEvent tse = new TaskAttemptStartedEvent(
taskid, taskType, startTime, taskTrackerName, taskTrackerPort); taskid, taskType, startTime, taskTrackerName, taskTrackerPort, -1);
jobHistory.logEvent(tse, taskid.getJobID()); jobHistory.logEvent(tse, taskid.getJobID());

View File

@ -48,7 +48,7 @@ public class TestJobHistoryEvents extends TestCase {
TaskType[] types) { TaskType[] types) {
for (TaskType t : types) { for (TaskType t : types) {
TaskAttemptStartedEvent tase = TaskAttemptStartedEvent tase =
new TaskAttemptStartedEvent(id, t, 0L, "", 0); new TaskAttemptStartedEvent(id, t, 0L, "", 0, -1);
assertEquals(expected, tase.getEventType()); assertEquals(expected, tase.getEventType());
} }
} }

View File

@ -79,7 +79,7 @@ public abstract class TaskAttempt20LineEventEmitter extends HistoryEventEmitter
.parseInt(httpPort); .parseInt(httpPort);
return new TaskAttemptStartedEvent(taskAttemptID, return new TaskAttemptStartedEvent(taskAttemptID,
that.originalTaskType, that.originalStartTime, trackerName, port); that.originalTaskType, that.originalStartTime, trackerName, port, -1);
} }
return null; return null;