MAPREDUCE-5982. Task attempts that fail from the ASSIGNED state can disappear. Contributed by Chang Li

This commit is contained in:
Jason Lowe 2015-09-18 19:21:35 +00:00
parent 521196874c
commit 7429438c85
4 changed files with 202 additions and 47 deletions

View File

@ -38,6 +38,9 @@ Release 2.7.2 - UNRELEASED
position/key information for uncompressed input sometimes. (Zhihai Xu via
jlowe)
MAPREDUCE-5982. Task attempts that fail from the ASSIGNED state can
disappear (Chang Li via jlowe)
Release 2.7.1 - 2015-07-06
INCOMPATIBLE CHANGES

View File

@ -1378,6 +1378,19 @@ public abstract class TaskAttemptImpl implements
return tauce;
}
private static void
sendJHStartEventForAssignedFailTask(TaskAttemptImpl taskAttempt) {
TaskAttemptContainerLaunchedEvent event;
taskAttempt.launchTime = taskAttempt.clock.getTime();
InetSocketAddress nodeHttpInetAddr =
NetUtils.createSocketAddr(taskAttempt.container.getNodeHttpAddress());
taskAttempt.trackerName = nodeHttpInetAddr.getHostName();
taskAttempt.httpPort = nodeHttpInetAddr.getPort();
taskAttempt.sendLaunchedEvents();
}
@SuppressWarnings("unchecked")
private void sendLaunchedEvents() {
JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptId.getTaskId()
@ -1566,6 +1579,9 @@ public abstract class TaskAttemptImpl implements
@Override
public void transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
if (taskAttempt.getLaunchTime() == 0) {
sendJHStartEventForAssignedFailTask(taskAttempt);
}
//set the finish time
taskAttempt.setFinishTime();
@ -1600,7 +1616,7 @@ public abstract class TaskAttemptImpl implements
default:
LOG.error("Task final state is not FAILED or KILLED: " + finalState);
}
if (taskAttempt.getLaunchTime() != 0) {
TaskAttemptUnsuccessfulCompletionEvent tauce =
createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
finalState);
@ -1613,10 +1629,6 @@ public abstract class TaskAttemptImpl implements
}
taskAttempt.eventHandler.handle(new JobHistoryEvent(
taskAttempt.attemptId.getTaskId().getJobId(), tauce));
} else {
LOG.debug("Not generating HistoryFinish event since start event not " +
"generated for taskAttempt: " + taskAttempt.getID());
}
}
}
@ -1708,10 +1720,12 @@ public abstract class TaskAttemptImpl implements
@SuppressWarnings("unchecked")
@Override
public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) {
if (taskAttempt.getLaunchTime() == 0) {
sendJHStartEventForAssignedFailTask(taskAttempt);
}
// set the finish time
taskAttempt.setFinishTime();
if (taskAttempt.getLaunchTime() != 0) {
taskAttempt.eventHandler
.handle(createJobCounterUpdateEventTAFailed(taskAttempt, false));
TaskAttemptUnsuccessfulCompletionEvent tauce =
@ -1719,12 +1733,7 @@ public abstract class TaskAttemptImpl implements
TaskAttemptStateInternal.FAILED);
taskAttempt.eventHandler.handle(new JobHistoryEvent(
taskAttempt.attemptId.getTaskId().getJobId(), tauce));
// taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.FAILED); Not
// handling failed map/reduce events.
}else {
LOG.debug("Not generating HistoryFinish event since start event not " +
"generated for taskAttempt: " + taskAttempt.getID());
}
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED));
}
@ -1861,9 +1870,12 @@ public abstract class TaskAttemptImpl implements
@Override
public void transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
if (taskAttempt.getLaunchTime() == 0) {
sendJHStartEventForAssignedFailTask(taskAttempt);
}
//set the finish time
taskAttempt.setFinishTime();
if (taskAttempt.getLaunchTime() != 0) {
taskAttempt.eventHandler
.handle(createJobCounterUpdateEventTAKilled(taskAttempt, false));
TaskAttemptUnsuccessfulCompletionEvent tauce =
@ -1871,17 +1883,12 @@ public abstract class TaskAttemptImpl implements
TaskAttemptStateInternal.KILLED);
taskAttempt.eventHandler.handle(new JobHistoryEvent(
taskAttempt.attemptId.getTaskId().getJobId(), tauce));
}else {
LOG.debug("Not generating HistoryFinish event since start event not " +
"generated for taskAttempt: " + taskAttempt.getID());
}
if (event instanceof TaskAttemptKillEvent) {
taskAttempt.addDiagnosticInfo(
((TaskAttemptKillEvent) event).getMessage());
}
// taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.KILLED); Not logging Map/Reduce attempts in case of failure.
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
taskAttempt.attemptId,
TaskEventType.T_ATTEMPT_KILLED));

