MAPREDUCE-3382. Enhanced MR AM to use a proxy to ping the job-end notification URL. Contributed by Ravi Prakash.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1229736 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
469f9e2db1
commit
44e0bb831b
|
@ -168,6 +168,9 @@ Release 0.23.1 - Unreleased
|
||||||
MAPREDUCE-3312. Modified MR AM to not send a stop-container request for
|
MAPREDUCE-3312. Modified MR AM to not send a stop-container request for
|
||||||
a container that isn't launched at all. (Robert Joseph Evans via vinodkv)
|
a container that isn't launched at all. (Robert Joseph Evans via vinodkv)
|
||||||
|
|
||||||
|
MAPREDUCE-3382. Enhanced MR AM to use a proxy to ping the job-end
|
||||||
|
notification URL. (Ravi Prakash via vinodkv)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar
|
MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar
|
||||||
|
|
|
@ -20,9 +20,11 @@ package org.apache.hadoop.mapreduce.v2.app;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
import java.net.MalformedURLException;
|
import java.net.MalformedURLException;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
import java.net.URLConnection;
|
import java.net.URLConnection;
|
||||||
|
import java.net.Proxy;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configurable;
|
import org.apache.hadoop.conf.Configurable;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -49,9 +51,11 @@ public class JobEndNotifier implements Configurable {
|
||||||
|
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
protected String userUrl;
|
protected String userUrl;
|
||||||
|
protected String proxyConf;
|
||||||
protected int numTries; //Number of tries to attempt notification
|
protected int numTries; //Number of tries to attempt notification
|
||||||
protected int waitInterval; //Time to wait between retrying notification
|
protected int waitInterval; //Time to wait between retrying notification
|
||||||
protected URL urlToNotify; //URL to notify read from the config
|
protected URL urlToNotify; //URL to notify read from the config
|
||||||
|
protected Proxy proxyToUse = Proxy.NO_PROXY; //Proxy to use for notification
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Parse the URL that needs to be notified of the end of the job, along
|
* Parse the URL that needs to be notified of the end of the job, along
|
||||||
|
@ -73,6 +77,34 @@ public class JobEndNotifier implements Configurable {
|
||||||
waitInterval = (waitInterval < 0) ? 5 : waitInterval;
|
waitInterval = (waitInterval < 0) ? 5 : waitInterval;
|
||||||
|
|
||||||
userUrl = conf.get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL);
|
userUrl = conf.get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL);
|
||||||
|
|
||||||
|
proxyConf = conf.get(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY);
|
||||||
|
|
||||||
|
//Configure the proxy to use if its set. It should be set like
|
||||||
|
//proxyType@proxyHostname:port
|
||||||
|
if(proxyConf != null && !proxyConf.equals("") &&
|
||||||
|
proxyConf.lastIndexOf(":") != -1) {
|
||||||
|
int typeIndex = proxyConf.indexOf("@");
|
||||||
|
Proxy.Type proxyType = Proxy.Type.HTTP;
|
||||||
|
if(typeIndex != -1 &&
|
||||||
|
proxyConf.substring(0, typeIndex).compareToIgnoreCase("socks") == 0) {
|
||||||
|
proxyType = Proxy.Type.SOCKS;
|
||||||
|
}
|
||||||
|
String hostname = proxyConf.substring(typeIndex + 1,
|
||||||
|
proxyConf.lastIndexOf(":"));
|
||||||
|
String portConf = proxyConf.substring(proxyConf.lastIndexOf(":") + 1);
|
||||||
|
try {
|
||||||
|
int port = Integer.parseInt(portConf);
|
||||||
|
proxyToUse = new Proxy(proxyType,
|
||||||
|
new InetSocketAddress(hostname, port));
|
||||||
|
Log.info("Job end notification using proxy type \"" + proxyType +
|
||||||
|
"\" hostname \"" + hostname + "\" and port \"" + port + "\"");
|
||||||
|
} catch(NumberFormatException nfe) {
|
||||||
|
Log.warn("Job end notification couldn't parse configured proxy's port "
|
||||||
|
+ portConf + ". Not going to use a proxy");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public Configuration getConf() {
|
public Configuration getConf() {
|
||||||
|
@ -87,7 +119,7 @@ public class JobEndNotifier implements Configurable {
|
||||||
boolean success = false;
|
boolean success = false;
|
||||||
try {
|
try {
|
||||||
Log.info("Job end notification trying " + urlToNotify);
|
Log.info("Job end notification trying " + urlToNotify);
|
||||||
URLConnection conn = urlToNotify.openConnection();
|
URLConnection conn = urlToNotify.openConnection(proxyToUse);
|
||||||
conn.setConnectTimeout(5*1000);
|
conn.setConnectTimeout(5*1000);
|
||||||
conn.setReadTimeout(5*1000);
|
conn.setReadTimeout(5*1000);
|
||||||
conn.setAllowUserInteraction(false);
|
conn.setAllowUserInteraction(false);
|
||||||
|
|
|
@ -18,6 +18,8 @@
|
||||||
|
|
||||||
package org.apache.hadoop.mapreduce.v2.app;
|
package org.apache.hadoop.mapreduce.v2.app;
|
||||||
|
|
||||||
|
import java.net.Proxy;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
|
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
|
||||||
|
@ -71,6 +73,34 @@ public class TestJobEndNotifier extends JobEndNotifier {
|
||||||
waitInterval == 5);
|
waitInterval == 5);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void testProxyConfiguration(Configuration conf) {
|
||||||
|
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY, "somehost");
|
||||||
|
setConf(conf);
|
||||||
|
Assert.assertTrue("Proxy shouldn't be set because port wasn't specified",
|
||||||
|
proxyToUse.type() == Proxy.Type.DIRECT);
|
||||||
|
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY, "somehost:someport");
|
||||||
|
setConf(conf);
|
||||||
|
Assert.assertTrue("Proxy shouldn't be set because port wasn't numeric",
|
||||||
|
proxyToUse.type() == Proxy.Type.DIRECT);
|
||||||
|
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY, "somehost:1000");
|
||||||
|
setConf(conf);
|
||||||
|
Assert.assertTrue("Proxy should have been set but wasn't ",
|
||||||
|
proxyToUse.toString().equals("HTTP @ somehost:1000"));
|
||||||
|
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY, "socks@somehost:1000");
|
||||||
|
setConf(conf);
|
||||||
|
Assert.assertTrue("Proxy should have been socks but wasn't ",
|
||||||
|
proxyToUse.toString().equals("SOCKS @ somehost:1000"));
|
||||||
|
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY, "SOCKS@somehost:1000");
|
||||||
|
setConf(conf);
|
||||||
|
Assert.assertTrue("Proxy should have been socks but wasn't ",
|
||||||
|
proxyToUse.toString().equals("SOCKS @ somehost:1000"));
|
||||||
|
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY, "sfafn@somehost:1000");
|
||||||
|
setConf(conf);
|
||||||
|
Assert.assertTrue("Proxy should have been http but wasn't ",
|
||||||
|
proxyToUse.toString().equals("HTTP @ somehost:1000"));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test that setting parameters has the desired effect
|
* Test that setting parameters has the desired effect
|
||||||
*/
|
*/
|
||||||
|
@ -79,6 +109,7 @@ public class TestJobEndNotifier extends JobEndNotifier {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
testNumRetries(conf);
|
testNumRetries(conf);
|
||||||
testWaitInterval(conf);
|
testWaitInterval(conf);
|
||||||
|
testProxyConfiguration(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected int notificationCount = 0;
|
protected int notificationCount = 0;
|
||||||
|
|
|
@ -501,6 +501,9 @@ public interface MRJobConfig {
|
||||||
public static final String MR_JOB_END_NOTIFICATION_URL =
|
public static final String MR_JOB_END_NOTIFICATION_URL =
|
||||||
"mapreduce.job.end-notification.url";
|
"mapreduce.job.end-notification.url";
|
||||||
|
|
||||||
|
public static final String MR_JOB_END_NOTIFICATION_PROXY =
|
||||||
|
"mapreduce.job.end-notification.proxy";
|
||||||
|
|
||||||
public static final String MR_JOB_END_RETRY_ATTEMPTS =
|
public static final String MR_JOB_END_RETRY_ATTEMPTS =
|
||||||
"mapreduce.job.end-notification.retry.attempts";
|
"mapreduce.job.end-notification.retry.attempts";
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue