MAPREDUCE-3028. Added job-end notification support. Contributed by Ravi Prakash.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1188377 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2011-10-24 21:02:43 +00:00
parent 0920056f04
commit 6288dfa873
9 changed files with 351 additions and 16 deletions

View File

@ -1744,6 +1744,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-3233. Fixed a bug in MR Job so as to be able to restart the MAPREDUCE-3233. Fixed a bug in MR Job so as to be able to restart the
application on AM crash. (Mahadev Konar via vinodkv) application on AM crash. (Mahadev Konar via vinodkv)
MAPREDUCE-3028. Added job-end notification support. (Ravi Prakash via
acmurthy)
Release 0.22.0 - Unreleased Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -0,0 +1,151 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.mortbay.log.Log;
/**
* <p>This class handles job end notification. Submitters of jobs can choose to
* be notified of the end of a job by supplying a URL to which a connection
* will be established.
* <ul><li> The URL connection is fire and forget by default.</li> <li>
* User can specify number of retry attempts and a time interval at which to
* attempt retries</li><li>
* Cluster administrators can set final parameters to set maximum number of
* tries (0 would disable job end notification) and max time interval</li><li>
* The URL may contain sentinels which will be replaced by jobId and jobStatus
* (eg. SUCCEEDED/KILLED/FAILED) </li> </ul>
* </p>
*/
public class JobEndNotifier implements Configurable {
final String JOB_ID = "$jobId";
final String JOB_STATUS = "$jobStatus";
private Configuration conf;
protected String userUrl;
protected int numTries; //Number of tries to attempt notification
protected int waitInterval; //Time to wait between retrying notification
protected URL urlToNotify; //URL to notify read from the config
/**
* Parse the URL that needs to be notified of the end of the job, along
* with the number of retries in case of failure and the amount of time to
* wait between retries
* @param conf the configuration
*/
public void setConf(Configuration conf) {
this.conf = conf;
numTries = Math.min(
conf.getInt(MRJobConfig.MR_JOB_END_RETRY_ATTEMPTS, 0) + 1
, conf.getInt(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS, 1)
);
waitInterval = Math.min(
conf.getInt(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, 5)
, conf.getInt(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL, 5)
);
waitInterval = (waitInterval < 0) ? 5 : waitInterval;
userUrl = conf.get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL);
}
public Configuration getConf() {
return conf;
}
/**
* Notify the URL just once. Use best effort. Timeout hard coded to 5
* seconds.
*/
protected boolean notifyURLOnce() {
boolean success = false;
try {
Log.info("Job end notification trying " + urlToNotify);
URLConnection conn = urlToNotify.openConnection();
conn.setConnectTimeout(5*1000);
conn.setReadTimeout(5*1000);
conn.setAllowUserInteraction(false);
InputStream is = conn.getInputStream();
conn.getContent();
is.close();
success = true;
Log.info("Job end notification to " + urlToNotify + " succeeded");
} catch(IOException ioe) {
Log.warn("Job end notification to " + urlToNotify + " failed", ioe);
}
return success;
}
/**
* Notify a server of the completion of a submitted job. The server must have
* configured MRConfig.JOB_END_NOTIFICATION_URLS
* @param config JobConf to read parameters from
* @param jobReport JobReport used to read JobId and JobStatus
* @throws InterruptedException
*/
public void notify(JobReport jobReport)
throws InterruptedException {
// Do we need job-end notification?
if (userUrl == null) {
Log.info("Job end notification URL not set, skipping.");
return;
}
//Do string replacements for jobId and jobStatus
if (userUrl.contains(JOB_ID)) {
userUrl = userUrl.replace(JOB_ID, jobReport.getJobId().toString());
}
if (userUrl.contains(JOB_STATUS)) {
userUrl = userUrl.replace(JOB_STATUS, jobReport.getJobState().toString());
}
// Create the URL, ensure sanity
try {
urlToNotify = new URL(userUrl);
} catch (MalformedURLException mue) {
Log.warn("Job end notification couldn't parse " + userUrl, mue);
return;
}
// Send notification
boolean success = false;
while (numTries-- > 0 && !success) {
Log.info("Job end notification attempts left " + numTries);
success = notifyURLOnce();
if (!success) {
Thread.sleep(waitInterval);
}
}
if (!success) {
Log.warn("Job end notification failed to notify : " + urlToNotify);
} else {
Log.info("Job end notification succeeded for " + jobReport.getJobId());
}
}
}

View File

