From c70f5eb8fa997b1e464d216516e8197d6fef8207 Mon Sep 17 00:00:00 2001 From: Surendra Singh Lilhore Date: Tue, 19 May 2020 11:06:36 +0530 Subject: [PATCH] MAPREDUCE-6826. Job fails with InvalidStateTransitonException: Invalid event: JOB_TASK_COMPLETED at SUCCEEDED/COMMITTING. Contributed by Bilwa S T. (cherry picked from commit d4e36409d40d9f0783234a3b98394962ae0da87e) --- .../hadoop/mapreduce/v2/app/job/impl/JobImpl.java | 6 ++++-- .../mapreduce/v2/app/job/impl/TestJobImpl.java | 12 +++++++++++- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index 5ef12509ad7..5489f52f6ee 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -422,7 +422,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, EnumSet.of(JobEventType.JOB_UPDATED_NODES, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE, JobEventType.JOB_TASK_ATTEMPT_COMPLETED, - JobEventType.JOB_MAP_TASK_RESCHEDULED)) + JobEventType.JOB_MAP_TASK_RESCHEDULED, + JobEventType.JOB_TASK_COMPLETED)) // Transitions from SUCCEEDED state .addTransition(JobStateInternal.SUCCEEDED, JobStateInternal.SUCCEEDED, @@ -441,7 +442,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE, JobEventType.JOB_AM_REBOOT, JobEventType.JOB_TASK_ATTEMPT_COMPLETED, - JobEventType.JOB_MAP_TASK_RESCHEDULED)) + JobEventType.JOB_MAP_TASK_RESCHEDULED, + JobEventType.JOB_TASK_COMPLETED)) // Transitions from FAIL_WAIT state .addTransition(JobStateInternal.FAIL_WAIT, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java index 43e59a7b345..5f378e4f9c3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java @@ -204,7 +204,7 @@ public class TestJobImpl { public void testCheckJobCompleteSuccess() throws Exception { Configuration conf = new Configuration(); conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir); - AsyncDispatcher dispatcher = new AsyncDispatcher(); + DrainDispatcher dispatcher = new DrainDispatcher(); dispatcher.init(conf); dispatcher.start(); CyclicBarrier syncBarrier = new CyclicBarrier(2); @@ -226,6 +226,11 @@ public class TestJobImpl { JobEventType.JOB_MAP_TASK_RESCHEDULED)); assertJobState(job, JobStateInternal.COMMITTING); + job.handle(new JobEvent(job.getID(), + JobEventType.JOB_TASK_COMPLETED)); + dispatcher.await(); + assertJobState(job, JobStateInternal.COMMITTING); + // let the committer complete and verify the job succeeds syncBarrier.await(); assertJobState(job, JobStateInternal.SUCCEEDED); @@ -237,6 +242,11 @@ public class TestJobImpl { job.handle(new JobEvent(job.getID(), JobEventType.JOB_MAP_TASK_RESCHEDULED)); assertJobState(job, JobStateInternal.SUCCEEDED); + + job.handle(new JobEvent(job.getID(), + JobEventType.JOB_TASK_COMPLETED)); + dispatcher.await(); + assertJobState(job, JobStateInternal.SUCCEEDED); dispatcher.stop(); commitHandler.stop();