MAPREDUCE-3349. Log rack-name in JobHistory for unsuccessful tasks. (Contributed by Amar Kamat and Devaraj K)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1221939 13f79535-47bb-0310-9956-ffa450edef68
Siddharth Seth committed 2011-12-21 23:36:14 +00:00
parent 8d714e9355
commit 769178f3c0
20 changed files with 348 additions and 63 deletions

View File

@@ -300,6 +300,9 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3588. Fixed bin/yarn which was broken by MAPREDUCE-3366 so that
     yarn daemons can start. (Arun C Murthy via vinodkv)
 
+    MAPREDUCE-3349. Log rack-name in JobHistory for unsuccessful tasks. (Amar
+    Kamat and Devaraj K via sseth)
+
 Release 0.23.0 - 2011-11-01
 
   INCOMPATIBLE CHANGES

View File

@@ -926,6 +926,9 @@ public abstract class TaskAttemptImpl implements
             : taskAttempt.containerNodeId.getHost(),
         taskAttempt.containerNodeId == null ? -1
             : taskAttempt.containerNodeId.getPort(),
+        taskAttempt.nodeRackName == null ? "UNKNOWN"
+            : taskAttempt.nodeRackName,
         StringUtils.join(
             LINE_SEPARATOR, taskAttempt.getDiagnostics()), taskAttempt
                 .getProgressSplitBlock().burst());

View File

@@ -175,6 +175,7 @@
          {"name": "taskType", "type": "string"},
          {"name": "taskStatus", "type": "string"},
          {"name": "finishTime", "type": "long"},
+         {"name": "rackname", "type": "string"},
          {"name": "hostname", "type": "string"},
          {"name": "state", "type": "string"},
          {"name": "counters", "type": "JhCounters"}
@@ -202,6 +203,7 @@
          {"name": "finishTime", "type": "long"},
          {"name": "hostname", "type": "string"},
          {"name": "port", "type": "int"},
+         {"name": "rackname", "type": "string"},
          {"name": "status", "type": "string"},
          {"name": "error", "type": "string"},
          {"name": "clockSplits", "type": { "type": "array", "items": "int"}},

View File

@@ -224,7 +224,7 @@ public class JobHistoryParser {
     attemptInfo.counters = event.getCounters();
     attemptInfo.hostname = event.getHostname();
     attemptInfo.port = event.getPort();
-    attemptInfo.rackname = event.getRackname();
+    attemptInfo.rackname = event.getRackName();
   }
 
   private void handleTaskAttemptFailedEvent(
@@ -237,6 +237,7 @@ public class JobHistoryParser {
     attemptInfo.status = event.getTaskStatus();
     attemptInfo.hostname = event.getHostname();
     attemptInfo.port = event.getPort();
+    attemptInfo.rackname = event.getRackName();
     attemptInfo.shuffleFinishTime = event.getFinishTime();
     attemptInfo.sortFinishTime = event.getFinishTime();
     attemptInfo.mapFinishTime = event.getFinishTime();

View File

@@ -68,7 +68,10 @@ public class MapAttemptFinishedEvent implements HistoryEvent {
     datum.finishTime = finishTime;
     datum.hostname = new Utf8(hostname);
     datum.port = port;
-    datum.rackname = new Utf8(rackName);
+    // This is needed for reading old jh files
+    if (rackName != null) {
+      datum.rackname = new Utf8(rackName);
+    }
     datum.state = new Utf8(state);
     datum.counters = EventWriter.toAvro(counters);
@@ -139,8 +142,12 @@ public class MapAttemptFinishedEvent implements HistoryEvent {
   public String getHostname() { return datum.hostname.toString(); }
   /** Get the tracker rpc port */
   public int getPort() { return datum.port; }
   /** Get the rack name */
-  public String getRackname() { return datum.rackname.toString(); }
+  public String getRackName() {
+    return datum.rackname == null ? null : datum.rackname.toString();
+  }
   /** Get the state string */
   public String getState() { return datum.state.toString(); }
   /** Get the counters */

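The guard and the null-safe getter above exist because history files written before this change have no rackname field, so a parsed record can legitimately carry null. Below is a minimal, self-contained sketch of the same optional-field pattern; AttemptDatum and AttemptEvent are illustrative stand-ins (plain CharSequence substitutes for Avro's Utf8), not classes from the patch:

    // Sketch only: an optional field that may be absent in records written
    // by older versions. CharSequence stands in for Avro's Utf8.
    class AttemptDatum {
      CharSequence hostname;
      CharSequence rackname; // stays null when reading an old history file
    }

    class AttemptEvent {
      private final AttemptDatum datum = new AttemptDatum();

      AttemptEvent(String hostname, String rackName) {
        datum.hostname = hostname;
        // Mirror of `if (rackName != null) datum.rackname = new Utf8(rackName);`
        // -- never wrap a null, which would fail at write time.
        if (rackName != null) {
          datum.rackname = rackName;
        }
      }

      // Null-safe getter: callers get null, not a NullPointerException,
      // for records that predate the field.
      public String getRackName() {
        return datum.rackname == null ? null : datum.rackname.toString();
      }
    }

ReduceAttemptFinishedEvent, TaskAttemptFinishedEvent and TaskAttemptUnsuccessfulCompletionEvent below apply the identical guard/getter pair.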
View File

@@ -69,7 +69,9 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent {
     datum.finishTime = finishTime;
     datum.hostname = new Utf8(hostname);
     datum.port = port;
-    datum.rackname = new Utf8(rackName);
+    if (rackName != null) {
+      datum.rackname = new Utf8(rackName);
+    }
     datum.state = new Utf8(state);
     datum.counters = EventWriter.toAvro(counters);
@@ -142,8 +144,12 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent {
   public String getHostname() { return datum.hostname.toString(); }
   /** Get the tracker rpc port */
   public int getPort() { return datum.port; }
   /** Get the rack name of the node where the attempt ran */
-  public String getRackName() { return datum.rackname.toString(); }
+  public String getRackName() {
+    return datum.rackname == null ? null : datum.rackname.toString();
+  }
   /** Get the state string */
   public String getState() { return datum.state.toString(); }
   /** Get the counters for the attempt */

View File

@@ -51,13 +51,16 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
    */
   public TaskAttemptFinishedEvent(TaskAttemptID id,
       TaskType taskType, String taskStatus,
-      long finishTime,
+      long finishTime, String rackName,
       String hostname, String state, Counters counters) {
     datum.taskid = new Utf8(id.getTaskID().toString());
     datum.attemptId = new Utf8(id.toString());
     datum.taskType = new Utf8(taskType.name());
     datum.taskStatus = new Utf8(taskStatus);
     datum.finishTime = finishTime;
+    if (rackName != null) {
+      datum.rackname = new Utf8(rackName);
+    }
     datum.hostname = new Utf8(hostname);
     datum.state = new Utf8(state);
     datum.counters = EventWriter.toAvro(counters);
@@ -86,6 +89,12 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
   public long getFinishTime() { return datum.finishTime; }
   /** Get the host where the attempt executed */
   public String getHostname() { return datum.hostname.toString(); }
+  /** Get the rackname where the attempt executed */
+  public String getRackName() {
+    return datum.rackname == null ? null : datum.rackname.toString();
+  }
   /** Get the state string */
   public String getState() { return datum.state.toString(); }
   /** Get the counters for the attempt */

View File

@@ -47,6 +47,7 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
    * @param finishTime Finish time of the attempt
    * @param hostname Name of the host where the attempt executed
    * @param port rpc port for for the tracker
+   * @param rackName Name of the rack where the attempt executed
    * @param error Error string
    * @param allSplits the "splits", or a pixelated graph of various
    *        measurable worker node state variables against progress.
@@ -55,14 +56,17 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
    */
   public TaskAttemptUnsuccessfulCompletionEvent
       (TaskAttemptID id, TaskType taskType,
        String status, long finishTime,
-       String hostname, int port, String error,
-       int[][] allSplits) {
+       String hostname, int port, String rackName,
+       String error, int[][] allSplits) {
     datum.taskid = new Utf8(id.getTaskID().toString());
     datum.taskType = new Utf8(taskType.name());
     datum.attemptId = new Utf8(id.toString());
     datum.finishTime = finishTime;
     datum.hostname = new Utf8(hostname);
+    if (rackName != null) {
+      datum.rackname = new Utf8(rackName);
+    }
     datum.port = port;
     datum.error = new Utf8(error);
     datum.status = new Utf8(status);
@@ -99,7 +103,7 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
       (TaskAttemptID id, TaskType taskType,
        String status, long finishTime,
        String hostname, String error) {
-    this(id, taskType, status, finishTime, hostname, -1, error, null);
+    this(id, taskType, status, finishTime, hostname, -1, null, error, null);
   }
 
   TaskAttemptUnsuccessfulCompletionEvent() {}
@@ -125,6 +129,12 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
   public String getHostname() { return datum.hostname.toString(); }
   /** Get the rpc port for the host where the attempt executed */
   public int getPort() { return datum.port; }
+  /** Get the rack name of the node where the attempt ran */
+  public String getRackName() {
+    return datum.rackname == null ? null : datum.rackname.toString();
+  }
   /** Get the error string */
   public String getError() { return datum.error.toString(); }
   /** Get the task status */

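Note that the shorter constructor is kept and simply delegates with a null rack, so existing callers that cannot supply a rack keep compiling while new callers pass one. A tiny sketch of that overload-delegation pattern, with illustrative names:

    // Sketch only: preserve the legacy constructor by delegating with null
    // for the field the old callers cannot provide.
    class CompletionEventSketch {
      final String hostname;
      final String rackName; // null when the caller predates rack logging
      final String error;

      CompletionEventSketch(String hostname, int port, String rackName,
          String error) {
        this.hostname = hostname;
        this.rackName = rackName;
        this.error = error;
      }

      // Legacy overload, as in `this(id, taskType, status, finishTime,
      // hostname, -1, null, error, null);` above.
      CompletionEventSketch(String hostname, String error) {
        this(hostname, -1, null, error);
      }
    }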
View File

@@ -44,10 +44,13 @@ import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.app.MRApp;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.hs.TestJobHistoryEvents.MRAppWithHistory;
 import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
@@ -62,10 +65,12 @@ import org.junit.Test;
 public class TestJobHistoryParsing {
   private static final Log LOG = LogFactory.getLog(TestJobHistoryParsing.class);
 
+  private static final String RACK_NAME = "/MyRackName";
+
   public static class MyResolver implements DNSToSwitchMapping {
     @Override
     public List<String> resolve(List<String> names) {
-      return Arrays.asList(new String[]{"/MyRackName"});
+      return Arrays.asList(new String[]{RACK_NAME});
     }
   }
@@ -172,7 +177,7 @@ public class TestJobHistoryParsing {
       // Verify rack-name
       Assert.assertEquals("rack-name is incorrect", taskAttemptInfo
-          .getRackname(), "/MyRackName");
+          .getRackname(), RACK_NAME);
     }
   }
@@ -217,9 +222,89 @@ public class TestJobHistoryParsing {
     Assert.assertEquals("Status does not match", "SUCCEEDED",
         jobSummaryElements.get("status"));
   }
+
+  @Test
+  public void testHistoryParsingForFailedAttempts() throws Exception {
+    Configuration conf = new Configuration();
+    conf
+        .setClass(
+            CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+            MyResolver.class, DNSToSwitchMapping.class);
+    RackResolver.init(conf);
+    MRApp app = new MRAppWithHistoryWithFailedAttempt(2, 1, true, this.getClass().getName(),
+        true);
+    app.submit(conf);
+    Job job = app.getContext().getAllJobs().values().iterator().next();
+    JobId jobId = job.getID();
+    app.waitForState(job, JobState.SUCCEEDED);
+
+    // make sure all events are flushed
+    app.waitForState(Service.STATE.STOPPED);
+
+    String jobhistoryDir = JobHistoryUtils
+        .getHistoryIntermediateDoneDirForUser(conf);
+    JobHistory jobHistory = new JobHistory();
+    jobHistory.init(conf);
+
+    JobIndexInfo jobIndexInfo = jobHistory.getJobMetaInfo(jobId)
+        .getJobIndexInfo();
+    String jobhistoryFileName = FileNameIndexUtils
+        .getDoneFileName(jobIndexInfo);
+
+    Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName);
+    FSDataInputStream in = null;
+    FileContext fc = null;
+    try {
+      fc = FileContext.getFileContext(conf);
+      in = fc.open(fc.makeQualified(historyFilePath));
+    } catch (IOException ioe) {
+      LOG.info("Can not open history file: " + historyFilePath, ioe);
+      throw (new Exception("Can not open History File"));
+    }
+
+    JobHistoryParser parser = new JobHistoryParser(in);
+    JobInfo jobInfo = parser.parse();
+    int noOffailedAttempts = 0;
+    Map<TaskID, TaskInfo> allTasks = jobInfo.getAllTasks();
+    for (Task task : job.getTasks().values()) {
+      TaskInfo taskInfo = allTasks.get(TypeConverter.fromYarn(task.getID()));
+      for (TaskAttempt taskAttempt : task.getAttempts().values()) {
+        TaskAttemptInfo taskAttemptInfo = taskInfo.getAllTaskAttempts().get(
+            TypeConverter.fromYarn((taskAttempt.getID())));
+        // Verify rack-name for all task attempts
+        Assert.assertEquals("rack-name is incorrect", taskAttemptInfo
+            .getRackname(), RACK_NAME);
+        if (taskAttemptInfo.getTaskStatus().equals("FAILED")) {
+          noOffailedAttempts++;
+        }
+      }
+    }
+    Assert.assertEquals("No of Failed tasks doesn't match.", 2, noOffailedAttempts);
+  }
+
+  static class MRAppWithHistoryWithFailedAttempt extends MRAppWithHistory {
+
+    public MRAppWithHistoryWithFailedAttempt(int maps, int reduces, boolean autoComplete,
+        String testName, boolean cleanOnStart) {
+      super(maps, reduces, autoComplete, testName, cleanOnStart);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected void attemptLaunched(TaskAttemptId attemptID) {
+      if (attemptID.getTaskId().getId() == 0 && attemptID.getId() == 0) {
+        getContext().getEventHandler().handle(
+            new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
+      } else {
+        getContext().getEventHandler().handle(
+            new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE));
+      }
+    }
+  }
+
   public static void main(String[] args) throws Exception {
     TestJobHistoryParsing t = new TestJobHistoryParsing();
     t.testHistoryParsing();
+    t.testHistoryParsingForFailedAttempts();
   }
 }

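The new test pins the rack by configuring a resolver that ignores its input, so every attempt, successful or failed, must come back as RACK_NAME after a write/parse round-trip of the history file. A standalone sketch of why a constant mapping makes the assertion deterministic (the SwitchMapping interface below is a simplified stand-in for Hadoop's DNSToSwitchMapping, not the real API):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    // Simplified stand-in for a host-to-rack mapping service.
    interface SwitchMapping {
      List<String> resolve(List<String> names);
    }

    class FixedRackMapping implements SwitchMapping {
      private final String rack;

      FixedRackMapping(String rack) { this.rack = rack; }

      @Override
      public List<String> resolve(List<String> names) {
        // Every queried host is reported on the same rack, so the test
        // result does not depend on the machine running it.
        return Collections.nCopies(names.size(), rack);
      }
    }

    class FixedRackDemo {
      public static void main(String[] args) {
        SwitchMapping m = new FixedRackMapping("/MyRackName");
        System.out.println(m.resolve(Arrays.asList("hostA", "hostB")));
        // [/MyRackName, /MyRackName]
      }
    }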
View File

@@ -2671,7 +2671,9 @@ public class JobInProgress {
     // Update jobhistory
     TaskTrackerStatus ttStatus =
       this.jobtracker.getTaskTrackerStatus(status.getTaskTracker());
-    String trackerHostname = jobtracker.getNode(ttStatus.getHost()).toString();
+    Node node = jobtracker.getNode(ttStatus.getHost());
+    String trackerHostname = node.getName();
+    String trackerRackName = node.getParent().getName();
     TaskType taskType = getTaskType(tip);
 
     TaskAttemptStartedEvent tse = new TaskAttemptStartedEvent(
@@ -2685,7 +2687,7 @@ public class JobInProgress {
       MapAttemptFinishedEvent mfe = new MapAttemptFinishedEvent(
           statusAttemptID, taskType, TaskStatus.State.SUCCEEDED.toString(),
           status.getMapFinishTime(),
-          status.getFinishTime(), trackerHostname, -1, "",
+          status.getFinishTime(), trackerHostname, -1, trackerRackName,
           status.getStateString(),
           new org.apache.hadoop.mapreduce.Counters(status.getCounters()),
           tip.getSplits(statusAttemptID).burst()
@@ -2698,7 +2700,7 @@ public class JobInProgress {
           statusAttemptID, taskType, TaskStatus.State.SUCCEEDED.toString(),
           status.getShuffleFinishTime(),
           status.getSortFinishTime(), status.getFinishTime(),
-          trackerHostname, -1, "", status.getStateString(),
+          trackerHostname, -1, trackerRackName, status.getStateString(),
           new org.apache.hadoop.mapreduce.Counters(status.getCounters()),
           tip.getSplits(statusAttemptID).burst()
           );
@@ -3208,7 +3210,7 @@ public class JobInProgress {
         (taskid,
          taskType, taskStatus.getRunState().toString(),
          finishTime,
-         taskTrackerHostName, -1, diagInfo,
+         taskTrackerHostName, -1, null, diagInfo,
          splits.burst());
     jobHistory.logEvent(tue, taskid.getJobID());

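The mrv1 fix above is subtle: jobtracker.getNode(...) returns a topology node whose toString() is the full network path, so the old code logged "/rack/host" as the hostname. Splitting it into getName() for the host and getParent().getName() for the rack gives each event field the right value. A compilable sketch of that distinction, with TopoNode as an illustrative stand-in for org.apache.hadoop.net.Node:

    // Illustrative two-level topology node (rack -> host).
    class TopoNode {
      private final String name;
      private final TopoNode parent; // the rack for a host; null for a rack

      TopoNode(String name, TopoNode parent) {
        this.name = name;
        this.parent = parent;
      }

      String getName() { return name; }
      TopoNode getParent() { return parent; }

      @Override
      public String toString() {
        // Full network path -- what the old code logged as the hostname.
        return parent == null ? name : parent + "/" + name;
      }

      public static void main(String[] args) {
        TopoNode rack = new TopoNode("/rack4", null);
        TopoNode host = new TopoNode("host17", rack);
        System.out.println(host);                       // /rack4/host17
        System.out.println(host.getName());             // host17
        System.out.println(host.getParent().getName()); // /rack4
      }
    }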
View File

@@ -83,7 +83,7 @@ public class TestJobHistoryEvents extends TestCase {
     for (TaskType t : types) {
       TaskAttemptUnsuccessfulCompletionEvent tauce =
           new TaskAttemptUnsuccessfulCompletionEvent
-              (id, t, state, 0L, "", -1, "", NULL_SPLITS_ARRAY);
+              (id, t, state, 0L, "", -1, "", "", NULL_SPLITS_ARRAY);
       assertEquals(expected, tauce.getEventType());
     }
   }
@@ -132,7 +132,8 @@ public class TestJobHistoryEvents extends TestCase {
     for (TaskType t : types) {
       TaskAttemptFinishedEvent tafe =
           new TaskAttemptFinishedEvent(id, t,
-              TaskStatus.State.SUCCEEDED.toString(), 0L, "", "", new Counters());
+              TaskStatus.State.SUCCEEDED.toString(), 0L, "", "", "",
+              new Counters());
       assertEquals(expected, tafe.getEventType());
     }
   }

View File

@@ -42,6 +42,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskCounter;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.TestNoJobSetupCleanup.MyOutputFormat;
 import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
@@ -49,6 +50,9 @@ import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
 import org.apache.hadoop.tools.rumen.TraceBuilder.MyOptions;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -246,8 +250,10 @@ public class TestRumenJobTraces {
   }
 
   /**
-   * Validate the parsing of given history file name. Also validate the history
-   * file name suffixed with old/stale file suffix.
+   * Validate the parsing of given history file name.
+   *
+   * TODO: Also validate the history file name suffixed with old/stale file
+   * suffix.
    * @param jhFileName job history file path
    * @param jid JobID
    */
@@ -257,13 +263,7 @@ public class TestRumenJobTraces {
       JobID.forName(JobHistoryUtils.extractJobID(jhFileName.getName()));
     assertEquals("TraceBuilder failed to parse the current JH filename"
         + jhFileName, jid, extractedJID);
-    // test jobhistory filename with old/stale file suffix
-    jhFileName = jhFileName.suffix(JobHistory.getOldFileSuffix("123"));
-    extractedJID =
-      JobID.forName(JobHistoryUtils.extractJobID(jhFileName.getName()));
-    assertEquals("TraceBuilder failed to parse the current JH filename"
-        + "(old-suffix):" + jhFileName,
-        jid, extractedJID);
+    //TODO test jobhistory filename with old/stale file suffix
   }
 
   /**
@@ -318,8 +318,9 @@ public class TestRumenJobTraces {
         .makeQualified(lfs.getUri(), lfs.getWorkingDirectory());
 
     // Check if current jobhistory filenames are detected properly
-    Path jhFilename = org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils
-        .getStagingJobHistoryFile(rootInputDir, jid.toString(), 1);
+    JobId jobId = TypeConverter.toYarn(jid);
+    JobIndexInfo info = new JobIndexInfo(0L, 0L, "", "", jobId, 0, 0, "");
+    Path jhFilename = new Path(FileNameIndexUtils.getDoneFileName(info));
     validateHistoryFileNameParsing(jhFilename, jid);
 
     // Check if Pre21 V1 jophistory file names are detected properly
@@ -932,18 +933,18 @@ public class TestRumenJobTraces {
     subject.process(new TaskAttemptFinishedEvent(TaskAttemptID
         .forName("attempt_200904211745_0003_m_000004_0"), TaskType
         .valueOf("MAP"), "STATUS", 1234567890L,
-        "/194\\.6\\.134\\.64/cluster50261\\.secondleveldomain\\.com",
+        "/194\\.6\\.134\\.64", "cluster50261\\.secondleveldomain\\.com",
         "SUCCESS", null));
     subject.process(new TaskAttemptUnsuccessfulCompletionEvent
         (TaskAttemptID.forName("attempt_200904211745_0003_m_000004_1"),
         TaskType.valueOf("MAP"), "STATUS", 1234567890L,
-        "/194\\.6\\.134\\.80/cluster50262\\.secondleveldomain\\.com",
-        -1, "MACHINE_EXPLODED", splits));
+        "cluster50262\\.secondleveldomain\\.com",
+        -1, "/194\\.6\\.134\\.80", "MACHINE_EXPLODED", splits));
     subject.process(new TaskAttemptUnsuccessfulCompletionEvent
         (TaskAttemptID.forName("attempt_200904211745_0003_m_000004_2"),
         TaskType.valueOf("MAP"), "STATUS", 1234567890L,
-        "/194\\.6\\.134\\.80/cluster50263\\.secondleveldomain\\.com",
-        -1, "MACHINE_EXPLODED", splits));
+        "cluster50263\\.secondleveldomain\\.com",
+        -1, "/194\\.6\\.134\\.80", "MACHINE_EXPLODED", splits));
     subject.process(new TaskStartedEvent(TaskID
         .forName("task_200904211745_0003_m_000004"), 1234567890L, TaskType
         .valueOf("MAP"),

View File

@@ -5,6 +5,9 @@
   "children" : [ {
     "name" : "cluster50213\\.secondleveldomain\\.com",
     "children" : null
+  }, {
+    "name" : "cluster50235\\.secondleveldomain\\.com",
+    "children" : null
   }, {
     "name" : "cluster50226\\.secondleveldomain\\.com",
     "children" : null
@@ -20,6 +23,9 @@
   }, {
     "name" : "cluster50231\\.secondleveldomain\\.com",
     "children" : null
+  }, {
+    "name" : "cluster50223\\.secondleveldomain\\.com",
+    "children" : null
   }, {
     "name" : "cluster50232\\.secondleveldomain\\.com",
     "children" : null
@@ -98,12 +104,18 @@
   }, {
     "name" : "cluster1236\\.secondleveldomain\\.com",
     "children" : null
+  }, {
+    "name" : "cluster1232\\.secondleveldomain\\.com",
+    "children" : null
   } ]
 }, {
   "name" : "194\\.6\\.134\\.64",
   "children" : [ {
     "name" : "cluster50317\\.secondleveldomain\\.com",
     "children" : null
+  }, {
+    "name" : "cluster50283\\.secondleveldomain\\.com",
+    "children" : null
   }, {
     "name" : "cluster50292\\.secondleveldomain\\.com",
     "children" : null
@@ -146,6 +158,9 @@
   }, {
     "name" : "cluster50316\\.secondleveldomain\\.com",
     "children" : null
+  }, {
+    "name" : "cluster50303\\.secondleveldomain\\.com",
+    "children" : null
   } ]
 }, {
   "name" : "194\\.6\\.129\\.128",
@@ -431,6 +446,9 @@
   }, {
     "name" : "cluster50120\\.secondleveldomain\\.com",
     "children" : null
+  }, {
+    "name" : "cluster50132\\.secondleveldomain\\.com",
+    "children" : null
   }, {
     "name" : "cluster50130\\.secondleveldomain\\.com",
     "children" : null
@@ -566,9 +584,15 @@
   }, {
     "name" : "cluster50166\\.secondleveldomain\\.com",
     "children" : null
+  }, {
+    "name" : "cluster50173\\.secondleveldomain\\.com",
+    "children" : null
   }, {
     "name" : "cluster50170\\.secondleveldomain\\.com",
     "children" : null
+  }, {
+    "name" : "cluster50189\\.secondleveldomain\\.com",
+    "children" : null
   }, {
     "name" : "cluster50179\\.secondleveldomain\\.com",
     "children" : null
@@ -578,6 +602,21 @@
   "children" : [ {
     "name" : "cluster1283\\.secondleveldomain\\.com",
     "children" : null
+  }, {
+    "name" : "cluster1295\\.secondleveldomain\\.com",
+    "children" : null
+  }, {
+    "name" : "cluster1302\\.secondleveldomain\\.com",
+    "children" : null
+  }, {
+    "name" : "cluster1294\\.secondleveldomain\\.com",
+    "children" : null
+  }, {
+    "name" : "cluster1310\\.secondleveldomain\\.com",
+    "children" : null
+  }, {
+    "name" : "cluster1305\\.secondleveldomain\\.com",
+    "children" : null
   }, {
     "name" : "cluster1299\\.secondleveldomain\\.com",
     "children" : null
@@ -587,20 +626,14 @@
   }, {
     "name" : "cluster1288\\.secondleveldomain\\.com",
     "children" : null
-  }, {
-    "name" : "cluster1302\\.secondleveldomain\\.com",
-    "children" : null
-  }, {
-    "name" : "cluster1294\\.secondleveldomain\\.com",
-    "children" : null
   }, {
     "name" : "cluster1289\\.secondleveldomain\\.com",
     "children" : null
   }, {
-    "name" : "cluster1315\\.secondleveldomain\\.com",
+    "name" : "cluster1314\\.secondleveldomain\\.com",
     "children" : null
   }, {
-    "name" : "cluster1305\\.secondleveldomain\\.com",
+    "name" : "cluster1315\\.secondleveldomain\\.com",
     "children" : null
   }, {
     "name" : "cluster1316\\.secondleveldomain\\.com",
@@ -662,6 +695,9 @@
   }, {
     "name" : "cluster3054\\.secondleveldomain\\.com",
     "children" : null
+  }, {
+    "name" : "cluster3064\\.secondleveldomain\\.com",
+    "children" : null
   }, {
     "name" : "cluster3077\\.secondleveldomain\\.com",
     "children" : null
@@ -695,6 +731,9 @@
   "children" : [ {
     "name" : "cluster50468\\.secondleveldomain\\.com",
     "children" : null
+  }, {
+    "name" : "cluster50445\\.secondleveldomain\\.com",
+    "children" : null
   }, {
     "name" : "cluster50476\\.secondleveldomain\\.com",
     "children" : null
@@ -785,6 +824,9 @@
   }, {
     "name" : "cluster50493\\.secondleveldomain\\.com",
     "children" : null
+  }, {
+    "name" : "cluster50511\\.secondleveldomain\\.com",
+    "children" : null
   }, {
     "name" : "cluster50510\\.secondleveldomain\\.com",
     "children" : null
@@ -1100,6 +1142,9 @@
   }, {
     "name" : "cluster1907\\.secondleveldomain\\.com",
     "children" : null
+  }, {
+    "name" : "cluster1917\\.secondleveldomain\\.com",
+    "children" : null
   } ]
 }, {
   "name" : "192\\.30\\.63\\.192",
@@ -1223,6 +1268,9 @@
   }, {
     "name" : "cluster1446\\.secondleveldomain\\.com",
     "children" : null
+  }, {
+    "name" : "cluster1440\\.secondleveldomain\\.com",
+    "children" : null
   } ]
 }, {
   "name" : "194\\.6\\.132\\.128",
@@ -1238,6 +1286,9 @@
   }, {
     "name" : "cluster50025\\.secondleveldomain\\.com",
     "children" : null
+  }, {
+    "name" : "cluster50024\\.secondleveldomain\\.com",
+    "children" : null
   }, {
     "name" : "cluster50021\\.secondleveldomain\\.com",
     "children" : null
@@ -1292,6 +1343,9 @@
   }, {
     "name" : "cluster50348\\.secondleveldomain\\.com",
     "children" : null
+  }, {
+    "name" : "cluster50346\\.secondleveldomain\\.com",
+    "children" : null
   }, {
     "name" : "cluster50325\\.secondleveldomain\\.com",
     "children" : null
@@ -1379,6 +1433,9 @@
   }, {
     "name" : "cluster1662\\.secondleveldomain\\.com",
     "children" : null
+  }, {
+    "name" : "cluster1647\\.secondleveldomain\\.com",
+    "children" : null
   }, {
     "name" : "cluster1649\\.secondleveldomain\\.com",
     "children" : null
@@ -1430,6 +1487,9 @@
   }, {
     "name" : "cluster1503\\.secondleveldomain\\.com",
     "children" : null
+  }, {
+    "name" : "cluster1514\\.secondleveldomain\\.com",
+    "children" : null
   } ]
 }, {
   "name" : "194\\.6\\.129\\.0",
@@ -1439,6 +1499,9 @@
   }, {
     "name" : "cluster50539\\.secondleveldomain\\.com",
     "children" : null
+  }, {
+    "name" : "cluster50533\\.secondleveldomain\\.com",
+    "children" : null
   }, {
     "name" : "cluster50530\\.secondleveldomain\\.com",
     "children" : null
@@ -1475,6 +1538,9 @@
   }, {
     "name" : "cluster50418\\.secondleveldomain\\.com",
     "children" : null
+  }, {
+    "name" : "cluster50406\\.secondleveldomain\\.com",
+    "children" : null
   }, {
     "name" : "cluster50411\\.secondleveldomain\\.com",
     "children" : null
@@ -1527,6 +1593,9 @@
   }, {
     "name" : "194\\.6\\.128\\.64",
     "children" : [ {
+      "name" : "cluster1613\\.secondleveldomain\\.com",
+      "children" : null
+    }, {
       "name" : "cluster1639\\.secondleveldomain\\.com",
       "children" : null
     }, {
@@ -1574,6 +1643,9 @@
   }, {
     "name" : "cluster1602\\.secondleveldomain\\.com",
     "children" : null
+  }, {
+    "name" : "cluster1627\\.secondleveldomain\\.com",
+    "children" : null
   } ]
 }, {
   "name" : "194\\.6\\.132\\.192",
@@ -1661,6 +1733,9 @@
   }, {
     "name" : "cluster1736\\.secondleveldomain\\.com",
     "children" : null
+  }, {
+    "name" : "cluster1735\\.secondleveldomain\\.com",
+    "children" : null
   }, {
     "name" : "cluster1722\\.secondleveldomain\\.com",
     "children" : null

View File

@@ -1329,6 +1329,8 @@ public class HadoopLogsAnalyzer extends Configured implements Tool {
       if (host != null) {
         attempt.setHostName(host.getNodeName(), host.getRackName());
         attempt.setLocation(host.makeLoggedLocation());
+      } else {
+        attempt.setHostName(hostName, null);
       }
 
       List<LoggedLocation> locs = task.getPreferredLocations();
@@ -1491,9 +1493,13 @@ public class HadoopLogsAnalyzer extends Configured implements Tool {
         }
       }
 
-      ParsedHost host = getAndRecordParsedHost(hostName);
-      if (host != null) {
-        attempt.setHostName(host.getNodeName(), host.getRackName());
+      if (hostName != null) {
+        ParsedHost host = getAndRecordParsedHost(hostName);
+        if (host != null) {
+          attempt.setHostName(host.getNodeName(), host.getRackName());
+        } else {
+          attempt.setHostName(hostName, null);
+        }
       }
 
       if (attemptID != null) {

View File

@@ -474,10 +474,11 @@ public class JobBuilder {
     }
     attempt.setResult(getPre21Value(event.getTaskStatus()));
-    ParsedHost parsedHost = getAndRecordParsedHost(event.getHostname());
-
-    if (parsedHost != null) {
-      attempt.setLocation(parsedHost.makeLoggedLocation());
+    attempt.setHostName(event.getHostname(), event.getRackName());
+    ParsedHost pHost =
+        getAndRecordParsedHost(event.getRackName(), event.getHostname());
+    if (pHost != null) {
+      attempt.setLocation(pHost.makeLoggedLocation());
     }
 
     attempt.setFinishTime(event.getFinishTime());
@@ -506,8 +507,10 @@ public class JobBuilder {
       return;
     }
     attempt.setResult(getPre21Value(event.getTaskStatus()));
-    attempt.setLocation(getAndRecordParsedHost(event.getHostname())
-        .makeLoggedLocation());
+    ParsedHost pHost = getAndRecordParsedHost(event.getRackName(), event.getHostname());
+    if (pHost != null) {
+      attempt.setLocation(pHost.makeLoggedLocation());
+    }
     attempt.setFinishTime(event.getFinishTime());
     attempt
         .incorporateCounters(((TaskAttemptFinished) event.getDatum()).counters);
@@ -523,6 +526,11 @@ public class JobBuilder {
     }
     attempt.setResult(getPre21Value(event.getTaskStatus()));
     attempt.setHostName(event.getHostname(), event.getRackName());
+    ParsedHost pHost =
+        getAndRecordParsedHost(event.getRackName(), event.getHostname());
+    if (pHost != null) {
+      attempt.setLocation(pHost.makeLoggedLocation());
+    }
 
     // XXX There may be redundant location info available in the event.
     // We might consider extracting it from this event. Currently this
@@ -546,8 +554,14 @@ public class JobBuilder {
       return;
     }
     attempt.setResult(getPre21Value(event.getTaskStatus()));
-    attempt.setHostName(event.getHostname(), event.getRackname());
+    attempt.setHostName(event.getHostname(), event.getRackName());
+    ParsedHost pHost =
+        getAndRecordParsedHost(event.getRackName(), event.getHostname());
+    if (pHost != null) {
+      attempt.setLocation(pHost.makeLoggedLocation());
+    }
 
     // XXX There may be redundant location info available in the event.
     // We might consider extracting it from this event. Currently this
     // is redundant, but making this will add future-proofing.
@@ -676,7 +690,19 @@ public class JobBuilder {
   }
 
   private ParsedHost getAndRecordParsedHost(String hostName) {
-    ParsedHost result = ParsedHost.parse(hostName);
+    return getAndRecordParsedHost(null, hostName);
+  }
+
+  private ParsedHost getAndRecordParsedHost(String rackName, String hostName) {
+    ParsedHost result = null;
+    if (rackName == null) {
+      // for old (pre-23) job history files where hostname was represented as
+      // /rackname/hostname
+      result = ParsedHost.parse(hostName);
+    } else {
+      // for new (post-23) job history files
+      result = new ParsedHost(rackName, hostName);
+    }
 
     if (result != null) {
      ParsedHost canonicalResult = allHosts.get(result);

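The new two-argument getAndRecordParsedHost is the compatibility pivot for Rumen: pre-0.23 history files pack "/rackname/hostname" into a single field, while files written after this change carry the rack separately. A condensed, self-contained sketch of the branch (the regex is an assumed shape for the legacy value, not the exact pattern ParsedHost uses):

    import java.util.Arrays;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    class HostResolutionSketch {
      // Assumed legacy shape: "/rackname/hostname".
      private static final Pattern OLD_FORMAT =
          Pattern.compile("(/[^/]+)/([^/]+)");

      // Returns {rack, host}, or null when a legacy value is unparseable
      // (mirroring ParsedHost.parse returning null).
      static String[] resolve(String rackName, String hostName) {
        if (rackName != null) {
          // post-23 history: fields arrive separately
          return new String[] { rackName, hostName };
        }
        // pre-23 history: split the combined path
        Matcher m = OLD_FORMAT.matcher(hostName);
        return m.matches() ? new String[] { m.group(1), m.group(2) } : null;
      }

      public static void main(String[] args) {
        System.out.println(Arrays.toString(resolve(null, "/rack4/host17")));
        System.out.println(Arrays.toString(resolve("/rack4", "host17")));
        // both print [/rack4, host17]
      }
    }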
View File

@@ -69,11 +69,17 @@ class ParsedHost {
     return new ParsedHost(matcher.group(1), matcher.group(2));
   }
 
+  private String process(String name) {
+    return name == null
+        ? null
+        : name.startsWith("/") ? name.substring(1) : name;
+  }
+
   public ParsedHost(LoggedLocation loc) {
     List<String> coordinates = loc.getLayers();
 
-    rackName = coordinates.get(0);
-    nodeName = coordinates.get(1);
+    rackName = process(coordinates.get(0));
+    nodeName = process(coordinates.get(1));
   }
 
   LoggedLocation makeLoggedLocation() {
@@ -99,8 +105,8 @@ class ParsedHost {
   // expects the broadest name first
   ParsedHost(String rackName, String nodeName) {
-    this.rackName = rackName;
-    this.nodeName = nodeName;
+    this.rackName = process(rackName);
+    this.nodeName = process(nodeName);
   }
 
   @Override

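process() makes both constructors tolerant of rack and node names arriving with or without a leading slash, which matters because ParsedHost instances are used as lookup keys (allHosts.get/contains in JobBuilder and TopologyBuilder), so "/rack4" and "rack4" must collapse to one entry. A standalone sketch of the normalization feeding equality:

    import java.util.HashSet;
    import java.util.Objects;
    import java.util.Set;

    // Sketch: strip one leading '/' so differently-spelled names compare equal.
    final class HostKey {
      final String rackName;
      final String nodeName;

      HostKey(String rackName, String nodeName) {
        this.rackName = process(rackName);
        this.nodeName = process(nodeName);
      }

      private static String process(String name) {
        return name == null ? null
            : name.startsWith("/") ? name.substring(1) : name;
      }

      @Override
      public boolean equals(Object o) {
        if (!(o instanceof HostKey)) return false;
        HostKey other = (HostKey) o;
        return Objects.equals(rackName, other.rackName)
            && Objects.equals(nodeName, other.nodeName);
      }

      @Override
      public int hashCode() {
        return Objects.hash(rackName, nodeName);
      }

      public static void main(String[] args) {
        Set<HostKey> all = new HashSet<>();
        all.add(new HostKey("/rack4", "host17"));
        all.add(new HostKey("rack4", "host17"));
        System.out.println(all.size()); // 1
      }
    }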
View File

@@ -108,9 +108,12 @@ public abstract class TaskAttempt20LineEventEmitter extends HistoryEventEmitter
       TaskAttempt20LineEventEmitter that =
           (TaskAttempt20LineEventEmitter) thatg;
 
+      ParsedHost pHost = ParsedHost.parse(hostName);
+
       return new TaskAttemptFinishedEvent(taskAttemptID,
           that.originalTaskType, status, Long.parseLong(finishTime),
-          hostName, state, maybeParseCounters(counters));
+          pHost.getRackName(), pHost.getNodeName(), state,
+          maybeParseCounters(counters));
     }
 
     return null;
@@ -138,10 +141,19 @@ public abstract class TaskAttempt20LineEventEmitter extends HistoryEventEmitter
       TaskAttempt20LineEventEmitter that =
           (TaskAttempt20LineEventEmitter) thatg;
 
+      ParsedHost pHost = ParsedHost.parse(hostName);
+      String rackName = null;
+      // Earlier versions of MR logged on hostnames (without rackname) for
+      // unsuccessful attempts
+      if (pHost != null) {
+        rackName = pHost.getRackName();
+        hostName = pHost.getNodeName();
+      }
+
       return new TaskAttemptUnsuccessfulCompletionEvent
           (taskAttemptID,
            that.originalTaskType, status, Long.parseLong(finishTime),
-           hostName, -1, error, null);
+           hostName, -1, rackName, error, null);
     }
 
     return null;

View File

@@ -25,6 +25,8 @@ import java.util.StringTokenizer;
 import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
+import org.apache.hadoop.mapreduce.jobhistory.MapAttemptFinishedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.ReduceAttemptFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
 
 /**
@@ -46,6 +48,10 @@ public class TopologyBuilder {
       processTaskAttemptUnsuccessfulCompletionEvent((TaskAttemptUnsuccessfulCompletionEvent) event);
     } else if (event instanceof TaskStartedEvent) {
       processTaskStartedEvent((TaskStartedEvent) event);
+    } else if (event instanceof MapAttemptFinishedEvent) {
+      processMapAttemptFinishedEvent((MapAttemptFinishedEvent) event);
+    } else if (event instanceof ReduceAttemptFinishedEvent) {
+      processReduceAttemptFinishedEvent((ReduceAttemptFinishedEvent) event);
     }
 
     // I do NOT expect these if statements to be exhaustive.
@@ -78,15 +84,40 @@ public class TopologyBuilder {
   private void processTaskAttemptUnsuccessfulCompletionEvent(
       TaskAttemptUnsuccessfulCompletionEvent event) {
-    recordParsedHost(event.getHostname());
+    recordParsedHost(event.getHostname(), event.getRackName());
   }
 
   private void processTaskAttemptFinishedEvent(TaskAttemptFinishedEvent event) {
-    recordParsedHost(event.getHostname());
+    recordParsedHost(event.getHostname(), event.getRackName());
   }
 
-  private void recordParsedHost(String hostName) {
-    ParsedHost result = ParsedHost.parse(hostName);
+  private void processMapAttemptFinishedEvent(MapAttemptFinishedEvent event) {
+    recordParsedHost(event.getHostname(), event.getRackName());
+  }
+
+  private void processReduceAttemptFinishedEvent(ReduceAttemptFinishedEvent event) {
+    recordParsedHost(event.getHostname(), event.getRackName());
+  }
+
+  private void recordParsedHost(String hostName, String rackName) {
+    if (hostName == null) {
+      return;
+    }
+    ParsedHost result = null;
+    if (rackName == null) {
+      result = ParsedHost.parse(hostName);
+    } else {
+      result = new ParsedHost(rackName, hostName);
+    }
+
+    if (result != null && !allHosts.contains(result)) {
+      allHosts.add(result);
+    }
+  }
+
+  private void recordParsedHost(String nodeName) {
+    ParsedHost result = ParsedHost.parse(nodeName);
 
     if (result != null && !allHosts.contains(result)) {
       allHosts.add(result);