@ -386,19 +386,34 @@ public class MRAppMaster extends CompositeService {
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
} }
LOG.info("Calling stop for all the services");
try { try {
// Stop all services
// This will also send the final report to the ResourceManager
LOG.info("Calling stop for all the services");
stop(); stop();
// Send job-end notification
try {
LOG.info("Job end notification started for jobID : "
+ job.getReport().getJobId());
JobEndNotifier notifier = new JobEndNotifier();
notifier.setConf(getConfig());
notifier.notify(job.getReport());
} catch (InterruptedException ie) {
LOG.warn("Job end notification interrupted for jobID : "
+ job.getReport().getJobId(), ie );
}
} catch (Throwable t) { } catch (Throwable t) {
LOG.warn("Graceful stop failed ", t); LOG.warn("Graceful stop failed ", t);
} }
// Cleanup staging directory
try { try {
cleanupStagingDir(); cleanupStagingDir();
} catch(IOException io) { } catch(IOException io) {
LOG.warn("Failed to delete staging dir"); LOG.warn("Failed to delete staging dir");
} }
//TODO: this is required because rpc server does not shut down
// in spite of calling server.stop().
//Bring the process down by force. //Bring the process down by force.
//Not needed after HADOOP-7140 //Not needed after HADOOP-7140
LOG.info("Exiting MR AppMaster..GoodBye!"); LOG.info("Exiting MR AppMaster..GoodBye!");

View File

@ -0,0 +1,108 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
/**
* Tests job end notification
*
*/
public class TestJobEndNotifier extends JobEndNotifier {
//Test maximum retries is capped by MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS
private void testNumRetries(Configuration conf) {
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS, "0");
conf.set(MRJobConfig.MR_JOB_END_RETRY_ATTEMPTS, "10");
setConf(conf);
Assert.assertTrue("Expected numTries to be 0, but was " + numTries,
numTries == 0 );
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS, "1");
setConf(conf);
Assert.assertTrue("Expected numTries to be 1, but was " + numTries,
numTries == 1 );
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS, "20");
setConf(conf);
Assert.assertTrue("Expected numTries to be 11, but was " + numTries,
numTries == 11 ); //11 because number of _retries_ is 10
}
//Test maximum retry interval is capped by
//MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL
private void testWaitInterval(Configuration conf) {
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL, "5");
conf.set(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, "1");
setConf(conf);
Assert.assertTrue("Expected waitInterval to be 1, but was " + waitInterval,
waitInterval == 1);
conf.set(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, "10");
setConf(conf);
Assert.assertTrue("Expected waitInterval to be 5, but was " + waitInterval,
waitInterval == 5);
//Test negative numbers are set to default
conf.set(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, "-10");
setConf(conf);
Assert.assertTrue("Expected waitInterval to be 5, but was " + waitInterval,
waitInterval == 5);
}
/**
* Test that setting parameters has the desired effect
*/
@Test
public void checkConfiguration() {
Configuration conf = new Configuration();
testNumRetries(conf);
testWaitInterval(conf);
}
protected int notificationCount = 0;
@Override
protected boolean notifyURLOnce() {
boolean success = super.notifyURLOnce();
notificationCount++;
return success;
}
//Check retries happen as intended
@Test
public void testNotifyRetries() throws InterruptedException {
Configuration conf = new Configuration();
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_URL, "http://nonexistent");
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS, "3");
conf.set(MRJobConfig.MR_JOB_END_RETRY_ATTEMPTS, "3");
JobReport jobReport = Mockito.mock(JobReport.class);
this.notificationCount = 0;
this.setConf(conf);
this.notify(jobReport);
Assert.assertEquals("Only 3 retries were expected but was : "
+ this.notificationCount, this.notificationCount, 3);
}
}

View File

