MAPREDUCE-3382. Enhanced MR AM to use a proxy to ping the job-end notification URL. Contributed by Ravi Prakash.

svn merge --ignore-ancestry -c 1229736 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1229737 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2012-01-10 21:09:41 +00:00
parent 10c1a416fc
commit 6821efa3ae
4 changed files with 70 additions and 1 deletions

View File

@ -102,6 +102,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3312. Modified MR AM to not send a stop-container request for
a container that isn't launched at all. (Robert Joseph Evans via vinodkv)
MAPREDUCE-3382. Enhanced MR AM to use a proxy to ping the job-end
notification URL. (Ravi Prakash via vinodkv)
OPTIMIZATIONS
MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar

View File

@ -20,9 +20,11 @@ package org.apache.hadoop.mapreduce.v2.app;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
import java.net.Proxy;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
@ -49,9 +51,11 @@ public class JobEndNotifier implements Configurable {
private Configuration conf;
protected String userUrl;
protected String proxyConf;
protected int numTries; //Number of tries to attempt notification
protected int waitInterval; //Time to wait between retrying notification
protected URL urlToNotify; //URL to notify read from the config
protected Proxy proxyToUse = Proxy.NO_PROXY; //Proxy to use for notification
/**
* Parse the URL that needs to be notified of the end of the job, along
@ -73,6 +77,34 @@ public class JobEndNotifier implements Configurable {
waitInterval = (waitInterval < 0) ? 5 : waitInterval;
userUrl = conf.get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL);
proxyConf = conf.get(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY);
//Configure the proxy to use if its set. It should be set like
//proxyType@proxyHostname:port
if(proxyConf != null && !proxyConf.equals("") &&
proxyConf.lastIndexOf(":") != -1) {
int typeIndex = proxyConf.indexOf("@");
Proxy.Type proxyType = Proxy.Type.HTTP;
if(typeIndex != -1 &&
proxyConf.substring(0, typeIndex).compareToIgnoreCase("socks") == 0) {
proxyType = Proxy.Type.SOCKS;
}
String hostname = proxyConf.substring(typeIndex + 1,
proxyConf.lastIndexOf(":"));
String portConf = proxyConf.substring(proxyConf.lastIndexOf(":") + 1);
try {
int port = Integer.parseInt(portConf);
proxyToUse = new Proxy(proxyType,
new InetSocketAddress(hostname, port));
Log.info("Job end notification using proxy type \"" + proxyType +
"\" hostname \"" + hostname + "\" and port \"" + port + "\"");
} catch(NumberFormatException nfe) {
Log.warn("Job end notification couldn't parse configured proxy's port "
+ portConf + ". Not going to use a proxy");
}
}
}
public Configuration getConf() {
@ -87,7 +119,7 @@ public class JobEndNotifier implements Configurable {
boolean success = false;
try {
Log.info("Job end notification trying " + urlToNotify);
URLConnection conn = urlToNotify.openConnection();
URLConnection conn = urlToNotify.openConnection(proxyToUse);
conn.setConnectTimeout(5*1000);
conn.setReadTimeout(5*1000);
conn.setAllowUserInteraction(false);

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.mapreduce.v2.app;
import java.net.Proxy;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
@ -71,6 +73,34 @@ public class TestJobEndNotifier extends JobEndNotifier {
waitInterval == 5);
}
private void testProxyConfiguration(Configuration conf) {
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY, "somehost");
setConf(conf);
Assert.assertTrue("Proxy shouldn't be set because port wasn't specified",
proxyToUse.type() == Proxy.Type.DIRECT);
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY, "somehost:someport");
setConf(conf);
Assert.assertTrue("Proxy shouldn't be set because port wasn't numeric",
proxyToUse.type() == Proxy.Type.DIRECT);
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY, "somehost:1000");
setConf(conf);
Assert.assertTrue("Proxy should have been set but wasn't ",
proxyToUse.toString().equals("HTTP @ somehost:1000"));
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY, "socks@somehost:1000");
setConf(conf);
Assert.assertTrue("Proxy should have been socks but wasn't ",
proxyToUse.toString().equals("SOCKS @ somehost:1000"));
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY, "SOCKS@somehost:1000");
setConf(conf);
Assert.assertTrue("Proxy should have been socks but wasn't ",
proxyToUse.toString().equals("SOCKS @ somehost:1000"));
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY, "sfafn@somehost:1000");
setConf(conf);
Assert.assertTrue("Proxy should have been http but wasn't ",
proxyToUse.toString().equals("HTTP @ somehost:1000"));
}
/**
* Test that setting parameters has the desired effect
*/
@ -79,6 +109,7 @@ public class TestJobEndNotifier extends JobEndNotifier {
Configuration conf = new Configuration();
testNumRetries(conf);
testWaitInterval(conf);
testProxyConfiguration(conf);
}
protected int notificationCount = 0;

View File

@ -501,6 +501,9 @@ public interface MRJobConfig {
public static final String MR_JOB_END_NOTIFICATION_URL =
"mapreduce.job.end-notification.url";
public static final String MR_JOB_END_NOTIFICATION_PROXY =
"mapreduce.job.end-notification.proxy";
public static final String MR_JOB_END_RETRY_ATTEMPTS =
"mapreduce.job.end-notification.retry.attempts";