diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index 23f436f46cd..1191f8d789b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -107,6 +107,8 @@ import org.apache.hadoop.yarn.service.CompositeService; import org.apache.hadoop.yarn.service.Service; import org.apache.hadoop.yarn.util.ConverterUtils; +import com.google.common.annotations.VisibleForTesting; + /** * The Map-Reduce Application Master. * The state machine is encapsulated in the implementation of Job interface. @@ -398,52 +400,65 @@ public class MRAppMaster extends CompositeService { protected void sysexit() { System.exit(0); } - + + @VisibleForTesting + public void shutDownJob() { + // job has finished + // this is the only job, so shut down the Appmaster + // note in a workflow scenario, this may lead to creation of a new + // job (FIXME?) + // Send job-end notification + if (getConfig().get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL) != null) { + try { + LOG.info("Job end notification started for jobID : " + + job.getReport().getJobId()); + JobEndNotifier notifier = new JobEndNotifier(); + notifier.setConf(getConfig()); + notifier.notify(job.getReport()); + } catch (InterruptedException ie) { + LOG.warn("Job end notification interrupted for jobID : " + + job.getReport().getJobId(), ie); + } + } + + // TODO:currently just wait for some time so clients can know the + // final states. Will be removed once RM come on. + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + try { + //We are finishing cleanly so this is the last retry + isLastAMRetry = true; + // Stop all services + // This will also send the final report to the ResourceManager + LOG.info("Calling stop for all the services"); + MRAppMaster.this.stop(); + + } catch (Throwable t) { + LOG.warn("Graceful stop failed ", t); + } + + //Bring the process down by force. + //Not needed after HADOOP-7140 + LOG.info("Exiting MR AppMaster..GoodBye!"); + sysexit(); + } + private class JobFinishEventHandler implements EventHandler { @Override public void handle(JobFinishEvent event) { - // job has finished - // this is the only job, so shut down the Appmaster - // note in a workflow scenario, this may lead to creation of a new - // job (FIXME?) - // Send job-end notification - if (getConfig().get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL) != null) { - try { - LOG.info("Job end notification started for jobID : " - + job.getReport().getJobId()); - JobEndNotifier notifier = new JobEndNotifier(); - notifier.setConf(getConfig()); - notifier.notify(job.getReport()); - } catch (InterruptedException ie) { - LOG.warn("Job end notification interrupted for jobID : " - + job.getReport().getJobId(), ie); + // Create a new thread to shutdown the AM. We should not do it in-line + // to avoid blocking the dispatcher itself. + new Thread() { + + @Override + public void run() { + shutDownJob(); } - } - - // TODO:currently just wait for some time so clients can know the - // final states. Will be removed once RM come on. - try { - Thread.sleep(5000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - - try { - //We are finishing cleanly so this is the last retry - isLastAMRetry = true; - // Stop all services - // This will also send the final report to the ResourceManager - LOG.info("Calling stop for all the services"); - stop(); - - } catch (Throwable t) { - LOG.warn("Graceful stop failed ", t); - } - - //Bring the process down by force. - //Not needed after HADOOP-7140 - LOG.info("Exiting MR AppMaster..GoodBye!"); - sysexit(); + }.start(); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java index 3dd6c33edef..eee29a702d7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java @@ -21,17 +21,15 @@ package org.apache.hadoop.mapreduce.v2.app; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.mockito.Mockito.times; import java.io.IOException; import junit.framework.Assert; import junit.framework.TestCase; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -49,7 +47,6 @@ import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.service.AbstractService; @@ -68,7 +65,6 @@ import org.junit.Test; private Path stagingJobPath = new Path(stagingJobDir); private final static RecordFactory recordFactory = RecordFactoryProvider. getRecordFactory(null); - private static final Log LOG = LogFactory.getLog(TestStagingCleanup.class); @Test public void testDeletionofStaging() throws IOException { @@ -86,9 +82,7 @@ import org.junit.Test; jobid.setAppId(appId); MRAppMaster appMaster = new TestMRApp(attemptId); appMaster.init(conf); - EventHandler handler = - appMaster.createJobFinishEventHandler(); - handler.handle(new JobFinishEvent(jobid)); + appMaster.shutDownJob(); verify(fs).delete(stagingJobPath, true); } diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index cfc8b238135..5ca37e253f4 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -182,6 +182,9 @@ Release 0.23.5 - UNRELEASED YARN-180. Capacity scheduler - containers that get reserved create container token to early (acmurthy and bobby) + YARN-139. Interrupted Exception within AsyncDispatcher leads to user + confusion. (Vinod Kumar Vavilapalli via jlowe) + Release 0.23.4 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java index c8f325df244..9377397e489 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java @@ -68,7 +68,9 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { try { event = eventQueue.take(); } catch(InterruptedException ie) { - LOG.warn("AsyncDispatcher thread interrupted", ie); + if (!stopped) { + LOG.warn("AsyncDispatcher thread interrupted", ie); + } return; } if (event != null) { @@ -180,7 +182,9 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { try { eventQueue.put(event); } catch (InterruptedException e) { - LOG.warn("AsyncDispatcher thread interrupted", e); + if (!stopped) { + LOG.warn("AsyncDispatcher thread interrupted", e); + } throw new YarnException(e); } };