diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index adf4d391d2e..66e37c6f6b4 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -41,6 +41,9 @@ Release 2.5.0 - UNRELEASED MAPREDUCE-5775. Remove unnecessary job.setNumReduceTasks in SleepJob.createJob (jhanver chand sharma via devaraj) + MAPREDUCE-4937. MR AM handles an oversized split metainfo file poorly + (Eric Payne via jlowe) + Release 2.4.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index e39fa1da168..efc61851b9b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -1040,6 +1040,7 @@ public class MRAppMaster extends CompositeService { // It's more test friendly to put it here. DefaultMetricsSystem.initialize("MRAppMaster"); + boolean initFailed = false; if (!errorHappenedShutDown) { // create a job event for job intialization JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT); @@ -1048,6 +1049,10 @@ public class MRAppMaster extends CompositeService { // job-init to be done completely here. jobEventDispatcher.handle(initJobEvent); + // If job is still not initialized, an error happened during + // initialization. Must complete starting all of the services so failure + // events can be processed. + initFailed = (((JobImpl)job).getInternalState() != JobStateInternal.INITED); // JobImpl's InitTransition is done (call above is synchronous), so the // "uber-decision" (MR-1220) has been made. Query job and switch to @@ -1076,8 +1081,14 @@ public class MRAppMaster extends CompositeService { // set job classloader if configured MRApps.setJobClassLoader(getConfig()); - // All components have started, start the job. - startJobs(); + + if (initFailed) { + JobEvent initFailedEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT_FAILED); + jobEventDispatcher.handle(initFailedEvent); + } else { + // All components have started, start the job. + startJobs(); + } } @Override diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java index ab4d7f5d11c..ed95161654d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java @@ -28,6 +28,7 @@ public enum JobEventType { //Producer:MRAppMaster JOB_INIT, + JOB_INIT_FAILED, JOB_START, //Producer:Task diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index 36bfca71834..b101f0eb96d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -250,9 +250,12 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) .addTransition (JobStateInternal.NEW, - EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED), + EnumSet.of(JobStateInternal.INITED, JobStateInternal.NEW), JobEventType.JOB_INIT, new InitTransition()) + .addTransition(JobStateInternal.NEW, JobStateInternal.FAIL_ABORT, + JobEventType.JOB_INIT_FAILED, + new InitFailedTransition()) .addTransition(JobStateInternal.NEW, JobStateInternal.KILLED, JobEventType.JOB_KILL, new KillNewJobTransition()) @@ -265,7 +268,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, // Ignore-able events .addTransition(JobStateInternal.NEW, JobStateInternal.NEW, JobEventType.JOB_UPDATED_NODES) - + // Transitions from INITED state .addTransition(JobStateInternal.INITED, JobStateInternal.INITED, JobEventType.JOB_DIAGNOSTIC_UPDATE, @@ -1374,6 +1377,15 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, public JobStateInternal transition(JobImpl job, JobEvent event) { job.metrics.submittedJob(job); job.metrics.preparingJob(job); + + if (job.newApiCommitter) { + job.jobContext = new JobContextImpl(job.conf, + job.oldJobId); + } else { + job.jobContext = new org.apache.hadoop.mapred.JobContextImpl( + job.conf, job.oldJobId); + } + try { setup(job); job.fs = job.getFileSystem(job.conf); @@ -1409,14 +1421,6 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, checkTaskLimits(); - if (job.newApiCommitter) { - job.jobContext = new JobContextImpl(job.conf, - job.oldJobId); - } else { - job.jobContext = new org.apache.hadoop.mapred.JobContextImpl( - job.conf, job.oldJobId); - } - long inputLength = 0; for (int i = 0; i < job.numMapTasks; ++i) { inputLength += taskSplitMetaInfo[i].getInputDataLength(); @@ -1443,15 +1447,14 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, job.metrics.endPreparingJob(job); return JobStateInternal.INITED; - } catch (IOException e) { + } catch (Exception e) { LOG.warn("Job init failed", e); job.metrics.endPreparingJob(job); job.addDiagnostic("Job init failed : " + StringUtils.stringifyException(e)); - job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId, - job.jobContext, - org.apache.hadoop.mapreduce.JobStatus.State.FAILED)); - return JobStateInternal.FAILED; + // Leave job in the NEW state. The MR AM will detect that the state is + // not INITED and send a JOB_INIT_FAILED event. + return JobStateInternal.NEW; } } @@ -1552,6 +1555,16 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, } } // end of InitTransition + private static class InitFailedTransition + implements SingleArcTransition { + @Override + public void transition(JobImpl job, JobEvent event) { + job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId, + job.jobContext, + org.apache.hadoop.mapreduce.JobStatus.State.FAILED)); + } + } + private static class SetupCompletedTransition implements SingleArcTransition { @Override diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java index 7b0bc276270..6cfc83ea006 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java @@ -81,6 +81,7 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.InlineDispatcher; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -728,6 +729,35 @@ public class TestJobImpl { commitHandler.stop(); } + static final String EXCEPTIONMSG = "Splits max exceeded"; + @Test + public void testMetaInfoSizeOverMax() throws Exception { + Configuration conf = new Configuration(); + JobID jobID = JobID.forName("job_1234567890000_0001"); + JobId jobId = TypeConverter.toYarn(jobID); + MRAppMetrics mrAppMetrics = MRAppMetrics.create(); + JobImpl job = + new JobImpl(jobId, ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0, 0), 0), conf, mock(EventHandler.class), + null, new JobTokenSecretManager(), new Credentials(), null, null, + mrAppMetrics, null, true, null, 0, null, null, null, null); + InitTransition initTransition = new InitTransition() { + @Override + protected TaskSplitMetaInfo[] createSplits(JobImpl job, JobId jobId) { + throw new YarnRuntimeException(EXCEPTIONMSG); + } + }; + JobEvent mockJobEvent = mock(JobEvent.class); + + JobStateInternal jobSI = initTransition.transition(job, mockJobEvent); + Assert.assertTrue("When init fails, return value from InitTransition.transition should equal NEW.", + jobSI.equals(JobStateInternal.NEW)); + Assert.assertTrue("Job diagnostics should contain YarnRuntimeException", + job.getDiagnostics().toString().contains("YarnRuntimeException")); + Assert.assertTrue("Job diagnostics should contain " + EXCEPTIONMSG, + job.getDiagnostics().toString().contains(EXCEPTIONMSG)); + } + private static CommitterEventHandler createCommitterEventHandler( Dispatcher dispatcher, OutputCommitter committer) { final SystemClock clock = new SystemClock();