View File

@ -528,10 +528,7 @@ public class MRApp extends MRAppMaster {
public void handle(ContainerLauncherEvent event) {
switch (event.getType()) {
case CONTAINER_REMOTE_LAUNCH:
getContext().getEventHandler().handle(
new TaskAttemptContainerLaunchedEvent(event.getTaskAttemptID(),
shufflePort));
containerLaunched(event.getTaskAttemptID(), shufflePort);
attemptLaunched(event.getTaskAttemptID());
break;
case CONTAINER_REMOTE_CLEANUP:
@ -543,6 +540,12 @@ public class MRApp extends MRAppMaster {
}
}
protected void containerLaunched(TaskAttemptId attemptID, int shufflePort) {
getContext().getEventHandler().handle(
new TaskAttemptContainerLaunchedEvent(attemptID,
shufflePort));
}
protected void attemptLaunched(TaskAttemptId attemptID) {
if (autoComplete) {
// send the done event

View File

@ -112,6 +112,57 @@ public class TestTaskAttempt{
testMRAppHistory(app);
}
@Test
public void testMRAppHistoryForTAFailedInAssigned() throws Exception {
// test TA_CONTAINER_LAUNCH_FAILED for map
FailingAttemptsDuringAssignedMRApp app =
new FailingAttemptsDuringAssignedMRApp(1, 0,
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED);
testTaskAttemptAssignedFailHistory(app);
// test TA_CONTAINER_LAUNCH_FAILED for reduce
app =
new FailingAttemptsDuringAssignedMRApp(0, 1,
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED);
testTaskAttemptAssignedFailHistory(app);
// test TA_CONTAINER_COMPLETED for map
app =
new FailingAttemptsDuringAssignedMRApp(1, 0,
TaskAttemptEventType.TA_CONTAINER_COMPLETED);
testTaskAttemptAssignedFailHistory(app);
// test TA_CONTAINER_COMPLETED for reduce
app =
new FailingAttemptsDuringAssignedMRApp(0, 1,
TaskAttemptEventType.TA_CONTAINER_COMPLETED);
testTaskAttemptAssignedFailHistory(app);
// test TA_FAILMSG for map
app =
new FailingAttemptsDuringAssignedMRApp(1, 0,
TaskAttemptEventType.TA_FAILMSG);
testTaskAttemptAssignedFailHistory(app);
// test TA_FAILMSG for reduce
app =
new FailingAttemptsDuringAssignedMRApp(0, 1,
TaskAttemptEventType.TA_FAILMSG);
testTaskAttemptAssignedFailHistory(app);
// test TA_KILL for map
app =
new FailingAttemptsDuringAssignedMRApp(1, 0,
TaskAttemptEventType.TA_KILL);
testTaskAttemptAssignedKilledHistory(app);
// test TA_KILL for reduce
app =
new FailingAttemptsDuringAssignedMRApp(0, 1,
TaskAttemptEventType.TA_KILL);
testTaskAttemptAssignedKilledHistory(app);
}
@Test
public void testSingleRackRequest() throws Exception {
TaskAttemptImpl.RequestContainerTransition rct =
@ -299,6 +350,31 @@ public class TestTaskAttempt{
report.getTaskAttemptState());
}
private void testTaskAttemptAssignedFailHistory
(FailingAttemptsDuringAssignedMRApp app) throws Exception {
Configuration conf = new Configuration();
Job job = app.submit(conf);
app.waitForState(job, JobState.FAILED);
Map<TaskId, Task> tasks = job.getTasks();
Assert.assertTrue("No Ta Started JH Event", app.getTaStartJHEvent());
Assert.assertTrue("No Ta Failed JH Event", app.getTaFailedJHEvent());
}
private void testTaskAttemptAssignedKilledHistory
(FailingAttemptsDuringAssignedMRApp app) throws Exception {
Configuration conf = new Configuration();
Job job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
Map<TaskId, Task> tasks = job.getTasks();
Task task = tasks.values().iterator().next();
app.waitForState(task, TaskState.SCHEDULED);
Map<TaskAttemptId, TaskAttempt> attempts = task.getAttempts();
TaskAttempt attempt = attempts.values().iterator().next();
app.waitForState(attempt, TaskAttemptState.KILLED);
Assert.assertTrue("No Ta Started JH Event", app.getTaStartJHEvent());
Assert.assertTrue("No Ta Killed JH Event", app.getTaKilledJHEvent());
}
static class FailingAttemptsMRApp extends MRApp {
FailingAttemptsMRApp(int maps, int reduces) {
super(maps, reduces, true, "FailingAttemptsMRApp", true);
@ -329,6 +405,72 @@ public class TestTaskAttempt{
}
}
static class FailingAttemptsDuringAssignedMRApp extends MRApp {
FailingAttemptsDuringAssignedMRApp(int maps, int reduces,
TaskAttemptEventType event) {
super(maps, reduces, true, "FailingAttemptsMRApp", true);
sendFailEvent = event;
}
TaskAttemptEventType sendFailEvent;
@Override
protected void containerLaunched(TaskAttemptId attemptID,
int shufflePort) {
//do nothing, not send TA_CONTAINER_LAUNCHED event
}
@Override
protected void attemptLaunched(TaskAttemptId attemptID) {
getContext().getEventHandler().handle(
new TaskAttemptEvent(attemptID, sendFailEvent));
}
private boolean receiveTaStartJHEvent = false;
private boolean receiveTaFailedJHEvent = false;
private boolean receiveTaKilledJHEvent = false;
public boolean getTaStartJHEvent(){
return receiveTaStartJHEvent;
}
public boolean getTaFailedJHEvent(){
return receiveTaFailedJHEvent;
}
public boolean getTaKilledJHEvent(){
return receiveTaKilledJHEvent;
}
protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
AppContext context) {
return new EventHandler<JobHistoryEvent>() {
@Override
public void handle(JobHistoryEvent event) {
if (event.getType() == org.apache.hadoop.mapreduce.jobhistory.
EventType.MAP_ATTEMPT_FAILED) {
receiveTaFailedJHEvent = true;
} else if (event.getType() == org.apache.hadoop.mapreduce.
jobhistory.EventType.MAP_ATTEMPT_KILLED) {
receiveTaKilledJHEvent = true;
} else if (event.getType() == org.apache.hadoop.mapreduce.
jobhistory.EventType.MAP_ATTEMPT_STARTED) {
receiveTaStartJHEvent = true;
} else if (event.getType() == org.apache.hadoop.mapreduce.
jobhistory.EventType.REDUCE_ATTEMPT_FAILED) {
receiveTaFailedJHEvent = true;
} else if (event.getType() == org.apache.hadoop.mapreduce.
jobhistory.EventType.REDUCE_ATTEMPT_KILLED) {
receiveTaKilledJHEvent = true;
} else if (event.getType() == org.apache.hadoop.mapreduce.
jobhistory.EventType.REDUCE_ATTEMPT_STARTED) {
receiveTaStartJHEvent = true;
}
}
};
}
}
@Test
public void testLaunchFailedWhileKilling() throws Exception {
ApplicationId appId = ApplicationId.newInstance(1, 2);