MAPREDUCE-3484. Fixed JobEndNotifier to not get interrupted before completing all its retries. Contributed by Ravi Prakash.

svn merge -c 1214563 --ignore-ancestry ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1214566 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2011-12-14 23:59:54 +00:00
parent 5aa03f2ef5
commit 87b8d8612c
3 changed files with 20 additions and 10 deletions

View File

@ -251,6 +251,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3530. Fixed an NPE occuring during scheduling in the MAPREDUCE-3530. Fixed an NPE occuring during scheduling in the
ResourceManager. (Arun C Murthy via vinodkv) ResourceManager. (Arun C Murthy via vinodkv)
MAPREDUCE-3484. Fixed JobEndNotifier to not get interrupted before completing
all its retries. (Ravi Prakash via vinodkv)
Release 0.23.0 - 2011-11-01 Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -375,6 +375,16 @@ public class MRAppMaster extends CompositeService {
// this is the only job, so shut down the Appmaster // this is the only job, so shut down the Appmaster
// note in a workflow scenario, this may lead to creation of a new // note in a workflow scenario, this may lead to creation of a new
// job (FIXME?) // job (FIXME?)
try {
LOG.info("Job end notification started for jobID : "
+ job.getReport().getJobId());
JobEndNotifier notifier = new JobEndNotifier();
notifier.setConf(getConfig());
notifier.notify(job.getReport());
} catch (InterruptedException ie) {
LOG.warn("Job end notification interrupted for jobID : "
+ job.getReport().getJobId(), ie );
}
// TODO:currently just wait for some time so clients can know the // TODO:currently just wait for some time so clients can know the
// final states. Will be removed once RM come on. // final states. Will be removed once RM come on.
@ -390,16 +400,6 @@ public class MRAppMaster extends CompositeService {
stop(); stop();
// Send job-end notification // Send job-end notification
try {
LOG.info("Job end notification started for jobID : "
+ job.getReport().getJobId());
JobEndNotifier notifier = new JobEndNotifier();
notifier.setConf(getConfig());
notifier.notify(job.getReport());
} catch (InterruptedException ie) {
LOG.warn("Job end notification interrupted for jobID : "
+ job.getReport().getJobId(), ie );
}
} catch (Throwable t) { } catch (Throwable t) {
LOG.warn("Graceful stop failed ", t); LOG.warn("Graceful stop failed ", t);
} }

View File

@ -96,13 +96,20 @@ public class TestJobEndNotifier extends JobEndNotifier {
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_URL, "http://nonexistent"); conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_URL, "http://nonexistent");
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS, "3"); conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS, "3");
conf.set(MRJobConfig.MR_JOB_END_RETRY_ATTEMPTS, "3"); conf.set(MRJobConfig.MR_JOB_END_RETRY_ATTEMPTS, "3");
conf.set(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, "3");
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL, "3");
JobReport jobReport = Mockito.mock(JobReport.class); JobReport jobReport = Mockito.mock(JobReport.class);
long startTime = System.currentTimeMillis();
this.notificationCount = 0; this.notificationCount = 0;
this.setConf(conf); this.setConf(conf);
this.notify(jobReport); this.notify(jobReport);
long endTime = System.currentTimeMillis();
Assert.assertEquals("Only 3 retries were expected but was : " Assert.assertEquals("Only 3 retries were expected but was : "
+ this.notificationCount, this.notificationCount, 3); + this.notificationCount, this.notificationCount, 3);
Assert.assertTrue("Should have taken more than 9 seconds it took "
+ (endTime - startTime), endTime - startTime > 9000);
} }
} }