diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index d8220716cee..ab301132ba0 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -168,6 +168,9 @@ Release 2.0.5-beta - UNRELEASED MAPREDUCE-5163. Update MR App to not use API utility methods for collections after YARN-441. (Xuan Gong via vinodkv) + MAPREDUCE-5066. Added a timeout for the job.end.notification.url. (Ivan + Mitic via acmurthy) + Release 2.0.4-alpha - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java index 518305f9589..981e6ffb4b4 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java @@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.JobContext; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.v2.api.records.JobReport; import org.mortbay.log.Log; @@ -54,6 +55,7 @@ public class JobEndNotifier implements Configurable { protected String proxyConf; protected int numTries; //Number of tries to attempt notification protected int waitInterval; //Time (ms) to wait between retrying notification + protected int timeout; // Timeout (ms) on the connection and notification protected URL urlToNotify; //URL to notify read from the config protected Proxy proxyToUse = Proxy.NO_PROXY; //Proxy to use for notification @@ -76,6 +78,9 @@ public void setConf(Configuration conf) { ); waitInterval = (waitInterval < 0) ? 5000 : waitInterval; + timeout = conf.getInt(JobContext.MR_JOB_END_NOTIFICATION_TIMEOUT, + JobContext.DEFAULT_MR_JOB_END_NOTIFICATION_TIMEOUT); + userUrl = conf.get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL); proxyConf = conf.get(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY); @@ -112,8 +117,7 @@ public Configuration getConf() { } /** - * Notify the URL just once. Use best effort. Timeout hard coded to 5 - * seconds. + * Notify the URL just once. Use best effort. */ protected boolean notifyURLOnce() { boolean success = false; @@ -121,8 +125,8 @@ protected boolean notifyURLOnce() { Log.info("Job end notification trying " + urlToNotify); HttpURLConnection conn = (HttpURLConnection) urlToNotify.openConnection(proxyToUse); - conn.setConnectTimeout(5*1000); - conn.setReadTimeout(5*1000); + conn.setConnectTimeout(timeout); + conn.setReadTimeout(timeout); conn.setAllowUserInteraction(false); if(conn.getResponseCode() != HttpURLConnection.HTTP_OK) { Log.warn("Job end notification to " + urlToNotify +" failed with code: " diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java index 65acc623c36..9f755713173 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java @@ -73,6 +73,13 @@ private void testWaitInterval(Configuration conf) { + waitInterval, waitInterval == 5000); } + private void testTimeout(Configuration conf) { + conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_TIMEOUT, "1000"); + setConf(conf); + Assert.assertTrue("Expected timeout to be 1000, but was " + + timeout, timeout == 1000); + } + private void testProxyConfiguration(Configuration conf) { conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY, "somehost"); setConf(conf); @@ -109,6 +116,7 @@ public void checkConfiguration() { Configuration conf = new Configuration(); testNumRetries(conf); testWaitInterval(conf); + testTimeout(conf); testProxyConfiguration(conf); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobEndNotifier.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobEndNotifier.java index 5840730e54a..74861fece3b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobEndNotifier.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobEndNotifier.java @@ -44,9 +44,10 @@ private static JobEndStatusInfo createNotification(JobConf conf, JobEndStatusInfo notification = null; String uri = conf.getJobEndNotificationURI(); if (uri != null) { - // +1 to make logic for first notification identical to a retry - int retryAttempts = conf.getInt(JobContext.MR_JOB_END_RETRY_ATTEMPTS, 0) + 1; + int retryAttempts = conf.getInt(JobContext.MR_JOB_END_RETRY_ATTEMPTS, 0); long retryInterval = conf.getInt(JobContext.MR_JOB_END_RETRY_INTERVAL, 30000); + int timeout = conf.getInt(JobContext.MR_JOB_END_NOTIFICATION_TIMEOUT, + JobContext.DEFAULT_MR_JOB_END_NOTIFICATION_TIMEOUT); if (uri.contains("$jobId")) { uri = uri.replace("$jobId", status.getJobID().toString()); } @@ -56,17 +57,22 @@ private static JobEndStatusInfo createNotification(JobConf conf, (status.getRunState() == JobStatus.FAILED) ? "FAILED" : "KILLED"; uri = uri.replace("$jobStatus", statusStr); } - notification = new JobEndStatusInfo(uri, retryAttempts, retryInterval); + notification = new JobEndStatusInfo( + uri, retryAttempts, retryInterval, timeout); } return notification; } - private static int httpNotification(String uri) throws IOException { + private static int httpNotification(String uri, int timeout) + throws IOException { URI url = new URI(uri, false); - HttpClient m_client = new HttpClient(); + HttpClient httpClient = new HttpClient(); + httpClient.getParams().setSoTimeout(timeout); + httpClient.getParams().setConnectionManagerTimeout(timeout); + HttpMethod method = new GetMethod(url.getEscapedURI()); method.setRequestHeader("Accept", "*/*"); - return m_client.executeMethod(method); + return httpClient.executeMethod(method); } // for use by the LocalJobRunner, without using a thread&queue, @@ -74,9 +80,10 @@ private static int httpNotification(String uri) throws IOException { public static void localRunnerNotification(JobConf conf, JobStatus status) { JobEndStatusInfo notification = createNotification(conf, status); if (notification != null) { - while (notification.configureForRetry()) { + do { try { - int code = httpNotification(notification.getUri()); + int code = httpNotification(notification.getUri(), + notification.getTimeout()); if (code != 200) { throw new IOException("Invalid response status code: " + code); } @@ -96,7 +103,7 @@ public static void localRunnerNotification(JobConf conf, JobStatus status) { catch (InterruptedException iex) { LOG.error("Notification retry error [" + notification + "]", iex); } - } + } while (notification.configureForRetry()); } } @@ -105,12 +112,15 @@ private static class JobEndStatusInfo implements Delayed { private int retryAttempts; private long retryInterval; private long delayTime; + private int timeout; - JobEndStatusInfo(String uri, int retryAttempts, long retryInterval) { + JobEndStatusInfo(String uri, int retryAttempts, long retryInterval, + int timeout) { this.uri = uri; this.retryAttempts = retryAttempts; this.retryInterval = retryInterval; this.delayTime = System.currentTimeMillis(); + this.timeout = timeout; } public String getUri() { @@ -125,6 +135,10 @@ public long getRetryInterval() { return retryInterval; } + public int getTimeout() { + return timeout; + } + public boolean configureForRetry() { boolean retry = false; if (getRetryAttempts() > 0) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index 6e399ee7410..ba20ee5bb5e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -616,6 +616,9 @@ public interface MRJobConfig { public static final String MR_JOB_END_NOTIFICATION_PROXY = "mapreduce.job.end-notification.proxy"; + public static final String MR_JOB_END_NOTIFICATION_TIMEOUT = + "mapreduce.job.end-notification.timeout"; + public static final String MR_JOB_END_RETRY_ATTEMPTS = "mapreduce.job.end-notification.retry.attempts"; @@ -628,6 +631,9 @@ public interface MRJobConfig { public static final String MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL = "mapreduce.job.end-notification.max.retry.interval"; + public static final int DEFAULT_MR_JOB_END_NOTIFICATION_TIMEOUT = + 5000; + /* * MR AM Service Authorization */ diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestJobEndNotifier.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestJobEndNotifier.java new file mode 100644 index 00000000000..84905dadf5e --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestJobEndNotifier.java @@ -0,0 +1,197 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.PrintStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import junit.framework.TestCase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.http.HttpServer; + +public class TestJobEndNotifier extends TestCase { + HttpServer server; + URL baseUrl; + + @SuppressWarnings("serial") + public static class JobEndServlet extends HttpServlet { + public static volatile int calledTimes = 0; + public static URI requestUri; + + @Override + public void doGet(HttpServletRequest request, + HttpServletResponse response + ) throws ServletException, IOException { + InputStreamReader in = new InputStreamReader(request.getInputStream()); + PrintStream out = new PrintStream(response.getOutputStream()); + + calledTimes++; + try { + requestUri = new URI(null, null, + request.getRequestURI(), request.getQueryString(), null); + } catch (URISyntaxException e) { + } + + in.close(); + out.close(); + } + } + + // Servlet that delays requests for a long time + @SuppressWarnings("serial") + public static class DelayServlet extends HttpServlet { + public static volatile int calledTimes = 0; + + @Override + public void doGet(HttpServletRequest request, + HttpServletResponse response + ) throws ServletException, IOException { + boolean timedOut = false; + calledTimes++; + try { + // Sleep for a long time + Thread.sleep(1000000); + } catch (InterruptedException e) { + timedOut = true; + } + assertTrue("DelayServlet should be interrupted", timedOut); + } + } + + // Servlet that fails all requests into it + @SuppressWarnings("serial") + public static class FailServlet extends HttpServlet { + public static volatile int calledTimes = 0; + + @Override + public void doGet(HttpServletRequest request, + HttpServletResponse response + ) throws ServletException, IOException { + calledTimes++; + throw new IOException("I am failing!"); + } + } + + public void setUp() throws Exception { + new File(System.getProperty("build.webapps", "build/webapps") + "/test" + ).mkdirs(); + server = new HttpServer("test", "0.0.0.0", 0, true); + server.addServlet("delay", "/delay", DelayServlet.class); + server.addServlet("jobend", "/jobend", JobEndServlet.class); + server.addServlet("fail", "/fail", FailServlet.class); + server.start(); + int port = server.getPort(); + baseUrl = new URL("http://localhost:" + port + "/"); + + JobEndServlet.calledTimes = 0; + JobEndServlet.requestUri = null; + DelayServlet.calledTimes = 0; + FailServlet.calledTimes = 0; + } + + public void tearDown() throws Exception { + server.stop(); + } + + /** + * Basic validation for localRunnerNotification. + */ + public void testLocalJobRunnerUriSubstitution() throws InterruptedException { + JobStatus jobStatus = createTestJobStatus( + "job_20130313155005308_0001", JobStatus.SUCCEEDED); + JobConf jobConf = createTestJobConf( + new Configuration(), 0, + baseUrl + "jobend?jobid=$jobId&status=$jobStatus"); + JobEndNotifier.localRunnerNotification(jobConf, jobStatus); + + // No need to wait for the notification to go thru since calls are + // synchronous + + // Validate params + assertEquals(1, JobEndServlet.calledTimes); + assertEquals("jobid=job_20130313155005308_0001&status=SUCCEEDED", + JobEndServlet.requestUri.getQuery()); + } + + /** + * Validate job.end.retry.attempts for the localJobRunner. + */ + public void testLocalJobRunnerRetryCount() throws InterruptedException { + int retryAttempts = 3; + JobStatus jobStatus = createTestJobStatus( + "job_20130313155005308_0001", JobStatus.SUCCEEDED); + JobConf jobConf = createTestJobConf( + new Configuration(), retryAttempts, baseUrl + "fail"); + JobEndNotifier.localRunnerNotification(jobConf, jobStatus); + + // Validate params + assertEquals(retryAttempts + 1, FailServlet.calledTimes); + } + + /** + * Validate that the notification times out after reaching + * mapreduce.job.end-notification.timeout. + */ + public void testNotificationTimeout() throws InterruptedException { + Configuration conf = new Configuration(); + // Reduce the timeout to 1 second + conf.setInt("mapreduce.job.end-notification.timeout", 1000); + + JobStatus jobStatus = createTestJobStatus( + "job_20130313155005308_0001", JobStatus.SUCCEEDED); + JobConf jobConf = createTestJobConf( + conf, 0, + baseUrl + "delay"); + long startTime = System.currentTimeMillis(); + JobEndNotifier.localRunnerNotification(jobConf, jobStatus); + long elapsedTime = System.currentTimeMillis() - startTime; + + // Validate params + assertEquals(1, DelayServlet.calledTimes); + // Make sure we timed out with time slightly above 1 second + // (default timeout is in terms of minutes, so we'll catch the problem) + assertTrue(elapsedTime < 2000); + } + + private static JobStatus createTestJobStatus(String jobId, int state) { + return new JobStatus( + JobID.forName(jobId), 0.5f, 0.0f, + state, "root", "TestJobEndNotifier", null, null); + } + + private static JobConf createTestJobConf( + Configuration conf, int retryAttempts, String notificationUri) { + JobConf jobConf = new JobConf(conf); + jobConf.setInt("job.end.retry.attempts", retryAttempts); + jobConf.set("job.end.retry.interval", "0"); + jobConf.setJobEndNotificationURI(notificationUri); + return jobConf; + } +}