diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 2ed93b150db..32696f2462b 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -500,6 +500,9 @@ Release 0.23.5 - UNRELEASED MAPREDUCE-4782. NLineInputFormat skips first line of last InputSplit (Mark Fuhs via bobby) + + MAPREDUCE-4774. JobImpl does not handle asynchronous task events in FAILED + state (jlowe via bobby) Release 0.23.4 - UNRELEASED diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index 4612d066ee3..19488b2e81e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -348,6 +348,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, .addTransition(JobStateInternal.FAILED, JobStateInternal.FAILED, EnumSet.of(JobEventType.JOB_KILL, JobEventType.JOB_UPDATED_NODES, + JobEventType.JOB_TASK_COMPLETED, + JobEventType.JOB_TASK_ATTEMPT_COMPLETED, + JobEventType.JOB_MAP_TASK_RESCHEDULED, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE)) // Transitions from KILLED state diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java index 7a77a6b2088..92742274a9f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java @@ -27,6 +27,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.IOException; +import java.util.EnumSet; import java.util.HashMap; import java.util.Map; @@ -42,6 +43,7 @@ import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal; import org.apache.hadoop.mapreduce.v2.app.job.Task; @@ -51,10 +53,14 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType; import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.InitTransition; import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.JobNoTasksCompletedTransition; import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.SystemClock; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher; +import org.apache.hadoop.yarn.state.StateMachine; +import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; import org.junit.Test; @@ -340,7 +346,7 @@ public class TestJobImpl { return isUber; } - private InitTransition getInitTransition() { + private static InitTransition getInitTransition() { InitTransition initTransition = new InitTransition() { @Override protected TaskSplitMetaInfo[] createSplits(JobImpl job, JobId jobId) { @@ -350,4 +356,63 @@ public class TestJobImpl { }; return initTransition; } + + @Test + public void testTransitionsAtFailed() throws IOException { + Configuration conf = new Configuration(); + JobID jobID = JobID.forName("job_1234567890000_0001"); + JobId jobId = TypeConverter.toYarn(jobID); + OutputCommitter committer = mock(OutputCommitter.class); + doThrow(new IOException("forcefail")) + .when(committer).setupJob(any(JobContext.class)); + InlineDispatcher dispatcher = new InlineDispatcher(); + JobImpl job = new StubbedJob(jobId, Records + .newRecord(ApplicationAttemptId.class), conf, + dispatcher.getEventHandler(), committer, true, null); + + dispatcher.register(JobEventType.class, job); + job.handle(new JobEvent(jobId, JobEventType.JOB_INIT)); + Assert.assertEquals(JobState.FAILED, job.getState()); + + job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_COMPLETED)); + Assert.assertEquals(JobState.FAILED, job.getState()); + job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_ATTEMPT_COMPLETED)); + Assert.assertEquals(JobState.FAILED, job.getState()); + job.handle(new JobEvent(jobId, JobEventType.JOB_MAP_TASK_RESCHEDULED)); + Assert.assertEquals(JobState.FAILED, job.getState()); + job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE)); + Assert.assertEquals(JobState.FAILED, job.getState()); + } + + private static class StubbedJob extends JobImpl { + //override the init transition + private final InitTransition initTransition = getInitTransition(); + StateMachineFactory localFactory + = stateMachineFactory.addTransition(JobStateInternal.NEW, + EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED), + JobEventType.JOB_INIT, + // This is abusive. + initTransition); + + private final StateMachine + localStateMachine; + + @Override + protected StateMachine getStateMachine() { + return localStateMachine; + } + + public StubbedJob(JobId jobId, ApplicationAttemptId applicationAttemptId, + Configuration conf, EventHandler eventHandler, + OutputCommitter committer, boolean newApiCommitter, String user) { + super(jobId, applicationAttemptId, conf, eventHandler, + null, new JobTokenSecretManager(), new Credentials(), + new SystemClock(), null, MRAppMetrics.create(), committer, + newApiCommitter, user, System.currentTimeMillis(), null, null); + + // This "this leak" is okay because the retained pointer is in an + // instance variable. + localStateMachine = localFactory.make(this); + } + } }