@ -1649,7 +1649,7 @@ public class JobConf extends Configuration {
* @see #setJobEndNotificationURI(String) * @see #setJobEndNotificationURI(String)
*/ */
public String getJobEndNotificationURI() { public String getJobEndNotificationURI() {
return get(JobContext.END_NOTIFICATION_URL); return get(JobContext.MR_JOB_END_NOTIFICATION_URL);
} }
/** /**
@ -1669,7 +1669,7 @@ public class JobConf extends Configuration {
* JobCompletionAndChaining">Job Completion and Chaining</a> * JobCompletionAndChaining">Job Completion and Chaining</a>
*/ */
public void setJobEndNotificationURI(String uri) { public void setJobEndNotificationURI(String uri) {
set(JobContext.END_NOTIFICATION_URL, uri); set(JobContext.MR_JOB_END_NOTIFICATION_URL, uri);
} }
/** /**

View File

@ -102,8 +102,8 @@ public class JobEndNotifier {
String uri = conf.getJobEndNotificationURI(); String uri = conf.getJobEndNotificationURI();
if (uri != null) { if (uri != null) {
// +1 to make logic for first notification identical to a retry // +1 to make logic for first notification identical to a retry
int retryAttempts = conf.getInt(JobContext.END_NOTIFICATION_RETRIES, 0) + 1; int retryAttempts = conf.getInt(JobContext.MR_JOB_END_RETRY_ATTEMPTS, 0) + 1;
long retryInterval = conf.getInt(JobContext.END_NOTIFICATION_RETRIE_INTERVAL, 30000); long retryInterval = conf.getInt(JobContext.MR_JOB_END_RETRY_INTERVAL, 30000);
if (uri.contains("$jobId")) { if (uri.contains("$jobId")) {
uri = uri.replace("$jobId", status.getJobID().toString()); uri = uri.replace("$jobId", status.getJobID().toString());
} }

View File

@ -91,12 +91,6 @@ public interface MRJobConfig {
public static final String WORKING_DIR = "mapreduce.job.working.dir"; public static final String WORKING_DIR = "mapreduce.job.working.dir";
public static final String END_NOTIFICATION_URL = "mapreduce.job.end-notification.url";
public static final String END_NOTIFICATION_RETRIES = "mapreduce.job.end-notification.retry.attempts";
public static final String END_NOTIFICATION_RETRIE_INTERVAL = "mapreduce.job.end-notification.retry.interval";
public static final String CLASSPATH_ARCHIVES = "mapreduce.job.classpath.archives"; public static final String CLASSPATH_ARCHIVES = "mapreduce.job.classpath.archives";
public static final String CLASSPATH_FILES = "mapreduce.job.classpath.files"; public static final String CLASSPATH_FILES = "mapreduce.job.classpath.files";
@ -486,4 +480,23 @@ public interface MRJobConfig {
public static final String APPLICATION_ATTEMPT_ID = public static final String APPLICATION_ATTEMPT_ID =
"mapreduce.job.application.attempt.id"; "mapreduce.job.application.attempt.id";
/**
* Job end notification.
*/
public static final String MR_JOB_END_NOTIFICATION_URL =
"mapreduce.job.end-notification.url";
public static final String MR_JOB_END_RETRY_ATTEMPTS =
"mapreduce.job.end-notification.retry.attempts";
public static final String MR_JOB_END_RETRY_INTERVAL =
"mapreduce.job.end-notification.retry.interval";
public static final String MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS =
"mapreduce.job.end-notification.max.attempts";
public static final String MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL =
"mapreduce.job.end-notification.max.retry.interval";
} }

View File

@ -177,11 +177,11 @@ public class ConfigUtil {
Configuration.addDeprecation("tasktracker.contention.tracking", Configuration.addDeprecation("tasktracker.contention.tracking",
new String[] {TTConfig.TT_CONTENTION_TRACKING}); new String[] {TTConfig.TT_CONTENTION_TRACKING});
Configuration.addDeprecation("job.end.notification.url", Configuration.addDeprecation("job.end.notification.url",
new String[] {MRJobConfig.END_NOTIFICATION_URL}); new String[] {MRJobConfig.MR_JOB_END_NOTIFICATION_URL});
Configuration.addDeprecation("job.end.retry.attempts", Configuration.addDeprecation("job.end.retry.attempts",
new String[] {MRJobConfig.END_NOTIFICATION_RETRIES}); new String[] {MRJobConfig.MR_JOB_END_RETRY_ATTEMPTS});
Configuration.addDeprecation("job.end.retry.interval", Configuration.addDeprecation("job.end.retry.interval",
new String[] {MRJobConfig.END_NOTIFICATION_RETRIE_INTERVAL}); new String[] {MRJobConfig.MR_JOB_END_RETRY_INTERVAL});
Configuration.addDeprecation("mapred.committer.job.setup.cleanup.needed", Configuration.addDeprecation("mapred.committer.job.setup.cleanup.needed",
new String[] {MRJobConfig.SETUP_CLEANUP_NEEDED}); new String[] {MRJobConfig.SETUP_CLEANUP_NEEDED});
Configuration.addDeprecation("mapred.jar", Configuration.addDeprecation("mapred.jar",

View File

@ -1179,4 +1179,49 @@
</description> </description>
</property> </property>
<property>
<name>mapreduce.job.end-notification.max.attempts</name>
<value>5</value>
<final>true</final>
<description>The maximum number of times a URL will be read for providing job
end notification. Cluster administrators can set this to limit how long
after end of a job, the Application Master waits before exiting. Must be
marked as final to prevent users from overriding this.
</description>
</property>
<property>
<name>mapreduce.job.end-notification.max.retry.interval</name>
<value>5</value>
<final>true</final>
<description>The maximum amount of time (in seconds) to wait before retrying
job end notification. Cluster administrators can set this to limit how long
the Application Master waits before exiting. Must be marked as final to
prevent users from overriding this.</description>
</property>
<property>
<name>mapreduce.job.end-notification.url</name>
<value></value>
<description>The URL to send job end notification. It may contain sentinels
$jobId and $jobStatus which will be replaced with jobId and jobStatus.
</description>
</property>
<property>
<name>mapreduce.job.end-notification.retry.attempts</name>
<value>5</value>
<description>The number of times the submitter of the job wants to retry job
end notification if it fails. This is capped by
mapreduce.job.end-notification.max.attempts</description>
</property>
<property>
<name>mapreduce.job.end-notification.retry.interval</name>
<value>1</value>
<description>The number of seconds the submitter of the job wants to wait
before job end notification is retried if it fails. This is capped by
mapreduce.job.end-notification.max.retry.interval</description>
</property>
</configuration> </configuration>