Merge -c 1470216 from trunk to branch-2 to fix MAPREDUCE-5066. Added a timeout for the job.end.notification.url. Contributed by Ivan Mitic.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1470217 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e2b5e3c2b4
commit
b65a090174
|
@ -168,6 +168,9 @@ Release 2.0.5-beta - UNRELEASED
|
|||
MAPREDUCE-5163. Update MR App to not use API utility methods for collections
|
||||
after YARN-441. (Xuan Gong via vinodkv)
|
||||
|
||||
MAPREDUCE-5066. Added a timeout for the job.end.notification.url. (Ivan
|
||||
Mitic via acmurthy)
|
||||
|
||||
Release 2.0.4-alpha - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -27,6 +27,7 @@ import java.net.URL;
|
|||
|
||||
import org.apache.hadoop.conf.Configurable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.mapred.JobContext;
|
||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
|
||||
import org.mortbay.log.Log;
|
||||
|
@ -54,6 +55,7 @@ public class JobEndNotifier implements Configurable {
|
|||
protected String proxyConf;
|
||||
protected int numTries; //Number of tries to attempt notification
|
||||
protected int waitInterval; //Time (ms) to wait between retrying notification
|
||||
protected int timeout; // Timeout (ms) on the connection and notification
|
||||
protected URL urlToNotify; //URL to notify read from the config
|
||||
protected Proxy proxyToUse = Proxy.NO_PROXY; //Proxy to use for notification
|
||||
|
||||
|
@ -76,6 +78,9 @@ public class JobEndNotifier implements Configurable {
|
|||
);
|
||||
waitInterval = (waitInterval < 0) ? 5000 : waitInterval;
|
||||
|
||||
timeout = conf.getInt(JobContext.MR_JOB_END_NOTIFICATION_TIMEOUT,
|
||||
JobContext.DEFAULT_MR_JOB_END_NOTIFICATION_TIMEOUT);
|
||||
|
||||
userUrl = conf.get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL);
|
||||
|
||||
proxyConf = conf.get(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY);
|
||||
|
@ -112,8 +117,7 @@ public class JobEndNotifier implements Configurable {
|
|||
}
|
||||
|
||||
/**
|
||||
* Notify the URL just once. Use best effort. Timeout hard coded to 5
|
||||
* seconds.
|
||||
* Notify the URL just once. Use best effort.
|
||||
*/
|
||||
protected boolean notifyURLOnce() {
|
||||
boolean success = false;
|
||||
|
@ -121,8 +125,8 @@ public class JobEndNotifier implements Configurable {
|
|||
Log.info("Job end notification trying " + urlToNotify);
|
||||
HttpURLConnection conn =
|
||||
(HttpURLConnection) urlToNotify.openConnection(proxyToUse);
|
||||
conn.setConnectTimeout(5*1000);
|
||||
conn.setReadTimeout(5*1000);
|
||||
conn.setConnectTimeout(timeout);
|
||||
conn.setReadTimeout(timeout);
|
||||
conn.setAllowUserInteraction(false);
|
||||
if(conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
|
||||
Log.warn("Job end notification to " + urlToNotify +" failed with code: "
|
||||
|
|
|
@ -73,6 +73,13 @@ public class TestJobEndNotifier extends JobEndNotifier {
|
|||
+ waitInterval, waitInterval == 5000);
|
||||
}
|
||||
|
||||
private void testTimeout(Configuration conf) {
|
||||
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_TIMEOUT, "1000");
|
||||
setConf(conf);
|
||||
Assert.assertTrue("Expected timeout to be 1000, but was "
|
||||
+ timeout, timeout == 1000);
|
||||
}
|
||||
|
||||
private void testProxyConfiguration(Configuration conf) {
|
||||
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY, "somehost");
|
||||
setConf(conf);
|
||||
|
@ -109,6 +116,7 @@ public class TestJobEndNotifier extends JobEndNotifier {
|
|||
Configuration conf = new Configuration();
|
||||
testNumRetries(conf);
|
||||
testWaitInterval(conf);
|
||||
testTimeout(conf);
|
||||
testProxyConfiguration(conf);
|
||||
}
|
||||
|
||||
|
|
|
@ -44,9 +44,10 @@ public class JobEndNotifier {
|
|||
JobEndStatusInfo notification = null;
|
||||
String uri = conf.getJobEndNotificationURI();
|
||||
if (uri != null) {
|
||||
// +1 to make logic for first notification identical to a retry
|
||||
int retryAttempts = conf.getInt(JobContext.MR_JOB_END_RETRY_ATTEMPTS, 0) + 1;
|
||||
int retryAttempts = conf.getInt(JobContext.MR_JOB_END_RETRY_ATTEMPTS, 0);
|
||||
long retryInterval = conf.getInt(JobContext.MR_JOB_END_RETRY_INTERVAL, 30000);
|
||||
int timeout = conf.getInt(JobContext.MR_JOB_END_NOTIFICATION_TIMEOUT,
|
||||
JobContext.DEFAULT_MR_JOB_END_NOTIFICATION_TIMEOUT);
|
||||
if (uri.contains("$jobId")) {
|
||||
uri = uri.replace("$jobId", status.getJobID().toString());
|
||||
}
|
||||
|
@ -56,17 +57,22 @@ public class JobEndNotifier {
|
|||
(status.getRunState() == JobStatus.FAILED) ? "FAILED" : "KILLED";
|
||||
uri = uri.replace("$jobStatus", statusStr);
|
||||
}
|
||||
notification = new JobEndStatusInfo(uri, retryAttempts, retryInterval);
|
||||
notification = new JobEndStatusInfo(
|
||||
uri, retryAttempts, retryInterval, timeout);
|
||||
}
|
||||
return notification;
|
||||
}
|
||||
|
||||
private static int httpNotification(String uri) throws IOException {
|
||||
private static int httpNotification(String uri, int timeout)
|
||||
throws IOException {
|
||||
URI url = new URI(uri, false);
|
||||
HttpClient m_client = new HttpClient();
|
||||
HttpClient httpClient = new HttpClient();
|
||||
httpClient.getParams().setSoTimeout(timeout);
|
||||
httpClient.getParams().setConnectionManagerTimeout(timeout);
|
||||
|
||||
HttpMethod method = new GetMethod(url.getEscapedURI());
|
||||
method.setRequestHeader("Accept", "*/*");
|
||||
return m_client.executeMethod(method);
|
||||
return httpClient.executeMethod(method);
|
||||
}
|
||||
|
||||
// for use by the LocalJobRunner, without using a thread&queue,
|
||||
|
@ -74,9 +80,10 @@ public class JobEndNotifier {
|
|||
public static void localRunnerNotification(JobConf conf, JobStatus status) {
|
||||
JobEndStatusInfo notification = createNotification(conf, status);
|
||||
if (notification != null) {
|
||||
while (notification.configureForRetry()) {
|
||||
do {
|
||||
try {
|
||||
int code = httpNotification(notification.getUri());
|
||||
int code = httpNotification(notification.getUri(),
|
||||
notification.getTimeout());
|
||||
if (code != 200) {
|
||||
throw new IOException("Invalid response status code: " + code);
|
||||
}
|
||||
|
@ -96,7 +103,7 @@ public class JobEndNotifier {
|
|||
catch (InterruptedException iex) {
|
||||
LOG.error("Notification retry error [" + notification + "]", iex);
|
||||
}
|
||||
}
|
||||
} while (notification.configureForRetry());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -105,12 +112,15 @@ public class JobEndNotifier {
|
|||
private int retryAttempts;
|
||||
private long retryInterval;
|
||||
private long delayTime;
|
||||
private int timeout;
|
||||
|
||||
JobEndStatusInfo(String uri, int retryAttempts, long retryInterval) {
|
||||
JobEndStatusInfo(String uri, int retryAttempts, long retryInterval,
|
||||
int timeout) {
|
||||
this.uri = uri;
|
||||
this.retryAttempts = retryAttempts;
|
||||
this.retryInterval = retryInterval;
|
||||
this.delayTime = System.currentTimeMillis();
|
||||
this.timeout = timeout;
|
||||
}
|
||||
|
||||
public String getUri() {
|
||||
|
@ -125,6 +135,10 @@ public class JobEndNotifier {
|
|||
return retryInterval;
|
||||
}
|
||||
|
||||
public int getTimeout() {
|
||||
return timeout;
|
||||
}
|
||||
|
||||
public boolean configureForRetry() {
|
||||
boolean retry = false;
|
||||
if (getRetryAttempts() > 0) {
|
||||
|
|
|
@ -616,6 +616,9 @@ public interface MRJobConfig {
|
|||
public static final String MR_JOB_END_NOTIFICATION_PROXY =
|
||||
"mapreduce.job.end-notification.proxy";
|
||||
|
||||
public static final String MR_JOB_END_NOTIFICATION_TIMEOUT =
|
||||
"mapreduce.job.end-notification.timeout";
|
||||
|
||||
public static final String MR_JOB_END_RETRY_ATTEMPTS =
|
||||
"mapreduce.job.end-notification.retry.attempts";
|
||||
|
||||
|
@ -628,6 +631,9 @@ public interface MRJobConfig {
|
|||
public static final String MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL =
|
||||
"mapreduce.job.end-notification.max.retry.interval";
|
||||
|
||||
public static final int DEFAULT_MR_JOB_END_NOTIFICATION_TIMEOUT =
|
||||
5000;
|
||||
|
||||
/*
|
||||
* MR AM Service Authorization
|
||||
*/
|
||||
|
|
|
@ -0,0 +1,197 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.mapred;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.PrintStream;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.URL;
|
||||
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.http.HttpServlet;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.http.HttpServer;
|
||||
|
||||
public class TestJobEndNotifier extends TestCase {
|
||||
HttpServer server;
|
||||
URL baseUrl;
|
||||
|
||||
@SuppressWarnings("serial")
|
||||
public static class JobEndServlet extends HttpServlet {
|
||||
public static volatile int calledTimes = 0;
|
||||
public static URI requestUri;
|
||||
|
||||
@Override
|
||||
public void doGet(HttpServletRequest request,
|
||||
HttpServletResponse response
|
||||
) throws ServletException, IOException {
|
||||
InputStreamReader in = new InputStreamReader(request.getInputStream());
|
||||
PrintStream out = new PrintStream(response.getOutputStream());
|
||||
|
||||
calledTimes++;
|
||||
try {
|
||||
requestUri = new URI(null, null,
|
||||
request.getRequestURI(), request.getQueryString(), null);
|
||||
} catch (URISyntaxException e) {
|
||||
}
|
||||
|
||||
in.close();
|
||||
out.close();
|
||||
}
|
||||
}
|
||||
|
||||
// Servlet that delays requests for a long time
|
||||
@SuppressWarnings("serial")
|
||||
public static class DelayServlet extends HttpServlet {
|
||||
public static volatile int calledTimes = 0;
|
||||
|
||||
@Override
|
||||
public void doGet(HttpServletRequest request,
|
||||
HttpServletResponse response
|
||||
) throws ServletException, IOException {
|
||||
boolean timedOut = false;
|
||||
calledTimes++;
|
||||
try {
|
||||
// Sleep for a long time
|
||||
Thread.sleep(1000000);
|
||||
} catch (InterruptedException e) {
|
||||
timedOut = true;
|
||||
}
|
||||
assertTrue("DelayServlet should be interrupted", timedOut);
|
||||
}
|
||||
}
|
||||
|
||||
// Servlet that fails all requests into it
|
||||
@SuppressWarnings("serial")
|
||||
public static class FailServlet extends HttpServlet {
|
||||
public static volatile int calledTimes = 0;
|
||||
|
||||
@Override
|
||||
public void doGet(HttpServletRequest request,
|
||||
HttpServletResponse response
|
||||
) throws ServletException, IOException {
|
||||
calledTimes++;
|
||||
throw new IOException("I am failing!");
|
||||
}
|
||||
}
|
||||
|
||||
public void setUp() throws Exception {
|
||||
new File(System.getProperty("build.webapps", "build/webapps") + "/test"
|
||||
).mkdirs();
|
||||
server = new HttpServer("test", "0.0.0.0", 0, true);
|
||||
server.addServlet("delay", "/delay", DelayServlet.class);
|
||||
server.addServlet("jobend", "/jobend", JobEndServlet.class);
|
||||
server.addServlet("fail", "/fail", FailServlet.class);
|
||||
server.start();
|
||||
int port = server.getPort();
|
||||
baseUrl = new URL("http://localhost:" + port + "/");
|
||||
|
||||
JobEndServlet.calledTimes = 0;
|
||||
JobEndServlet.requestUri = null;
|
||||
DelayServlet.calledTimes = 0;
|
||||
FailServlet.calledTimes = 0;
|
||||
}
|
||||
|
||||
public void tearDown() throws Exception {
|
||||
server.stop();
|
||||
}
|
||||
|
||||
/**
|
||||
* Basic validation for localRunnerNotification.
|
||||
*/
|
||||
public void testLocalJobRunnerUriSubstitution() throws InterruptedException {
|
||||
JobStatus jobStatus = createTestJobStatus(
|
||||
"job_20130313155005308_0001", JobStatus.SUCCEEDED);
|
||||
JobConf jobConf = createTestJobConf(
|
||||
new Configuration(), 0,
|
||||
baseUrl + "jobend?jobid=$jobId&status=$jobStatus");
|
||||
JobEndNotifier.localRunnerNotification(jobConf, jobStatus);
|
||||
|
||||
// No need to wait for the notification to go thru since calls are
|
||||
// synchronous
|
||||
|
||||
// Validate params
|
||||
assertEquals(1, JobEndServlet.calledTimes);
|
||||
assertEquals("jobid=job_20130313155005308_0001&status=SUCCEEDED",
|
||||
JobEndServlet.requestUri.getQuery());
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate job.end.retry.attempts for the localJobRunner.
|
||||
*/
|
||||
public void testLocalJobRunnerRetryCount() throws InterruptedException {
|
||||
int retryAttempts = 3;
|
||||
JobStatus jobStatus = createTestJobStatus(
|
||||
"job_20130313155005308_0001", JobStatus.SUCCEEDED);
|
||||
JobConf jobConf = createTestJobConf(
|
||||
new Configuration(), retryAttempts, baseUrl + "fail");
|
||||
JobEndNotifier.localRunnerNotification(jobConf, jobStatus);
|
||||
|
||||
// Validate params
|
||||
assertEquals(retryAttempts + 1, FailServlet.calledTimes);
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate that the notification times out after reaching
|
||||
* mapreduce.job.end-notification.timeout.
|
||||
*/
|
||||
public void testNotificationTimeout() throws InterruptedException {
|
||||
Configuration conf = new Configuration();
|
||||
// Reduce the timeout to 1 second
|
||||
conf.setInt("mapreduce.job.end-notification.timeout", 1000);
|
||||
|
||||
JobStatus jobStatus = createTestJobStatus(
|
||||
"job_20130313155005308_0001", JobStatus.SUCCEEDED);
|
||||
JobConf jobConf = createTestJobConf(
|
||||
conf, 0,
|
||||
baseUrl + "delay");
|
||||
long startTime = System.currentTimeMillis();
|
||||
JobEndNotifier.localRunnerNotification(jobConf, jobStatus);
|
||||
long elapsedTime = System.currentTimeMillis() - startTime;
|
||||
|
||||
// Validate params
|
||||
assertEquals(1, DelayServlet.calledTimes);
|
||||
// Make sure we timed out with time slightly above 1 second
|
||||
// (default timeout is in terms of minutes, so we'll catch the problem)
|
||||
assertTrue(elapsedTime < 2000);
|
||||
}
|
||||
|
||||
private static JobStatus createTestJobStatus(String jobId, int state) {
|
||||
return new JobStatus(
|
||||
JobID.forName(jobId), 0.5f, 0.0f,
|
||||
state, "root", "TestJobEndNotifier", null, null);
|
||||
}
|
||||
|
||||
private static JobConf createTestJobConf(
|
||||
Configuration conf, int retryAttempts, String notificationUri) {
|
||||
JobConf jobConf = new JobConf(conf);
|
||||
jobConf.setInt("job.end.retry.attempts", retryAttempts);
|
||||
jobConf.set("job.end.retry.interval", "0");
|
||||
jobConf.setJobEndNotificationURI(notificationUri);
|
||||
return jobConf;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue