MAPREDUCE-5982. Task attempts that fail from the ASSIGNED state can disappear. Contributed by Chang Li

This commit is contained in:
Jason Lowe 2015-09-17 21:37:39 +00:00
parent 9eee97508f
commit ee4ee6af6a
4 changed files with 213 additions and 47 deletions

View File

@ -573,6 +573,9 @@ Release 2.8.0 - UNRELEASED
MAPREDUCE-5002. AM could potentially allocate a reduce container to a map
attempt (Chang Li via jlowe)
MAPREDUCE-5982. Task attempts that fail from the ASSIGNED state can
disappear (Chang Li via jlowe)
Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -1484,6 +1484,19 @@ public abstract class TaskAttemptImpl implements
return tauce;
}
private static void
sendJHStartEventForAssignedFailTask(TaskAttemptImpl taskAttempt) {
TaskAttemptContainerLaunchedEvent event;
taskAttempt.launchTime = taskAttempt.clock.getTime();
InetSocketAddress nodeHttpInetAddr =
NetUtils.createSocketAddr(taskAttempt.container.getNodeHttpAddress());
taskAttempt.trackerName = nodeHttpInetAddr.getHostName();
taskAttempt.httpPort = nodeHttpInetAddr.getPort();
taskAttempt.sendLaunchedEvents();
}
@SuppressWarnings("unchecked")
private void sendLaunchedEvents() {
JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptId.getTaskId()
@ -1681,6 +1694,9 @@ public abstract class TaskAttemptImpl implements
@Override
public void transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
if (taskAttempt.getLaunchTime() == 0) {
sendJHStartEventForAssignedFailTask(taskAttempt);
}
//set the finish time
taskAttempt.setFinishTime();
@ -1715,23 +1731,19 @@ public abstract class TaskAttemptImpl implements
default:
LOG.error("Task final state is not FAILED or KILLED: " + finalState);
}
if (taskAttempt.getLaunchTime() != 0) {
TaskAttemptUnsuccessfulCompletionEvent tauce =
createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
finalState);
if(finalState == TaskAttemptStateInternal.FAILED) {
taskAttempt.eventHandler
.handle(createJobCounterUpdateEventTAFailed(taskAttempt, false));
} else if(finalState == TaskAttemptStateInternal.KILLED) {
taskAttempt.eventHandler
.handle(createJobCounterUpdateEventTAKilled(taskAttempt, false));
}
taskAttempt.eventHandler.handle(new JobHistoryEvent(
taskAttempt.attemptId.getTaskId().getJobId(), tauce));
} else {
LOG.debug("Not generating HistoryFinish event since start event not " +
"generated for taskAttempt: " + taskAttempt.getID());
TaskAttemptUnsuccessfulCompletionEvent tauce =
createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
finalState);
if(finalState == TaskAttemptStateInternal.FAILED) {
taskAttempt.eventHandler
.handle(createJobCounterUpdateEventTAFailed(taskAttempt, false));
} else if(finalState == TaskAttemptStateInternal.KILLED) {
taskAttempt.eventHandler
.handle(createJobCounterUpdateEventTAKilled(taskAttempt, false));
}
taskAttempt.eventHandler.handle(new JobHistoryEvent(
taskAttempt.attemptId.getTaskId().getJobId(), tauce));
}
}
@ -2023,27 +2035,25 @@ public abstract class TaskAttemptImpl implements
@Override
public void transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
if (taskAttempt.getLaunchTime() == 0) {
sendJHStartEventForAssignedFailTask(taskAttempt);
}
//set the finish time
taskAttempt.setFinishTime();
if (taskAttempt.getLaunchTime() != 0) {
taskAttempt.eventHandler
.handle(createJobCounterUpdateEventTAKilled(taskAttempt, false));
TaskAttemptUnsuccessfulCompletionEvent tauce =
createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
TaskAttemptStateInternal.KILLED);
taskAttempt.eventHandler.handle(new JobHistoryEvent(
taskAttempt.attemptId.getTaskId().getJobId(), tauce));
}else {
LOG.debug("Not generating HistoryFinish event since start event not " +
"generated for taskAttempt: " + taskAttempt.getID());
}
taskAttempt.eventHandler
.handle(createJobCounterUpdateEventTAKilled(taskAttempt, false));
TaskAttemptUnsuccessfulCompletionEvent tauce =
createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
TaskAttemptStateInternal.KILLED);
taskAttempt.eventHandler.handle(new JobHistoryEvent(
taskAttempt.attemptId.getTaskId().getJobId(), tauce));
if (event instanceof TaskAttemptKillEvent) {
taskAttempt.addDiagnosticInfo(
((TaskAttemptKillEvent) event).getMessage());
}
// taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.KILLED); Not logging Map/Reduce attempts in case of failure.
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
taskAttempt.attemptId,
TaskEventType.T_ATTEMPT_KILLED));
@ -2178,23 +2188,19 @@ public abstract class TaskAttemptImpl implements
@SuppressWarnings("unchecked")
private static void notifyTaskAttemptFailed(TaskAttemptImpl taskAttempt) {
if (taskAttempt.getLaunchTime() == 0) {
sendJHStartEventForAssignedFailTask(taskAttempt);
}
// set the finish time
taskAttempt.setFinishTime();
taskAttempt.eventHandler
.handle(createJobCounterUpdateEventTAFailed(taskAttempt, false));
TaskAttemptUnsuccessfulCompletionEvent tauce =
createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
TaskAttemptStateInternal.FAILED);
taskAttempt.eventHandler.handle(new JobHistoryEvent(
taskAttempt.attemptId.getTaskId().getJobId(), tauce));
if (taskAttempt.getLaunchTime() != 0) {
taskAttempt.eventHandler
.handle(createJobCounterUpdateEventTAFailed(taskAttempt, false));
TaskAttemptUnsuccessfulCompletionEvent tauce =
createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
TaskAttemptStateInternal.FAILED);
taskAttempt.eventHandler.handle(new JobHistoryEvent(
taskAttempt.attemptId.getTaskId().getJobId(), tauce));
// taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.FAILED); Not
// handling failed map/reduce events.
}else {
LOG.debug("Not generating HistoryFinish event since start event not " +
"generated for taskAttempt: " + taskAttempt.getID());
}
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED));

