From 28d52eabfa2973110c621b0c615ee2efc3ac2c1a Mon Sep 17 00:00:00 2001 From: Arun Murthy Date: Mon, 24 Oct 2011 21:03:22 +0000 Subject: [PATCH] Merge -c 1188377 from trunk to branch-0.23 to complete fix for MAPREDUCE-3028. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1188378 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../mapreduce/v2/app/JobEndNotifier.java | 151 ++++++++++++++++++ .../hadoop/mapreduce/v2/app/MRAppMaster.java | 21 ++- .../mapreduce/v2/app/TestJobEndNotifier.java | 108 +++++++++++++ .../org/apache/hadoop/mapred/JobConf.java | 4 +- .../apache/hadoop/mapred/JobEndNotifier.java | 4 +- .../apache/hadoop/mapreduce/MRJobConfig.java | 25 ++- .../hadoop/mapreduce/util/ConfigUtil.java | 6 +- .../src/main/resources/mapred-default.xml | 45 ++++++ 9 files changed, 351 insertions(+), 16 deletions(-) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index f8b2db1ce0a..203fee0f180 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -1688,6 +1688,9 @@ Release 0.23.0 - Unreleased MAPREDUCE-3233. Fixed a bug in MR Job so as to be able to restart the application on AM crash. (Mahadev Konar via vinodkv) + MAPREDUCE-3028. Added job-end notification support. (Ravi Prakash via + acmurthy) + Release 0.22.0 - Unreleased INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java new file mode 100644 index 00000000000..64125fde2ae --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java @@ -0,0 +1,151 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.mapreduce.v2.app; + +import java.io.IOException; +import java.io.InputStream; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLConnection; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.v2.api.records.JobReport; +import org.mortbay.log.Log; + +/** + *

This class handles job end notification. Submitters of jobs can choose to + * be notified of the end of a job by supplying a URL to which a connection + * will be established. + *

+ *

+ */ +public class JobEndNotifier implements Configurable { + final String JOB_ID = "$jobId"; + final String JOB_STATUS = "$jobStatus"; + + private Configuration conf; + protected String userUrl; + protected int numTries; //Number of tries to attempt notification + protected int waitInterval; //Time to wait between retrying notification + protected URL urlToNotify; //URL to notify read from the config + + /** + * Parse the URL that needs to be notified of the end of the job, along + * with the number of retries in case of failure and the amount of time to + * wait between retries + * @param conf the configuration + */ + public void setConf(Configuration conf) { + this.conf = conf; + + numTries = Math.min( + conf.getInt(MRJobConfig.MR_JOB_END_RETRY_ATTEMPTS, 0) + 1 + , conf.getInt(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS, 1) + ); + waitInterval = Math.min( + conf.getInt(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, 5) + , conf.getInt(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL, 5) + ); + waitInterval = (waitInterval < 0) ? 5 : waitInterval; + + userUrl = conf.get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL); + } + + public Configuration getConf() { + return conf; + } + + /** + * Notify the URL just once. Use best effort. Timeout hard coded to 5 + * seconds. + */ + protected boolean notifyURLOnce() { + boolean success = false; + try { + Log.info("Job end notification trying " + urlToNotify); + URLConnection conn = urlToNotify.openConnection(); + conn.setConnectTimeout(5*1000); + conn.setReadTimeout(5*1000); + conn.setAllowUserInteraction(false); + InputStream is = conn.getInputStream(); + conn.getContent(); + is.close(); + success = true; + Log.info("Job end notification to " + urlToNotify + " succeeded"); + } catch(IOException ioe) { + Log.warn("Job end notification to " + urlToNotify + " failed", ioe); + } + return success; + } + + /** + * Notify a server of the completion of a submitted job. The server must have + * configured MRConfig.JOB_END_NOTIFICATION_URLS + * @param config JobConf to read parameters from + * @param jobReport JobReport used to read JobId and JobStatus + * @throws InterruptedException + */ + public void notify(JobReport jobReport) + throws InterruptedException { + // Do we need job-end notification? + if (userUrl == null) { + Log.info("Job end notification URL not set, skipping."); + return; + } + + //Do string replacements for jobId and jobStatus + if (userUrl.contains(JOB_ID)) { + userUrl = userUrl.replace(JOB_ID, jobReport.getJobId().toString()); + } + if (userUrl.contains(JOB_STATUS)) { + userUrl = userUrl.replace(JOB_STATUS, jobReport.getJobState().toString()); + } + + // Create the URL, ensure sanity + try { + urlToNotify = new URL(userUrl); + } catch (MalformedURLException mue) { + Log.warn("Job end notification couldn't parse " + userUrl, mue); + return; + } + + // Send notification + boolean success = false; + while (numTries-- > 0 && !success) { + Log.info("Job end notification attempts left " + numTries); + success = notifyURLOnce(); + if (!success) { + Thread.sleep(waitInterval); + } + } + if (!success) { + Log.warn("Job end notification failed to notify : " + urlToNotify); + } else { + Log.info("Job end notification succeeded for " + jobReport.getJobId()); + } + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index b36e55d68a7..85a44ac01ab 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -386,19 +386,34 @@ public class MRAppMaster extends CompositeService { } catch (InterruptedException e) { e.printStackTrace(); } - LOG.info("Calling stop for all the services"); try { + // Stop all services + // This will also send the final report to the ResourceManager + LOG.info("Calling stop for all the services"); stop(); + + // Send job-end notification + try { + LOG.info("Job end notification started for jobID : " + + job.getReport().getJobId()); + JobEndNotifier notifier = new JobEndNotifier(); + notifier.setConf(getConfig()); + notifier.notify(job.getReport()); + } catch (InterruptedException ie) { + LOG.warn("Job end notification interrupted for jobID : " + + job.getReport().getJobId(), ie ); + } } catch (Throwable t) { LOG.warn("Graceful stop failed ", t); } + + // Cleanup staging directory try { cleanupStagingDir(); } catch(IOException io) { LOG.warn("Failed to delete staging dir"); } - //TODO: this is required because rpc server does not shut down - // in spite of calling server.stop(). + //Bring the process down by force. //Not needed after HADOOP-7140 LOG.info("Exiting MR AppMaster..GoodBye!"); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java new file mode 100644 index 00000000000..46cb11e9241 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java @@ -0,0 +1,108 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.mapreduce.v2.app; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.v2.api.records.JobReport; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +/** + * Tests job end notification + * + */ +public class TestJobEndNotifier extends JobEndNotifier { + + //Test maximum retries is capped by MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS + private void testNumRetries(Configuration conf) { + conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS, "0"); + conf.set(MRJobConfig.MR_JOB_END_RETRY_ATTEMPTS, "10"); + setConf(conf); + Assert.assertTrue("Expected numTries to be 0, but was " + numTries, + numTries == 0 ); + + conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS, "1"); + setConf(conf); + Assert.assertTrue("Expected numTries to be 1, but was " + numTries, + numTries == 1 ); + + conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS, "20"); + setConf(conf); + Assert.assertTrue("Expected numTries to be 11, but was " + numTries, + numTries == 11 ); //11 because number of _retries_ is 10 + } + + //Test maximum retry interval is capped by + //MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL + private void testWaitInterval(Configuration conf) { + conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL, "5"); + conf.set(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, "1"); + setConf(conf); + Assert.assertTrue("Expected waitInterval to be 1, but was " + waitInterval, + waitInterval == 1); + + conf.set(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, "10"); + setConf(conf); + Assert.assertTrue("Expected waitInterval to be 5, but was " + waitInterval, + waitInterval == 5); + + //Test negative numbers are set to default + conf.set(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, "-10"); + setConf(conf); + Assert.assertTrue("Expected waitInterval to be 5, but was " + waitInterval, + waitInterval == 5); + } + + /** + * Test that setting parameters has the desired effect + */ + @Test + public void checkConfiguration() { + Configuration conf = new Configuration(); + testNumRetries(conf); + testWaitInterval(conf); + } + + protected int notificationCount = 0; + @Override + protected boolean notifyURLOnce() { + boolean success = super.notifyURLOnce(); + notificationCount++; + return success; + } + + //Check retries happen as intended + @Test + public void testNotifyRetries() throws InterruptedException { + Configuration conf = new Configuration(); + conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_URL, "http://nonexistent"); + conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS, "3"); + conf.set(MRJobConfig.MR_JOB_END_RETRY_ATTEMPTS, "3"); + JobReport jobReport = Mockito.mock(JobReport.class); + + this.notificationCount = 0; + this.setConf(conf); + this.notify(jobReport); + Assert.assertEquals("Only 3 retries were expected but was : " + + this.notificationCount, this.notificationCount, 3); + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java index b489d41b17c..9475681d6d9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java @@ -1649,7 +1649,7 @@ public class JobConf extends Configuration { * @see #setJobEndNotificationURI(String) */ public String getJobEndNotificationURI() { - return get(JobContext.END_NOTIFICATION_URL); + return get(JobContext.MR_JOB_END_NOTIFICATION_URL); } /** @@ -1669,7 +1669,7 @@ public class JobConf extends Configuration { * JobCompletionAndChaining">Job Completion and Chaining */ public void setJobEndNotificationURI(String uri) { - set(JobContext.END_NOTIFICATION_URL, uri); + set(JobContext.MR_JOB_END_NOTIFICATION_URL, uri); } /** diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobEndNotifier.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobEndNotifier.java index d28e72290fd..c5fd9ca2289 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobEndNotifier.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobEndNotifier.java @@ -102,8 +102,8 @@ public class JobEndNotifier { String uri = conf.getJobEndNotificationURI(); if (uri != null) { // +1 to make logic for first notification identical to a retry - int retryAttempts = conf.getInt(JobContext.END_NOTIFICATION_RETRIES, 0) + 1; - long retryInterval = conf.getInt(JobContext.END_NOTIFICATION_RETRIE_INTERVAL, 30000); + int retryAttempts = conf.getInt(JobContext.MR_JOB_END_RETRY_ATTEMPTS, 0) + 1; + long retryInterval = conf.getInt(JobContext.MR_JOB_END_RETRY_INTERVAL, 30000); if (uri.contains("$jobId")) { uri = uri.replace("$jobId", status.getJobID().toString()); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index bfa31df582a..142807f2271 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -91,12 +91,6 @@ public interface MRJobConfig { public static final String WORKING_DIR = "mapreduce.job.working.dir"; - public static final String END_NOTIFICATION_URL = "mapreduce.job.end-notification.url"; - - public static final String END_NOTIFICATION_RETRIES = "mapreduce.job.end-notification.retry.attempts"; - - public static final String END_NOTIFICATION_RETRIE_INTERVAL = "mapreduce.job.end-notification.retry.interval"; - public static final String CLASSPATH_ARCHIVES = "mapreduce.job.classpath.archives"; public static final String CLASSPATH_FILES = "mapreduce.job.classpath.files"; @@ -486,4 +480,23 @@ public interface MRJobConfig { public static final String APPLICATION_ATTEMPT_ID = "mapreduce.job.application.attempt.id"; + + /** + * Job end notification. + */ + public static final String MR_JOB_END_NOTIFICATION_URL = + "mapreduce.job.end-notification.url"; + + public static final String MR_JOB_END_RETRY_ATTEMPTS = + "mapreduce.job.end-notification.retry.attempts"; + + public static final String MR_JOB_END_RETRY_INTERVAL = + "mapreduce.job.end-notification.retry.interval"; + + public static final String MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS = + "mapreduce.job.end-notification.max.attempts"; + + public static final String MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL = + "mapreduce.job.end-notification.max.retry.interval"; + } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java index e4cef59ef40..51ecc34b2c1 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java @@ -177,11 +177,11 @@ public class ConfigUtil { Configuration.addDeprecation("tasktracker.contention.tracking", new String[] {TTConfig.TT_CONTENTION_TRACKING}); Configuration.addDeprecation("job.end.notification.url", - new String[] {MRJobConfig.END_NOTIFICATION_URL}); + new String[] {MRJobConfig.MR_JOB_END_NOTIFICATION_URL}); Configuration.addDeprecation("job.end.retry.attempts", - new String[] {MRJobConfig.END_NOTIFICATION_RETRIES}); + new String[] {MRJobConfig.MR_JOB_END_RETRY_ATTEMPTS}); Configuration.addDeprecation("job.end.retry.interval", - new String[] {MRJobConfig.END_NOTIFICATION_RETRIE_INTERVAL}); + new String[] {MRJobConfig.MR_JOB_END_RETRY_INTERVAL}); Configuration.addDeprecation("mapred.committer.job.setup.cleanup.needed", new String[] {MRJobConfig.SETUP_CLEANUP_NEEDED}); Configuration.addDeprecation("mapred.jar", diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index 496de9b44bb..d562bdf0f46 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -1179,4 +1179,49 @@ + + mapreduce.job.end-notification.max.attempts + 5 + true + The maximum number of times a URL will be read for providing job + end notification. Cluster administrators can set this to limit how long + after end of a job, the Application Master waits before exiting. Must be + marked as final to prevent users from overriding this. + + + + + mapreduce.job.end-notification.max.retry.interval + 5 + true + The maximum amount of time (in seconds) to wait before retrying + job end notification. Cluster administrators can set this to limit how long + the Application Master waits before exiting. Must be marked as final to + prevent users from overriding this. + + + + mapreduce.job.end-notification.url + + The URL to send job end notification. It may contain sentinels + $jobId and $jobStatus which will be replaced with jobId and jobStatus. + + + + + mapreduce.job.end-notification.retry.attempts + 5 + The number of times the submitter of the job wants to retry job + end notification if it fails. This is capped by + mapreduce.job.end-notification.max.attempts + + + + mapreduce.job.end-notification.retry.interval + 1 + The number of seconds the submitter of the job wants to wait + before job end notification is retried if it fails. This is capped by + mapreduce.job.end-notification.max.retry.interval + +