From 6ed54f3439ea9c7af6bf129ebe1938380febb5e2 Mon Sep 17 00:00:00 2001 From: Junping Du Date: Tue, 13 Jun 2017 15:21:04 -0700 Subject: [PATCH] MAPREDUCE-6895. Job end notification not send due to YarnRuntimeException. Contributed by yunjiong zhao. --- .../mapreduce/v2/app/JobEndNotifier.java | 5 -- .../hadoop/mapreduce/v2/app/MRAppMaster.java | 51 +++++++++++-------- 2 files changed, 31 insertions(+), 25 deletions(-) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java index 836fad5264c..3bf05420f0c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java @@ -153,11 +153,6 @@ public class JobEndNotifier implements Configurable { */ public void notify(JobReport jobReport) throws InterruptedException { - // Do we need job-end notification? - if (userUrl == null) { - Log.getLog().info("Job end notification URL not set, skipping."); - return; - } //Do string replacements for jobId and jobStatus if (userUrl.contains(JOB_ID)) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index ba7f9291347..1445481f705 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -646,6 +646,12 @@ public class MRAppMaster extends CompositeService { // note in a workflow scenario, this may lead to creation of a new // job (FIXME?) + JobEndNotifier notifier = null; + if (getConfig().get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL) != null) { + notifier = new JobEndNotifier(); + notifier.setConf(getConfig()); + } + try { //if isLastAMRetry comes as true, should never set it to false if ( !isLastAMRetry){ @@ -660,28 +666,11 @@ public class MRAppMaster extends CompositeService { LOG.info("Calling stop for all the services"); MRAppMaster.this.stop(); - if (isLastAMRetry) { + if (isLastAMRetry && notifier != null) { // Send job-end notification when it is safe to report termination to // users and it is the last AM retry - if (getConfig().get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL) != null) { - try { - LOG.info("Job end notification started for jobID : " - + job.getReport().getJobId()); - JobEndNotifier notifier = new JobEndNotifier(); - notifier.setConf(getConfig()); - JobReport report = job.getReport(); - // If unregistration fails, the final state is unavailable. However, - // at the last AM Retry, the client will finally be notified FAILED - // from RM, so we should let users know FAILED via notifier as well - if (!context.hasSuccessfullyUnregistered()) { - report.setJobState(JobState.FAILED); - } - notifier.notify(report); - } catch (InterruptedException ie) { - LOG.warn("Job end notification interrupted for jobID : " - + job.getReport().getJobId(), ie); - } - } + sendJobEndNotify(notifier); + notifier = null; } try { @@ -693,10 +682,32 @@ public class MRAppMaster extends CompositeService { } catch (Throwable t) { LOG.warn("Graceful stop failed. Exiting.. ", t); exitMRAppMaster(1, t); + } finally { + if (isLastAMRetry && notifier != null) { + sendJobEndNotify(notifier); + } } exitMRAppMaster(0, null); } + private void sendJobEndNotify(JobEndNotifier notifier) { + try { + LOG.info("Job end notification started for jobID : " + + job.getReport().getJobId()); + // If unregistration fails, the final state is unavailable. However, + // at the last AM Retry, the client will finally be notified FAILED + // from RM, so we should let users know FAILED via notifier as well + JobReport report = job.getReport(); + if (!context.hasSuccessfullyUnregistered()) { + report.setJobState(JobState.FAILED); + } + notifier.notify(report); + } catch (InterruptedException ie) { + LOG.warn("Job end notification interrupted for jobID : " + + job.getReport().getJobId(), ie); + } + } + /** MRAppMaster exit method which has been instrumented for both runtime and * unit testing. * If the main thread has not been started, this method was called from a