View File

@ -544,10 +544,7 @@ public class MRApp extends MRAppMaster {
public void handle(ContainerLauncherEvent event) {
switch (event.getType()) {
case CONTAINER_REMOTE_LAUNCH:
getContext().getEventHandler().handle(
new TaskAttemptContainerLaunchedEvent(event.getTaskAttemptID(),
shufflePort));
containerLaunched(event.getTaskAttemptID(), shufflePort);
attemptLaunched(event.getTaskAttemptID());
break;
case CONTAINER_REMOTE_CLEANUP:
@ -561,6 +558,12 @@ public class MRApp extends MRAppMaster {
}
}
protected void containerLaunched(TaskAttemptId attemptID, int shufflePort) {
getContext().getEventHandler().handle(
new TaskAttemptContainerLaunchedEvent(attemptID,
shufflePort));
}
protected void attemptLaunched(TaskAttemptId attemptID) {
if (autoComplete) {
// send the done event

View File

@ -114,6 +114,69 @@ public class TestTaskAttempt{
testMRAppHistory(app);
}
@Test
public void testMRAppHistoryForTAFailedInAssigned() throws Exception {
// test TA_CONTAINER_LAUNCH_FAILED for map
FailingAttemptsDuringAssignedMRApp app =
new FailingAttemptsDuringAssignedMRApp(1, 0,
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED);
testTaskAttemptAssignedFailHistory(app);
// test TA_CONTAINER_LAUNCH_FAILED for reduce
app =
new FailingAttemptsDuringAssignedMRApp(0, 1,
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED);
testTaskAttemptAssignedFailHistory(app);
// test TA_CONTAINER_COMPLETED for map
app =
new FailingAttemptsDuringAssignedMRApp(1, 0,
TaskAttemptEventType.TA_CONTAINER_COMPLETED);
testTaskAttemptAssignedFailHistory(app);
// test TA_CONTAINER_COMPLETED for reduce
app =
new FailingAttemptsDuringAssignedMRApp(0, 1,
TaskAttemptEventType.TA_CONTAINER_COMPLETED);
testTaskAttemptAssignedFailHistory(app);
// test TA_FAILMSG for map
app =
new FailingAttemptsDuringAssignedMRApp(1, 0,
TaskAttemptEventType.TA_FAILMSG);
testTaskAttemptAssignedFailHistory(app);
// test TA_FAILMSG for reduce
app =
new FailingAttemptsDuringAssignedMRApp(0, 1,
TaskAttemptEventType.TA_FAILMSG);
testTaskAttemptAssignedFailHistory(app);
// test TA_FAILMSG_BY_CLIENT for map
app =
new FailingAttemptsDuringAssignedMRApp(1, 0,
TaskAttemptEventType.TA_FAILMSG_BY_CLIENT);
testTaskAttemptAssignedFailHistory(app);
// test TA_FAILMSG_BY_CLIENT for reduce
app =
new FailingAttemptsDuringAssignedMRApp(0, 1,
TaskAttemptEventType.TA_FAILMSG_BY_CLIENT);
testTaskAttemptAssignedFailHistory(app);
// test TA_KILL for map
app =
new FailingAttemptsDuringAssignedMRApp(1, 0,
TaskAttemptEventType.TA_KILL);
testTaskAttemptAssignedKilledHistory(app);
// test TA_KILL for reduce
app =
new FailingAttemptsDuringAssignedMRApp(0, 1,
TaskAttemptEventType.TA_KILL);
testTaskAttemptAssignedKilledHistory(app);
}
@Test
public void testSingleRackRequest() throws Exception {
TaskAttemptImpl.RequestContainerTransition rct =
@ -301,6 +364,31 @@ public class TestTaskAttempt{
report.getTaskAttemptState());
}
private void testTaskAttemptAssignedFailHistory
(FailingAttemptsDuringAssignedMRApp app) throws Exception {
Configuration conf = new Configuration();
Job job = app.submit(conf);
app.waitForState(job, JobState.FAILED);
Map<TaskId, Task> tasks = job.getTasks();
Assert.assertTrue("No Ta Started JH Event", app.getTaStartJHEvent());
Assert.assertTrue("No Ta Failed JH Event", app.getTaFailedJHEvent());
}
private void testTaskAttemptAssignedKilledHistory
(FailingAttemptsDuringAssignedMRApp app) throws Exception {
Configuration conf = new Configuration();
Job job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
Map<TaskId, Task> tasks = job.getTasks();
Task task = tasks.values().iterator().next();
app.waitForState(task, TaskState.SCHEDULED);
Map<TaskAttemptId, TaskAttempt> attempts = task.getAttempts();
TaskAttempt attempt = attempts.values().iterator().next();
app.waitForState(attempt, TaskAttemptState.KILLED);
Assert.assertTrue("No Ta Started JH Event", app.getTaStartJHEvent());
Assert.assertTrue("No Ta Killed JH Event", app.getTaKilledJHEvent());
}
static class FailingAttemptsMRApp extends MRApp {
FailingAttemptsMRApp(int maps, int reduces) {
super(maps, reduces, true, "FailingAttemptsMRApp", true);
@ -331,6 +419,72 @@ public class TestTaskAttempt{
}
}
static class FailingAttemptsDuringAssignedMRApp extends MRApp {
FailingAttemptsDuringAssignedMRApp(int maps, int reduces,
TaskAttemptEventType event) {
super(maps, reduces, true, "FailingAttemptsMRApp", true);
sendFailEvent = event;
}
TaskAttemptEventType sendFailEvent;
@Override
protected void containerLaunched(TaskAttemptId attemptID,
int shufflePort) {
//do nothing, not send TA_CONTAINER_LAUNCHED event
}
@Override
protected void attemptLaunched(TaskAttemptId attemptID) {
getContext().getEventHandler().handle(
new TaskAttemptEvent(attemptID, sendFailEvent));
}
private boolean receiveTaStartJHEvent = false;
private boolean receiveTaFailedJHEvent = false;
private boolean receiveTaKilledJHEvent = false;
public boolean getTaStartJHEvent(){
return receiveTaStartJHEvent;
}
public boolean getTaFailedJHEvent(){
return receiveTaFailedJHEvent;
}
public boolean getTaKilledJHEvent(){
return receiveTaKilledJHEvent;
}
protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
AppContext context) {
return new EventHandler<JobHistoryEvent>() {
@Override
public void handle(JobHistoryEvent event) {
if (event.getType() == org.apache.hadoop.mapreduce.jobhistory.
EventType.MAP_ATTEMPT_FAILED) {
receiveTaFailedJHEvent = true;
} else if (event.getType() == org.apache.hadoop.mapreduce.
jobhistory.EventType.MAP_ATTEMPT_KILLED) {
receiveTaKilledJHEvent = true;
} else if (event.getType() == org.apache.hadoop.mapreduce.
jobhistory.EventType.MAP_ATTEMPT_STARTED) {
receiveTaStartJHEvent = true;
} else if (event.getType() == org.apache.hadoop.mapreduce.
jobhistory.EventType.REDUCE_ATTEMPT_FAILED) {
receiveTaFailedJHEvent = true;
} else if (event.getType() == org.apache.hadoop.mapreduce.
jobhistory.EventType.REDUCE_ATTEMPT_KILLED) {
receiveTaKilledJHEvent = true;
} else if (event.getType() == org.apache.hadoop.mapreduce.
jobhistory.EventType.REDUCE_ATTEMPT_STARTED) {
receiveTaStartJHEvent = true;
}
}
};
}
}
@Test
public void testLaunchFailedWhileKilling() throws Exception {
ApplicationId appId = ApplicationId.newInstance(1, 2);