From 2a76cade32ba852f5bf738f75bb01afce8299809 Mon Sep 17 00:00:00 2001 From: Jason Darrell Lowe Date: Fri, 25 Apr 2014 22:34:44 +0000 Subject: [PATCH] svn merge -c 1590168 FIXES: MAPREDUCE-5835. Killing Task might cause the job to go to ERROR state. Contributed by Ming Ma git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1590174 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../mapreduce/v2/app/job/impl/TaskImpl.java | 8 ++ .../hadoop/mapreduce/v2/app/TestKill.java | 81 +++++++++++++++++++ 3 files changed, 92 insertions(+) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 79d7f82078b..516e61b884e 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -100,6 +100,9 @@ Release 2.4.1 - UNRELEASED MAPREDUCE-5843. Fixed TestMRKeyValueTextInputFormat to not leak files and thus avoid failing on Windows. (Varun Vasudev via vinodkv) + MAPREDUCE-5835. 
Killing Task might cause the job to go to ERROR state + (Ming Ma via jlowe) + + Release 2.4.0 - 2014-04-07 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java index 9d14d4e2b6a..ca810598ab2 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java @@ -249,8 +249,16 @@ TaskEventType.T_ATTEMPT_KILLED, new RetroactiveKilledTransition()) TaskEventType.T_ATTEMPT_SUCCEEDED)) // Transitions from KILLED state + // There could be a race condition where TaskImpl might receive + // T_ATTEMPT_SUCCEEDED followed by T_ATTEMPT_KILLED for the same attempt. + // a. The task is in KILL_WAIT. + // b. Before TA transitions to SUCCEEDED state, Task sends TA_KILL event. + // c. TA transitions to SUCCEEDED state and thus sends T_ATTEMPT_SUCCEEDED + // to the task. The task transitions to KILLED state. + // d. TA processes TA_KILL event and sends T_ATTEMPT_KILLED to the task. 
.addTransition(TaskStateInternal.KILLED, TaskStateInternal.KILLED, EnumSet.of(TaskEventType.T_KILL, + TaskEventType.T_ATTEMPT_KILLED, TaskEventType.T_ADD_SPEC_ATTEMPT)) // create the topology tables diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java index 5ff22fac631..c33bd4d173d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java @@ -214,6 +214,87 @@ public Dispatcher createDispatcher() { app.waitForInternalState((JobImpl)job, JobStateInternal.KILLED); } + static class MyAsyncDispatch extends AsyncDispatcher { + private CountDownLatch latch; + private TaskAttemptEventType attemptEventTypeToWait; + MyAsyncDispatch(CountDownLatch latch, TaskAttemptEventType attemptEventTypeToWait) { + super(); + this.latch = latch; + this.attemptEventTypeToWait = attemptEventTypeToWait; + } + + @Override + protected void dispatch(Event event) { + if (event instanceof TaskAttemptEvent) { + TaskAttemptEvent attemptEvent = (TaskAttemptEvent) event; + TaskAttemptId attemptID = ((TaskAttemptEvent) event).getTaskAttemptID(); + if (attemptEvent.getType() == this.attemptEventTypeToWait + && attemptID.getTaskId().getId() == 0 && attemptID.getId() == 0 ) { + try { + latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + super.dispatch(event); + } + } + + // This is to test a race condition where JobEventType.JOB_KILL is generated + // right after TaskAttemptEventType.TA_DONE is generated. 
+ // TaskImpl's state machine might receive both T_ATTEMPT_SUCCEEDED + // and T_ATTEMPT_KILLED from the same attempt. + @Test + public void testKillTaskWaitKillJobAfterTA_DONE() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + final Dispatcher dispatcher = new MyAsyncDispatch(latch, TaskAttemptEventType.TA_DONE); + MRApp app = new MRApp(1, 1, false, this.getClass().getName(), true) { + @Override + public Dispatcher createDispatcher() { + return dispatcher; + } + }; + Job job = app.submit(new Configuration()); + JobId jobId = app.getJobId(); + app.waitForState(job, JobState.RUNNING); + Assert.assertEquals("Num tasks not correct", 2, job.getTasks().size()); + Iterator it = job.getTasks().values().iterator(); + Task mapTask = it.next(); + Task reduceTask = it.next(); + app.waitForState(mapTask, TaskState.RUNNING); + app.waitForState(reduceTask, TaskState.RUNNING); + TaskAttempt mapAttempt = mapTask.getAttempts().values().iterator().next(); + app.waitForState(mapAttempt, TaskAttemptState.RUNNING); + TaskAttempt reduceAttempt = reduceTask.getAttempts().values().iterator().next(); + app.waitForState(reduceAttempt, TaskAttemptState.RUNNING); + + // The order in the dispatch event queue, from the oldest to the newest + // TA_DONE + // JOB_KILL + // CONTAINER_REMOTE_CLEANUP ( from TA_DONE's handling ) + // T_KILL ( from JOB_KILL's handling ) + // TA_CONTAINER_CLEANED ( from CONTAINER_REMOTE_CLEANUP's handling ) + // TA_KILL ( from T_KILL's handling ) + // T_ATTEMPT_SUCCEEDED ( from TA_CONTAINER_CLEANED's handling ) + // T_ATTEMPT_KILLED ( from TA_KILL's handling ) + + // Finish map + app.getContext().getEventHandler().handle( + new TaskAttemptEvent( + mapAttempt.getID(), + TaskAttemptEventType.TA_DONE)); + + // Now kill the job + app.getContext().getEventHandler() + .handle(new JobEvent(jobId, JobEventType.JOB_KILL)); + + //unblock + latch.countDown(); + + app.waitForInternalState((JobImpl)job, JobStateInternal.KILLED); + } + @Test public void 
testKillTaskAttempt() throws Exception { final CountDownLatch latch = new CountDownLatch(1);