diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index cd985591f91..5469543e84d 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -38,6 +38,9 @@ Release 2.7.2 - UNRELEASED position/key information for uncompressed input sometimes. (Zhihai Xu via jlowe) + MAPREDUCE-5982. Task attempts that fail from the ASSIGNED state can + disappear (Chang Li via jlowe) + Release 2.7.1 - 2015-07-06 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index aa022a16cf9..7aed78a6483 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -1378,6 +1378,19 @@ public abstract class TaskAttemptImpl implements return tauce; } + private static void + sendJHStartEventForAssignedFailTask(TaskAttemptImpl taskAttempt) { + + taskAttempt.launchTime = taskAttempt.clock.getTime(); + + InetSocketAddress nodeHttpInetAddr = + NetUtils.createSocketAddr(taskAttempt.container.getNodeHttpAddress()); + taskAttempt.trackerName = nodeHttpInetAddr.getHostName(); + taskAttempt.httpPort = nodeHttpInetAddr.getPort(); + taskAttempt.sendLaunchedEvents(); + } + + @SuppressWarnings("unchecked") private void sendLaunchedEvents() { JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptId.getTaskId() @@ -1566,6 +1579,9 @@ public abstract class TaskAttemptImpl implements @Override public void transition(TaskAttemptImpl
taskAttempt, TaskAttemptEvent event) { + if (taskAttempt.getLaunchTime() == 0) { + sendJHStartEventForAssignedFailTask(taskAttempt); + } //set the finish time taskAttempt.setFinishTime(); @@ -1600,23 +1616,19 @@ public abstract class TaskAttemptImpl implements default: LOG.error("Task final state is not FAILED or KILLED: " + finalState); } - if (taskAttempt.getLaunchTime() != 0) { - TaskAttemptUnsuccessfulCompletionEvent tauce = - createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt, - finalState); - if(finalState == TaskAttemptStateInternal.FAILED) { - taskAttempt.eventHandler - .handle(createJobCounterUpdateEventTAFailed(taskAttempt, false)); - } else if(finalState == TaskAttemptStateInternal.KILLED) { - taskAttempt.eventHandler - .handle(createJobCounterUpdateEventTAKilled(taskAttempt, false)); - } - taskAttempt.eventHandler.handle(new JobHistoryEvent( - taskAttempt.attemptId.getTaskId().getJobId(), tauce)); - } else { - LOG.debug("Not generating HistoryFinish event since start event not " + - "generated for taskAttempt: " + taskAttempt.getID()); + + TaskAttemptUnsuccessfulCompletionEvent tauce = + createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt, + finalState); + if(finalState == TaskAttemptStateInternal.FAILED) { + taskAttempt.eventHandler + .handle(createJobCounterUpdateEventTAFailed(taskAttempt, false)); + } else if(finalState == TaskAttemptStateInternal.KILLED) { + taskAttempt.eventHandler + .handle(createJobCounterUpdateEventTAKilled(taskAttempt, false)); } + taskAttempt.eventHandler.handle(new JobHistoryEvent( + taskAttempt.attemptId.getTaskId().getJobId(), tauce)); } } @@ -1708,23 +1720,20 @@ public abstract class TaskAttemptImpl implements @SuppressWarnings("unchecked") @Override public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { + if (taskAttempt.getLaunchTime() == 0) { + sendJHStartEventForAssignedFailTask(taskAttempt); + } // set the finish time taskAttempt.setFinishTime(); + + taskAttempt.eventHandler + 
.handle(createJobCounterUpdateEventTAFailed(taskAttempt, false)); + TaskAttemptUnsuccessfulCompletionEvent tauce = + createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt, + TaskAttemptStateInternal.FAILED); + taskAttempt.eventHandler.handle(new JobHistoryEvent( + taskAttempt.attemptId.getTaskId().getJobId(), tauce)); - if (taskAttempt.getLaunchTime() != 0) { - taskAttempt.eventHandler - .handle(createJobCounterUpdateEventTAFailed(taskAttempt, false)); - TaskAttemptUnsuccessfulCompletionEvent tauce = - createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt, - TaskAttemptStateInternal.FAILED); - taskAttempt.eventHandler.handle(new JobHistoryEvent( - taskAttempt.attemptId.getTaskId().getJobId(), tauce)); - // taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.FAILED); Not - // handling failed map/reduce events. - }else { - LOG.debug("Not generating HistoryFinish event since start event not " + - "generated for taskAttempt: " + taskAttempt.getID()); - } taskAttempt.eventHandler.handle(new TaskTAttemptEvent( taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED)); } @@ -1861,27 +1870,25 @@ public abstract class TaskAttemptImpl implements @Override public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { + if (taskAttempt.getLaunchTime() == 0) { + sendJHStartEventForAssignedFailTask(taskAttempt); + } //set the finish time taskAttempt.setFinishTime(); - if (taskAttempt.getLaunchTime() != 0) { - taskAttempt.eventHandler - .handle(createJobCounterUpdateEventTAKilled(taskAttempt, false)); - TaskAttemptUnsuccessfulCompletionEvent tauce = - createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt, - TaskAttemptStateInternal.KILLED); - taskAttempt.eventHandler.handle(new JobHistoryEvent( - taskAttempt.attemptId.getTaskId().getJobId(), tauce)); - }else { - LOG.debug("Not generating HistoryFinish event since start event not " + - "generated for taskAttempt: " + taskAttempt.getID()); - } + + taskAttempt.eventHandler + 
.handle(createJobCounterUpdateEventTAKilled(taskAttempt, false)); + TaskAttemptUnsuccessfulCompletionEvent tauce = + createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt, + TaskAttemptStateInternal.KILLED); + taskAttempt.eventHandler.handle(new JobHistoryEvent( + taskAttempt.attemptId.getTaskId().getJobId(), tauce)); if (event instanceof TaskAttemptKillEvent) { taskAttempt.addDiagnosticInfo( ((TaskAttemptKillEvent) event).getMessage()); } -// taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.KILLED); Not logging Map/Reduce attempts in case of failure. taskAttempt.eventHandler.handle(new TaskTAttemptEvent( taskAttempt.attemptId, TaskEventType.T_ATTEMPT_KILLED)); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java index 3100d12ce14..40056718f19 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java @@ -528,10 +528,7 @@ public class MRApp extends MRAppMaster { public void handle(ContainerLauncherEvent event) { switch (event.getType()) { case CONTAINER_REMOTE_LAUNCH: - getContext().getEventHandler().handle( - new TaskAttemptContainerLaunchedEvent(event.getTaskAttemptID(), - shufflePort)); - + containerLaunched(event.getTaskAttemptID(), shufflePort); attemptLaunched(event.getTaskAttemptID()); break; case CONTAINER_REMOTE_CLEANUP: @@ -543,6 +540,12 @@ public class MRApp extends MRAppMaster { } } + protected void containerLaunched(TaskAttemptId attemptID, int shufflePort) { + getContext().getEventHandler().handle( + new TaskAttemptContainerLaunchedEvent(attemptID, + shufflePort)); + } + 
protected void attemptLaunched(TaskAttemptId attemptID) { if (autoComplete) { // send the done event diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java index 1807c1c3e09..e3dd67a6da2 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java @@ -112,6 +112,57 @@ public class TestTaskAttempt{ testMRAppHistory(app); } + @Test + public void testMRAppHistoryForTAFailedInAssigned() throws Exception { + // test TA_CONTAINER_LAUNCH_FAILED for map + FailingAttemptsDuringAssignedMRApp app = + new FailingAttemptsDuringAssignedMRApp(1, 0, + TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED); + testTaskAttemptAssignedFailHistory(app); + + // test TA_CONTAINER_LAUNCH_FAILED for reduce + app = + new FailingAttemptsDuringAssignedMRApp(0, 1, + TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED); + testTaskAttemptAssignedFailHistory(app); + + // test TA_CONTAINER_COMPLETED for map + app = + new FailingAttemptsDuringAssignedMRApp(1, 0, + TaskAttemptEventType.TA_CONTAINER_COMPLETED); + testTaskAttemptAssignedFailHistory(app); + + // test TA_CONTAINER_COMPLETED for reduce + app = + new FailingAttemptsDuringAssignedMRApp(0, 1, + TaskAttemptEventType.TA_CONTAINER_COMPLETED); + testTaskAttemptAssignedFailHistory(app); + + // test TA_FAILMSG for map + app = + new FailingAttemptsDuringAssignedMRApp(1, 0, + TaskAttemptEventType.TA_FAILMSG); + testTaskAttemptAssignedFailHistory(app); + + // test TA_FAILMSG for reduce + app = + new 
FailingAttemptsDuringAssignedMRApp(0, 1, + TaskAttemptEventType.TA_FAILMSG); + testTaskAttemptAssignedFailHistory(app); + + // test TA_KILL for map + app = + new FailingAttemptsDuringAssignedMRApp(1, 0, + TaskAttemptEventType.TA_KILL); + testTaskAttemptAssignedKilledHistory(app); + + // test TA_KILL for reduce + app = + new FailingAttemptsDuringAssignedMRApp(0, 1, + TaskAttemptEventType.TA_KILL); + testTaskAttemptAssignedKilledHistory(app); + } + @Test public void testSingleRackRequest() throws Exception { TaskAttemptImpl.RequestContainerTransition rct = @@ -299,6 +350,31 @@ public class TestTaskAttempt{ report.getTaskAttemptState()); } + private void testTaskAttemptAssignedFailHistory + (FailingAttemptsDuringAssignedMRApp app) throws Exception { + Configuration conf = new Configuration(); + Job job = app.submit(conf); + app.waitForState(job, JobState.FAILED); + + Assert.assertTrue("No Ta Started JH Event", app.getTaStartJHEvent()); + Assert.assertTrue("No Ta Failed JH Event", app.getTaFailedJHEvent()); + } + + private void testTaskAttemptAssignedKilledHistory + (FailingAttemptsDuringAssignedMRApp app) throws Exception { + Configuration conf = new Configuration(); + Job job = app.submit(conf); + app.waitForState(job, JobState.RUNNING); + Map tasks = job.getTasks(); + Task task = tasks.values().iterator().next(); + app.waitForState(task, TaskState.SCHEDULED); + Map attempts = task.getAttempts(); + TaskAttempt attempt = attempts.values().iterator().next(); + app.waitForState(attempt, TaskAttemptState.KILLED); + Assert.assertTrue("No Ta Started JH Event", app.getTaStartJHEvent()); + Assert.assertTrue("No Ta Killed JH Event", app.getTaKilledJHEvent()); + } + static class FailingAttemptsMRApp extends MRApp { FailingAttemptsMRApp(int maps, int reduces) { super(maps, reduces, true, "FailingAttemptsMRApp", true); @@ -329,6 +405,72 @@ public class TestTaskAttempt{ } } + static class FailingAttemptsDuringAssignedMRApp extends MRApp { +
FailingAttemptsDuringAssignedMRApp(int maps, int reduces, + TaskAttemptEventType event) { + super(maps, reduces, true, "FailingAttemptsMRApp", true); + sendFailEvent = event; + } + + TaskAttemptEventType sendFailEvent; + + @Override + protected void containerLaunched(TaskAttemptId attemptID, + int shufflePort) { + //do nothing, not send TA_CONTAINER_LAUNCHED event + } + + @Override + protected void attemptLaunched(TaskAttemptId attemptID) { + getContext().getEventHandler().handle( + new TaskAttemptEvent(attemptID, sendFailEvent)); + } + + private boolean receiveTaStartJHEvent = false; + private boolean receiveTaFailedJHEvent = false; + private boolean receiveTaKilledJHEvent = false; + + public boolean getTaStartJHEvent(){ + return receiveTaStartJHEvent; + } + + public boolean getTaFailedJHEvent(){ + return receiveTaFailedJHEvent; + } + + public boolean getTaKilledJHEvent(){ + return receiveTaKilledJHEvent; + } + + protected EventHandler createJobHistoryHandler( + AppContext context) { + return new EventHandler() { + @Override + public void handle(JobHistoryEvent event) { + if (event.getType() == org.apache.hadoop.mapreduce.jobhistory. + EventType.MAP_ATTEMPT_FAILED) { + receiveTaFailedJHEvent = true; + } else if (event.getType() == org.apache.hadoop.mapreduce. + jobhistory.EventType.MAP_ATTEMPT_KILLED) { + receiveTaKilledJHEvent = true; + } else if (event.getType() == org.apache.hadoop.mapreduce. + jobhistory.EventType.MAP_ATTEMPT_STARTED) { + receiveTaStartJHEvent = true; + } else if (event.getType() == org.apache.hadoop.mapreduce. + jobhistory.EventType.REDUCE_ATTEMPT_FAILED) { + receiveTaFailedJHEvent = true; + } else if (event.getType() == org.apache.hadoop.mapreduce. + jobhistory.EventType.REDUCE_ATTEMPT_KILLED) { + receiveTaKilledJHEvent = true; + } else if (event.getType() == org.apache.hadoop.mapreduce. 
+ jobhistory.EventType.REDUCE_ATTEMPT_STARTED) { + receiveTaStartJHEvent = true; + } + } + }; + } + } + @Test public void testLaunchFailedWhileKilling() throws Exception { ApplicationId appId = ApplicationId.newInstance(1, 2);