svn merge -c 1572269 FIXES: Preserve Job diagnostics in history. Contributed by Gera Shegalov
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1572276 13f79535-47bb-0310-9956-ffa450edef68
parent 4812bd1569
commit 58d84af922
CHANGES.txt
@@ -37,6 +37,9 @@ Release 2.4.0 - UNRELEASED
     MAPREDUCE-5761. Added a simple log message to denote when encrypted shuffle
     is on in the shuffle-handler. (Jian He via vinodkv)
 
+    MAPREDUCE-5754. Preserve Job diagnostics in history (Gera Shegalov via
+    jlowe)
+
   OPTIMIZATIONS
 
   BUG FIXES
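Below, a minimal end-to-end sketch (not part of the patch) of the round trip this change enables: the MR AM attaches its diagnostics to the unsuccessful-completion event, and a history consumer reads them back from the parsed job info. The JobID value and the stream argument are illustrative; the classes and methods are the ones touched in the hunks that follow.

import java.util.Arrays;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent;

public class DiagnosticsRoundTripSketch {
  // Write side (MR AM): the new six-argument constructor carries diagnostics.
  static JobUnsuccessfulCompletionEvent buildEvent() {
    return new JobUnsuccessfulCompletionEvent(
        JobID.forName("job_1393307629410_0001"),   // illustrative id
        System.currentTimeMillis(), 0, 0, "KILLED",
        Arrays.asList("Job received Kill while in RUNNING state."));
  }

  // Read side (history consumer): the parser now copies the event's
  // diagnostics into JobInfo.errorInfo instead of synthesizing a message.
  static String readDiagnostics(FSDataInputStream historyFile) throws Exception {
    JobHistoryParser parser = new JobHistoryParser(historyFile);
    JobInfo info = parser.parse();
    return info.getErrorInfo();   // "" for files written before this change
  }
}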
JobHistoryEventHandler.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
@@ -343,11 +344,12 @@ public class JobHistoryEventHandler extends AbstractService
           LOG.warn("Found jobId " + toClose
             + " to have not been closed. Will close");
           //Create a JobFinishEvent so that it is written to the job history
+          final Job job = context.getJob(toClose);
           JobUnsuccessfulCompletionEvent jucEvent =
               new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(toClose),
-                  System.currentTimeMillis(), context.getJob(toClose)
-                  .getCompletedMaps(), context.getJob(toClose).getCompletedReduces(),
-                  JobState.KILLED.toString());
+                  System.currentTimeMillis(), job.getCompletedMaps(),
+                  job.getCompletedReduces(), JobState.KILLED.toString(),
+                  job.getDiagnostics());
           JobHistoryEvent jfEvent = new JobHistoryEvent(toClose, jucEvent);
           //Bypass the queue mechanism which might wait. Call the method directly
           handleEvent(jfEvent);
JobImpl.java
@@ -149,6 +149,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
 
   // Maximum no. of fetch-failure notifications after which map task is failed
   private static final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3;
+
+  public static final String JOB_KILLED_DIAG =
+      "Job received Kill while in RUNNING state.";
 
   //final fields
   private final ApplicationAttemptId applicationAttemptId;
@@ -1617,7 +1620,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
             finishTime,
             succeededMapTaskCount,
             succeededReduceTaskCount,
-            finalState.toString());
+            finalState.toString(),
+            diagnostics);
     eventHandler.handle(new JobHistoryEvent(jobId,
         unsuccessfulJobEvent));
     finished(finalState);
@@ -1730,7 +1734,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
       JobUnsuccessfulCompletionEvent failedEvent =
           new JobUnsuccessfulCompletionEvent(job.oldJobId,
               job.finishTime, 0, 0,
-              JobStateInternal.KILLED.toString());
+              JobStateInternal.KILLED.toString(), job.diagnostics);
       job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent));
       job.finished(JobStateInternal.KILLED);
     }
@@ -1763,7 +1767,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
       implements SingleArcTransition<JobImpl, JobEvent> {
     @Override
     public void transition(JobImpl job, JobEvent event) {
-      job.addDiagnostic("Job received Kill while in RUNNING state.");
+      job.addDiagnostic(JOB_KILLED_DIAG);
       for (Task task : job.tasks.values()) {
         job.eventHandler.handle(
             new TaskEvent(task.getID(), TaskEventType.T_KILL));
@@ -2127,7 +2131,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
       JobUnsuccessfulCompletionEvent failedEvent =
           new JobUnsuccessfulCompletionEvent(job.oldJobId,
               job.finishTime, 0, 0,
-              jobHistoryString);
+              jobHistoryString, job.diagnostics);
       job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent));
       job.finished(terminationState);
     }
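For context, a hedged sketch of the write-side state these JobImpl hunks rely on (an assumed shape, not the verbatim source): JobImpl accumulates diagnostic strings in a list, transitions append via addDiagnostic(), and the whole list is handed to JobUnsuccessfulCompletionEvent as an Iterable<String>.

import java.util.ArrayList;
import java.util.List;

class JobDiagnosticsSketch {
  // Accumulated across state transitions; JOB_KILLED_DIAG is one such entry.
  private final List<String> diagnostics = new ArrayList<String>();

  void addDiagnostic(String diag) {
    diagnostics.add(diag);
  }

  // Passed to the history event, which joins the entries with '\n'.
  Iterable<String> getDiagnostics() {
    return diagnostics;
  }
}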
TestEvents.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
 import org.junit.Test;
 
 public class TestEvents {
@@ -334,11 +335,12 @@ public class TestEvents {
   private FakeEvent getJobKilledEvent() {
     FakeEvent result = new FakeEvent(EventType.JOB_KILLED);
     JobUnsuccessfulCompletion datum = new JobUnsuccessfulCompletion();
-    datum.finishedMaps = 1;
-    datum.finishedReduces = 2;
-    datum.finishTime = 3;
-    datum.jobid = "ID";
-    datum.jobStatus = "STATUS";
+    datum.setFinishedMaps(1);
+    datum.setFinishedReduces(2);
+    datum.setFinishTime(3L);
+    datum.setJobid("ID");
+    datum.setJobStatus("STATUS");
+    datum.setDiagnostics(JobImpl.JOB_KILLED_DIAG);
     result.setDatum(datum);
     return result;
   }
Events.avpr
@@ -135,7 +135,8 @@
           {"name": "finishTime", "type": "long"},
           {"name": "finishedMaps", "type": "int"},
           {"name": "finishedReduces", "type": "int"},
-          {"name": "jobStatus", "type": "string"}
+          {"name": "jobStatus", "type": "string"},
+          {"name": "diagnostics", "type": "string"}
       ]
      },
 
JobHistoryParser.java
@@ -353,10 +353,6 @@ public class JobHistoryParser implements HistoryEventHandler {
     taskInfo.error = StringInterner.weakIntern(event.getError());
     taskInfo.failedDueToAttemptId = event.getFailedAttemptID();
     taskInfo.counters = event.getCounters();
-    if (info.errorInfo.isEmpty()) {
-      info.errorInfo = "Task " + taskInfo.taskId + " failed " +
-          taskInfo.attemptsMap.size() + " times ";
-    }
   }
 
   private void handleTaskStartedEvent(TaskStartedEvent event) {
@@ -373,6 +369,7 @@ public class JobHistoryParser implements HistoryEventHandler {
     info.finishedMaps = event.getFinishedMaps();
     info.finishedReduces = event.getFinishedReduces();
     info.jobStatus = StringInterner.weakIntern(event.getStatus());
+    info.errorInfo = StringInterner.weakIntern(event.getDiagnostics());
   }
 
   private void handleJobFinishedEvent(JobFinishedEvent event) {
JobUnsuccessfulCompletionEvent.java
@@ -18,11 +18,15 @@
 
 package org.apache.hadoop.mapreduce.jobhistory;
 
+import com.google.common.base.Joiner;
+
 import org.apache.avro.util.Utf8;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.mapreduce.JobID;
 
+import java.util.Collections;
+
 /**
  * Event to record Failed and Killed completion of jobs
  *
@@ -30,6 +34,10 @@ import org.apache.hadoop.mapreduce.JobID;
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class JobUnsuccessfulCompletionEvent implements HistoryEvent {
+  private static final String NODIAGS = "";
+  private static final Iterable<String> NODIAGS_LIST =
+      Collections.singletonList(NODIAGS);
+
   private JobUnsuccessfulCompletion datum
       = new JobUnsuccessfulCompletion();
 
@@ -44,11 +52,33 @@ public class JobUnsuccessfulCompletionEvent implements HistoryEvent {
   public JobUnsuccessfulCompletionEvent(JobID id, long finishTime,
       int finishedMaps,
       int finishedReduces, String status) {
-    datum.jobid = new Utf8(id.toString());
-    datum.finishTime = finishTime;
-    datum.finishedMaps = finishedMaps;
-    datum.finishedReduces = finishedReduces;
-    datum.jobStatus = new Utf8(status);
+    this(id, finishTime, finishedMaps, finishedReduces, status, NODIAGS_LIST);
   }
 
+  /**
+   * Create an event to record unsuccessful completion (killed/failed) of jobs
+   * @param id Job ID
+   * @param finishTime Finish time of the job
+   * @param finishedMaps Number of finished maps
+   * @param finishedReduces Number of finished reduces
+   * @param status Status of the job
+   * @param diagnostics job runtime diagnostics
+   */
+  public JobUnsuccessfulCompletionEvent(JobID id, long finishTime,
+      int finishedMaps,
+      int finishedReduces,
+      String status,
+      Iterable<String> diagnostics) {
+    datum.setJobid(new Utf8(id.toString()));
+    datum.setFinishTime(finishTime);
+    datum.setFinishedMaps(finishedMaps);
+    datum.setFinishedReduces(finishedReduces);
+    datum.setJobStatus(new Utf8(status));
+    if (diagnostics == null) {
+      diagnostics = NODIAGS_LIST;
+    }
+    datum.setDiagnostics(new Utf8(Joiner.on('\n').skipNulls()
+        .join(diagnostics)));
+  }
+
   JobUnsuccessfulCompletionEvent() {}
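A short usage sketch (the JobID and strings are arbitrary) of how the two constructors relate: the five-argument form delegates with NODIAGS_LIST, so legacy callers still write an empty diagnostics string, while new callers pass an Iterable whose elements are joined with '\n' into the single Avro field.

import java.util.Arrays;

import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent;

class ConstructorUsageSketch {
  static void demo() {
    JobID id = JobID.forName("job_1393307629410_0001");   // arbitrary

    // Legacy call site: diagnostics default to the empty string.
    JobUnsuccessfulCompletionEvent legacy =
        new JobUnsuccessfulCompletionEvent(id, 3L, 1, 2, "KILLED");
    assert legacy.getDiagnostics().isEmpty();

    // New call site: entries are joined with '\n' into one string.
    JobUnsuccessfulCompletionEvent withDiags =
        new JobUnsuccessfulCompletionEvent(id, 3L, 1, 2, "KILLED",
            Arrays.asList("Job received Kill while in RUNNING state.",
                "Task failed: task_1393307629410_0001_m_000000"));
    assert withDiags.getDiagnostics().contains("\n");
  }
}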
@@ -61,13 +91,13 @@ public class JobUnsuccessfulCompletionEvent implements HistoryEvent {
   /** Get the Job ID */
   public JobID getJobId() { return JobID.forName(datum.jobid.toString()); }
   /** Get the job finish time */
-  public long getFinishTime() { return datum.finishTime; }
+  public long getFinishTime() { return datum.getFinishTime(); }
   /** Get the number of finished maps */
-  public int getFinishedMaps() { return datum.finishedMaps; }
+  public int getFinishedMaps() { return datum.getFinishedMaps(); }
   /** Get the number of finished reduces */
-  public int getFinishedReduces() { return datum.finishedReduces; }
+  public int getFinishedReduces() { return datum.getFinishedReduces(); }
   /** Get the status */
-  public String getStatus() { return datum.jobStatus.toString(); }
+  public String getStatus() { return datum.getJobStatus().toString(); }
   /** Get the event type */
   public EventType getEventType() {
     if ("FAILED".equals(getStatus())) {
@@ -78,4 +108,13 @@ public class JobUnsuccessfulCompletionEvent implements HistoryEvent {
     return EventType.JOB_KILLED;
   }
 
+  /**
+   * Retrieves diagnostics information preserved in the history file
+   *
+   * @return diagnostics as of the time of job termination
+   */
+  public String getDiagnostics() {
+    final CharSequence diagnostics = datum.getDiagnostics();
+    return diagnostics == null ? NODIAGS : diagnostics.toString();
+  }
 }
pom.xml
@@ -73,7 +73,7 @@
         <configuration>
           <excludes>
             <exclude>src/test/resources/job_1329348432655_0001_conf.xml</exclude>
-            <exclude>src/test/resources/job_1329348432655_0001-1329348443227-user-Sleep+job-1329348468601-10-1-SUCCEEDED-default.jhist</exclude>
+            <exclude>src/test/resources/*.jhist</exclude>
           </excludes>
         </configuration>
       </plugin>
TestJobHistoryParsing.java
@@ -18,7 +18,10 @@
 
 package org.apache.hadoop.mapreduce.v2.hs;
 
-import static junit.framework.Assert.assertEquals;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic
+    .NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
@@ -26,6 +29,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -37,9 +41,9 @@ import junit.framework.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobID;
@@ -54,6 +58,7 @@ import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
@@ -67,8 +72,11 @@ import org.apache.hadoop.mapreduce.v2.api.records.impl.pb.JobIdPBImpl;
 import org.apache.hadoop.mapreduce.v2.api.records.impl.pb.TaskIdPBImpl;
 import org.apache.hadoop.mapreduce.v2.app.MRApp;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
@@ -150,7 +158,7 @@ public class TestJobHistoryParsing {
     conf.set(MRJobConfig.USER_NAME, System.getProperty("user.name"));
     long amStartTimeEst = System.currentTimeMillis();
     conf.setClass(
-        CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+        NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
         MyResolver.class, DNSToSwitchMapping.class);
     RackResolver.init(conf);
     MRApp app = new MRAppWithHistory(numMaps, numReduces, true, this.getClass()
@@ -391,7 +399,7 @@ public class TestJobHistoryParsing {
     try {
       Configuration conf = new Configuration();
       conf.setClass(
-          CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+          NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
           MyResolver.class, DNSToSwitchMapping.class);
       RackResolver.init(conf);
       MRApp app = new MRAppWithHistoryWithFailedAttempt(2, 1, true, this
@@ -456,7 +464,7 @@ public class TestJobHistoryParsing {
     try {
       Configuration conf = new Configuration();
       conf.setClass(
-          CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+          NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
           MyResolver.class, DNSToSwitchMapping.class);
       RackResolver.init(conf);
       MRApp app = new MRAppWithHistoryWithFailedTask(2, 1, true, this
@@ -500,18 +508,85 @@ public class TestJobHistoryParsing {
         Assert.assertNotNull("completed task report has null counters", ct
             .getReport().getCounters());
       }
+      final List<String> originalDiagnostics = job.getDiagnostics();
+      final String historyError = jobInfo.getErrorInfo();
+      assertTrue("No original diagnostics for a failed job",
+          originalDiagnostics != null && !originalDiagnostics.isEmpty());
+      assertNotNull("No history error info for a failed job ", historyError);
+      for (String diagString : originalDiagnostics) {
+        assertTrue(historyError.contains(diagString));
+      }
     } finally {
       LOG.info("FINISHED testCountersForFailedTask");
     }
   }
 
+  @Test(timeout = 60000)
+  public void testDiagnosticsForKilledJob() throws Exception {
+    LOG.info("STARTING testDiagnosticsForKilledJob");
+    try {
+      final Configuration conf = new Configuration();
+      conf.setClass(
+          NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+          MyResolver.class, DNSToSwitchMapping.class);
+      RackResolver.init(conf);
+      MRApp app = new MRAppWithHistoryWithJobKilled(2, 1, true, this
+          .getClass().getName(), true);
+      app.submit(conf);
+      Job job = app.getContext().getAllJobs().values().iterator().next();
+      JobId jobId = job.getID();
+      app.waitForState(job, JobState.KILLED);
+
+      // make sure all events are flushed
+      app.waitForState(Service.STATE.STOPPED);
+
+      JobHistory jobHistory = new JobHistory();
+      jobHistory.init(conf);
+
+      HistoryFileInfo fileInfo = jobHistory.getJobFileInfo(jobId);
+
+      JobHistoryParser parser;
+      JobInfo jobInfo;
+      synchronized (fileInfo) {
+        Path historyFilePath = fileInfo.getHistoryFile();
+        FSDataInputStream in = null;
+        FileContext fc = null;
+        try {
+          fc = FileContext.getFileContext(conf);
+          in = fc.open(fc.makeQualified(historyFilePath));
+        } catch (IOException ioe) {
+          LOG.info("Can not open history file: " + historyFilePath, ioe);
+          throw (new Exception("Can not open History File"));
+        }
+
+        parser = new JobHistoryParser(in);
+        jobInfo = parser.parse();
+      }
+      Exception parseException = parser.getParseException();
+      assertNull("Caught an expected exception " + parseException,
+          parseException);
+      final List<String> originalDiagnostics = job.getDiagnostics();
+      final String historyError = jobInfo.getErrorInfo();
+      assertTrue("No original diagnostics for a failed job",
+          originalDiagnostics != null && !originalDiagnostics.isEmpty());
+      assertNotNull("No history error info for a failed job ", historyError);
+      for (String diagString : originalDiagnostics) {
+        assertTrue(historyError.contains(diagString));
+      }
+      assertTrue("No killed message in diagnostics",
+          historyError.contains(JobImpl.JOB_KILLED_DIAG));
+    } finally {
+      LOG.info("FINISHED testDiagnosticsForKilledJob");
+    }
+  }
+
   @Test(timeout = 50000)
   public void testScanningOldDirs() throws Exception {
     LOG.info("STARTING testScanningOldDirs");
     try {
       Configuration conf = new Configuration();
       conf.setClass(
-          CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+          NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
          MyResolver.class, DNSToSwitchMapping.class);
       RackResolver.init(conf);
       MRApp app = new MRAppWithHistory(1, 1, true, this.getClass().getName(),
@@ -591,6 +666,27 @@ public class TestJobHistoryParsing {
     }
   }
 
+  static class MRAppWithHistoryWithJobKilled extends MRAppWithHistory {
+
+    public MRAppWithHistoryWithJobKilled(int maps, int reduces,
+        boolean autoComplete, String testName, boolean cleanOnStart) {
+      super(maps, reduces, autoComplete, testName, cleanOnStart);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected void attemptLaunched(TaskAttemptId attemptID) {
+      if (attemptID.getTaskId().getId() == 0) {
+        getContext().getEventHandler().handle(
+            new JobEvent(attemptID.getTaskId().getJobId(),
+                JobEventType.JOB_KILL));
+      } else {
+        getContext().getEventHandler().handle(
+            new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE));
+      }
+    }
+  }
+
   static class HistoryFileManagerForTest extends HistoryFileManager {
     void deleteJobFromJobListCache(HistoryFileInfo fileInfo) {
       jobListCache.delete(fileInfo);
@@ -613,7 +709,7 @@ public class TestJobHistoryParsing {
     try {
       Configuration conf = new Configuration();
       conf.setClass(
-          CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+          NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
          MyResolver.class, DNSToSwitchMapping.class);
 
       RackResolver.init(conf);
@@ -667,7 +763,7 @@ public class TestJobHistoryParsing {
       Configuration configuration = new Configuration();
       configuration
           .setClass(
-              CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+              NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
               MyResolver.class, DNSToSwitchMapping.class);
 
       RackResolver.init(configuration);
@@ -739,7 +835,7 @@ public class TestJobHistoryParsing {
     final org.apache.hadoop.mapreduce.TaskType taskType =
         org.apache.hadoop.mapreduce.TaskType.MAP;
     final TaskID[] tids = new TaskID[2];
-    JobID jid = new JobID("1", 1);
+    final JobID jid = new JobID("1", 1);
     tids[0] = new TaskID(jid, taskType, 0);
     tids[1] = new TaskID(jid, taskType, 1);
     Mockito.when(reader.getNextEvent()).thenAnswer(
@@ -758,6 +854,13 @@ public class TestJobHistoryParsing {
           tfe.setDatum(tfe.getDatum());
           return tfe;
         }
+        if (eventId < 5) {
+          JobUnsuccessfulCompletionEvent juce =
+              new JobUnsuccessfulCompletionEvent(jid, 100L, 2, 0,
+                  "JOB_FAILED", Collections.singletonList(
+                      "Task failed: " + tids[0].toString()));
+          return juce;
+        }
         return null;
       }
     });
@@ -765,4 +868,22 @@ public class TestJobHistoryParsing {
     assertTrue("Task 0 not implicated",
         info.getErrorInfo().contains(tids[0].toString()));
   }
+
+  @Test
+  public void testFailedJobHistoryWithoutDiagnostics() throws Exception {
+    final Path histPath = new Path(getClass().getClassLoader().getResource(
+        "job_1393307629410_0001-1393307687476-user-Sleep+job-1393307723835-0-0-FAILED-default-1393307693920.jhist")
+        .getFile());
+    final FileSystem lfs = FileSystem.getLocal(new Configuration());
+    final FSDataInputStream fsdis = lfs.open(histPath);
+    try {
+      JobHistoryParser parser = new JobHistoryParser(fsdis);
+      JobInfo info = parser.parse();
+      assertEquals("History parsed jobId incorrectly",
+          info.getJobId(), JobID.forName("job_1393307629410_0001"));
+      assertEquals("Default diagnostics incorrect ", "", info.getErrorInfo());
+    } finally {
+      fsdis.close();
+    }
+  }
 }
File diff suppressed because one or more lines are too long