From 6821efa3ae4b452e580f88e775fc1afbc639cdd8 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Tue, 10 Jan 2012 21:09:41 +0000 Subject: [PATCH] MAPREDUCE-3382. Enhanced MR AM to use a proxy to ping the job-end notification URL. Contributed by Ravi Prakash. svn merge --ignore-ancestry -c 1229736 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1229737 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 ++ .../mapreduce/v2/app/JobEndNotifier.java | 34 ++++++++++++++++++- .../mapreduce/v2/app/TestJobEndNotifier.java | 31 +++++++++++++++++ .../apache/hadoop/mapreduce/MRJobConfig.java | 3 ++ 4 files changed, 70 insertions(+), 1 deletion(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 7aac394cab7..6a76177bdc9 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -102,6 +102,9 @@ Release 0.23.1 - Unreleased MAPREDUCE-3312. Modified MR AM to not send a stop-container request for a container that isn't launched at all. (Robert Joseph Evans via vinodkv) + MAPREDUCE-3382. Enhanced MR AM to use a proxy to ping the job-end + notification URL. (Ravi Prakash via vinodkv) + OPTIMIZATIONS MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java index f22b9b7ba02..ae92cc0a3ea 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java @@ -20,9 +20,11 @@ package org.apache.hadoop.mapreduce.v2.app; import java.io.IOException; import java.io.InputStream; +import java.net.InetSocketAddress; import java.net.MalformedURLException; import java.net.URL; import java.net.URLConnection; +import java.net.Proxy; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; @@ -49,9 +51,11 @@ public class JobEndNotifier implements Configurable { private Configuration conf; protected String userUrl; + protected String proxyConf; protected int numTries; //Number of tries to attempt notification protected int waitInterval; //Time to wait between retrying notification protected URL urlToNotify; //URL to notify read from the config + protected Proxy proxyToUse = Proxy.NO_PROXY; //Proxy to use for notification /** * Parse the URL that needs to be notified of the end of the job, along @@ -73,6 +77,34 @@ public class JobEndNotifier implements Configurable { waitInterval = (waitInterval < 0) ? 5 : waitInterval; userUrl = conf.get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL); + + proxyConf = conf.get(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY); + + //Configure the proxy to use if its set. It should be set like + //proxyType@proxyHostname:port + if(proxyConf != null && !proxyConf.equals("") && + proxyConf.lastIndexOf(":") != -1) { + int typeIndex = proxyConf.indexOf("@"); + Proxy.Type proxyType = Proxy.Type.HTTP; + if(typeIndex != -1 && + proxyConf.substring(0, typeIndex).compareToIgnoreCase("socks") == 0) { + proxyType = Proxy.Type.SOCKS; + } + String hostname = proxyConf.substring(typeIndex + 1, + proxyConf.lastIndexOf(":")); + String portConf = proxyConf.substring(proxyConf.lastIndexOf(":") + 1); + try { + int port = Integer.parseInt(portConf); + proxyToUse = new Proxy(proxyType, + new InetSocketAddress(hostname, port)); + Log.info("Job end notification using proxy type \"" + proxyType + + "\" hostname \"" + hostname + "\" and port \"" + port + "\""); + } catch(NumberFormatException nfe) { + Log.warn("Job end notification couldn't parse configured proxy's port " + + portConf + ". Not going to use a proxy"); + } + } + } public Configuration getConf() { @@ -87,7 +119,7 @@ public class JobEndNotifier implements Configurable { boolean success = false; try { Log.info("Job end notification trying " + urlToNotify); - URLConnection conn = urlToNotify.openConnection(); + URLConnection conn = urlToNotify.openConnection(proxyToUse); conn.setConnectTimeout(5*1000); conn.setReadTimeout(5*1000); conn.setAllowUserInteraction(false); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java index 52ca1cf3f8f..6d92d0de730 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java @@ -18,6 +18,8 @@ package org.apache.hadoop.mapreduce.v2.app; +import java.net.Proxy; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.v2.api.records.JobReport; @@ -71,6 +73,34 @@ public class TestJobEndNotifier extends JobEndNotifier { waitInterval == 5); } + private void testProxyConfiguration(Configuration conf) { + conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY, "somehost"); + setConf(conf); + Assert.assertTrue("Proxy shouldn't be set because port wasn't specified", + proxyToUse.type() == Proxy.Type.DIRECT); + conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY, "somehost:someport"); + setConf(conf); + Assert.assertTrue("Proxy shouldn't be set because port wasn't numeric", + proxyToUse.type() == Proxy.Type.DIRECT); + conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY, "somehost:1000"); + setConf(conf); + Assert.assertTrue("Proxy should have been set but wasn't ", + proxyToUse.toString().equals("HTTP @ somehost:1000")); + conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY, "socks@somehost:1000"); + setConf(conf); + Assert.assertTrue("Proxy should have been socks but wasn't ", + proxyToUse.toString().equals("SOCKS @ somehost:1000")); + conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY, "SOCKS@somehost:1000"); + setConf(conf); + Assert.assertTrue("Proxy should have been socks but wasn't ", + proxyToUse.toString().equals("SOCKS @ somehost:1000")); + conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY, "sfafn@somehost:1000"); + setConf(conf); + Assert.assertTrue("Proxy should have been http but wasn't ", + proxyToUse.toString().equals("HTTP @ somehost:1000")); + + } + /** * Test that setting parameters has the desired effect */ @@ -79,6 +109,7 @@ public class TestJobEndNotifier extends JobEndNotifier { Configuration conf = new Configuration(); testNumRetries(conf); testWaitInterval(conf); + testProxyConfiguration(conf); } protected int notificationCount = 0; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index f0138a17396..538c8998a85 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -501,6 +501,9 @@ public interface MRJobConfig { public static final String MR_JOB_END_NOTIFICATION_URL = "mapreduce.job.end-notification.url"; + public static final String MR_JOB_END_NOTIFICATION_PROXY = + "mapreduce.job.end-notification.proxy"; + public static final String MR_JOB_END_RETRY_ATTEMPTS = "mapreduce.job.end-notification.retry.attempts";