MAPREDUCE-7304. Enhance the map-reduce Job end notifier to be able to notify the given URL via a custom class. Contributed by Zoltan Erdmann
This commit is contained in:
parent
da87653b66
commit
6062978768
|
@ -25,9 +25,11 @@ import java.net.MalformedURLException;
|
||||||
import java.net.Proxy;
|
import java.net.Proxy;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.hadoop.conf.Configurable;
|
import org.apache.hadoop.conf.Configurable;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.mapred.JobContext;
|
import org.apache.hadoop.mapred.JobContext;
|
||||||
|
import org.apache.hadoop.mapreduce.CustomJobEndNotifier;
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
|
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
|
||||||
import org.eclipse.jetty.util.log.Log;
|
import org.eclipse.jetty.util.log.Log;
|
||||||
|
@ -57,6 +59,9 @@ public class JobEndNotifier implements Configurable {
|
||||||
protected int timeout; // Timeout (ms) on the connection and notification
|
protected int timeout; // Timeout (ms) on the connection and notification
|
||||||
protected URL urlToNotify; //URL to notify read from the config
|
protected URL urlToNotify; //URL to notify read from the config
|
||||||
protected Proxy proxyToUse = Proxy.NO_PROXY; //Proxy to use for notification
|
protected Proxy proxyToUse = Proxy.NO_PROXY; //Proxy to use for notification
|
||||||
|
// A custom notifier implementation
|
||||||
|
// (see org.apache.hadoop.mapreduce.CustomJobEndNotifier)
|
||||||
|
private String customJobEndNotifierClassName;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Parse the URL that needs to be notified of the end of the job, along
|
* Parse the URL that needs to be notified of the end of the job, along
|
||||||
|
@ -84,6 +89,9 @@ public class JobEndNotifier implements Configurable {
|
||||||
|
|
||||||
proxyConf = conf.get(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY);
|
proxyConf = conf.get(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY);
|
||||||
|
|
||||||
|
customJobEndNotifierClassName = StringUtils.stripToNull(
|
||||||
|
conf.get(MRJobConfig.MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS));
|
||||||
|
|
||||||
//Configure the proxy to use if its set. It should be set like
|
//Configure the proxy to use if its set. It should be set like
|
||||||
//proxyType@proxyHostname:port
|
//proxyType@proxyHostname:port
|
||||||
if(proxyConf != null && !proxyConf.equals("") &&
|
if(proxyConf != null && !proxyConf.equals("") &&
|
||||||
|
@ -120,6 +128,17 @@ public class JobEndNotifier implements Configurable {
|
||||||
* Notify the URL just once. Use best effort.
|
* Notify the URL just once. Use best effort.
|
||||||
*/
|
*/
|
||||||
protected boolean notifyURLOnce() {
|
protected boolean notifyURLOnce() {
|
||||||
|
if (customJobEndNotifierClassName == null) {
|
||||||
|
return notifyViaBuiltInNotifier();
|
||||||
|
} else {
|
||||||
|
return notifyViaCustomNotifier();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Uses a simple HttpURLConnection to do the Job end notification.
|
||||||
|
*/
|
||||||
|
private boolean notifyViaBuiltInNotifier() {
|
||||||
boolean success = false;
|
boolean success = false;
|
||||||
try {
|
try {
|
||||||
Log.getLog().info("Job end notification trying " + urlToNotify);
|
Log.getLog().info("Job end notification trying " + urlToNotify);
|
||||||
|
@ -145,6 +164,36 @@ public class JobEndNotifier implements Configurable {
|
||||||
return success;
|
return success;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Uses the custom Job end notifier class to do the Job end notification.
|
||||||
|
*/
|
||||||
|
private boolean notifyViaCustomNotifier() {
|
||||||
|
try {
|
||||||
|
Log.getLog().info("Will be using " + customJobEndNotifierClassName
|
||||||
|
+ " for Job end notification");
|
||||||
|
|
||||||
|
final Class<? extends CustomJobEndNotifier> customJobEndNotifierClass =
|
||||||
|
Class.forName(customJobEndNotifierClassName)
|
||||||
|
.asSubclass(CustomJobEndNotifier.class);
|
||||||
|
final CustomJobEndNotifier customJobEndNotifier =
|
||||||
|
customJobEndNotifierClass.getDeclaredConstructor().newInstance();
|
||||||
|
|
||||||
|
boolean success = customJobEndNotifier.notifyOnce(urlToNotify, conf);
|
||||||
|
if (success) {
|
||||||
|
Log.getLog().info("Job end notification to " + urlToNotify
|
||||||
|
+ " succeeded");
|
||||||
|
} else {
|
||||||
|
Log.getLog().warn("Job end notification to " + urlToNotify
|
||||||
|
+ " failed");
|
||||||
|
}
|
||||||
|
return success;
|
||||||
|
} catch (Exception e) {
|
||||||
|
Log.getLog().warn("Job end notification to " + urlToNotify
|
||||||
|
+ " failed", e);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Notify a server of the completion of a submitted job. The user must have
|
* Notify a server of the completion of a submitted job. The user must have
|
||||||
* configured MRJobConfig.MR_JOB_END_NOTIFICATION_URL
|
* configured MRJobConfig.MR_JOB_END_NOTIFICATION_URL
|
||||||
|
|
|
@ -31,6 +31,7 @@ import java.io.PrintStream;
|
||||||
import java.net.Proxy;
|
import java.net.Proxy;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
|
import java.net.URL;
|
||||||
import java.nio.channels.ClosedChannelException;
|
import java.nio.channels.ClosedChannelException;
|
||||||
|
|
||||||
import javax.servlet.ServletException;
|
import javax.servlet.ServletException;
|
||||||
|
@ -42,7 +43,9 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.http.HttpServer2;
|
import org.apache.hadoop.http.HttpServer2;
|
||||||
import org.apache.hadoop.mapred.JobConf;
|
import org.apache.hadoop.mapred.JobConf;
|
||||||
import org.apache.hadoop.mapred.JobContext;
|
import org.apache.hadoop.mapred.JobContext;
|
||||||
|
import org.apache.hadoop.mapreduce.CustomJobEndNotifier;
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
|
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
|
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
|
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
|
||||||
|
@ -299,6 +302,45 @@ public class TestJobEndNotifier extends JobEndNotifier {
|
||||||
server.stop();
|
server.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCustomNotifierClass() throws InterruptedException {
|
||||||
|
JobConf conf = new JobConf();
|
||||||
|
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_URL,
|
||||||
|
"http://example.com?jobId=$jobId&jobStatus=$jobStatus");
|
||||||
|
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS,
|
||||||
|
CustomNotifier.class.getName());
|
||||||
|
this.setConf(conf);
|
||||||
|
|
||||||
|
JobReport jobReport = mock(JobReport.class);
|
||||||
|
JobId jobId = mock(JobId.class);
|
||||||
|
when(jobId.toString()).thenReturn("mock-Id");
|
||||||
|
when(jobReport.getJobId()).thenReturn(jobId);
|
||||||
|
when(jobReport.getJobState()).thenReturn(JobState.SUCCEEDED);
|
||||||
|
|
||||||
|
CustomNotifier.urlToNotify = null;
|
||||||
|
this.notify(jobReport);
|
||||||
|
final URL urlToNotify = CustomNotifier.urlToNotify;
|
||||||
|
|
||||||
|
Assert.assertEquals("http://example.com?jobId=mock-Id&jobStatus=SUCCEEDED",
|
||||||
|
urlToNotify.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
public static final class CustomNotifier implements CustomJobEndNotifier {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Once notifyOnce was invoked we'll store the URL in this variable
|
||||||
|
* so we can assert on it.
|
||||||
|
*/
|
||||||
|
private static URL urlToNotify = null;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean notifyOnce(final URL url, final Configuration jobConf) {
|
||||||
|
urlToNotify = url;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
private static HttpServer2 startHttpServer() throws Exception {
|
private static HttpServer2 startHttpServer() throws Exception {
|
||||||
new File(System.getProperty(
|
new File(System.getProperty(
|
||||||
"build.webapps", "build/webapps") + "/test").mkdirs();
|
"build.webapps", "build/webapps") + "/test").mkdirs();
|
||||||
|
|
|
@ -1886,6 +1886,52 @@ public class JobConf extends Configuration {
|
||||||
set(JobContext.MR_JOB_END_NOTIFICATION_URL, uri);
|
set(JobContext.MR_JOB_END_NOTIFICATION_URL, uri);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the class to be invoked in order to send a notification
|
||||||
|
* after the job has completed (success/failure).
|
||||||
|
*
|
||||||
|
* @return the fully-qualified name of the class which implements
|
||||||
|
* {@link org.apache.hadoop.mapreduce.CustomJobEndNotifier} set through the
|
||||||
|
* {@link org.apache.hadoop.mapreduce.MRJobConfig#MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS}
|
||||||
|
* property
|
||||||
|
*
|
||||||
|
* @see JobConf#setJobEndNotificationCustomNotifierClass(java.lang.String)
|
||||||
|
* @see org.apache.hadoop.mapreduce.MRJobConfig#MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS
|
||||||
|
*/
|
||||||
|
public String getJobEndNotificationCustomNotifierClass() {
|
||||||
|
return get(JobContext.MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the class to be invoked in order to send a notification after the job
|
||||||
|
* has completed (success/failure).
|
||||||
|
*
|
||||||
|
* A notification url still has to be set which will be passed to
|
||||||
|
* {@link org.apache.hadoop.mapreduce.CustomJobEndNotifier#notifyOnce(
|
||||||
|
* java.net.URL, org.apache.hadoop.conf.Configuration)}
|
||||||
|
* along with the Job's conf.
|
||||||
|
*
|
||||||
|
* If this is set instead of using a simple HttpURLConnection
|
||||||
|
* we'll create a new instance of this class
|
||||||
|
* which should be an implementation of
|
||||||
|
* {@link org.apache.hadoop.mapreduce.CustomJobEndNotifier},
|
||||||
|
* and we'll invoke that.
|
||||||
|
*
|
||||||
|
* @param customNotifierClassName the fully-qualified name of the class
|
||||||
|
* which implements
|
||||||
|
* {@link org.apache.hadoop.mapreduce.CustomJobEndNotifier}
|
||||||
|
*
|
||||||
|
* @see JobConf#setJobEndNotificationURI(java.lang.String)
|
||||||
|
* @see
|
||||||
|
* org.apache.hadoop.mapreduce.MRJobConfig#MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS
|
||||||
|
*/
|
||||||
|
public void setJobEndNotificationCustomNotifierClass(
|
||||||
|
String customNotifierClassName) {
|
||||||
|
|
||||||
|
set(JobContext.MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS,
|
||||||
|
customNotifierClassName);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get job-specific shared directory for use as scratch space
|
* Get job-specific shared directory for use as scratch space
|
||||||
*
|
*
|
||||||
|
|
|
@ -0,0 +1,57 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.mapreduce;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
|
||||||
|
import java.net.URL;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An interface for implementing a custom Job end notifier. The built-in
|
||||||
|
* Job end notifier uses a simple HTTP connection to notify the Job end status.
|
||||||
|
* By implementing this interface and setting the
|
||||||
|
* {@link MRJobConfig#MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS} property
|
||||||
|
* in the map-reduce Job configuration you can have your own
|
||||||
|
* notification mechanism. For now this still only works with HTTP/HTTPS URLs,
|
||||||
|
* but by implementing this class you can choose how you want to make the
|
||||||
|
* notification itself. For example you can choose to use a custom
|
||||||
|
* HTTP library, or do a delegation token authentication, maybe set a
|
||||||
|
* custom SSL context on the connection, etc. This means you still have to set
|
||||||
|
* the {@link MRJobConfig#MR_JOB_END_NOTIFICATION_URL} property
|
||||||
|
* in the Job's conf.
|
||||||
|
*/
|
||||||
|
public interface CustomJobEndNotifier {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The implementation should try to do a Job end notification only once.
|
||||||
|
*
|
||||||
|
* See {@link MRJobConfig#MR_JOB_END_RETRY_ATTEMPTS},
|
||||||
|
* {@link MRJobConfig#MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS}
|
||||||
|
* and org.apache.hadoop.mapreduce.v2.app.JobEndNotifier on how exactly
|
||||||
|
* this method will be invoked.
|
||||||
|
*
|
||||||
|
* @param url the URL which needs to be notified
|
||||||
|
* (see {@link MRJobConfig#MR_JOB_END_NOTIFICATION_URL})
|
||||||
|
* @param jobConf the map-reduce Job's configuration
|
||||||
|
*
|
||||||
|
* @return true if the notification was successful
|
||||||
|
*/
|
||||||
|
boolean notifyOnce(URL url, Configuration jobConf) throws Exception;
|
||||||
|
|
||||||
|
}
|
|
@ -1074,6 +1074,9 @@ public interface MRJobConfig {
|
||||||
public static final String MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL =
|
public static final String MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL =
|
||||||
"mapreduce.job.end-notification.max.retry.interval";
|
"mapreduce.job.end-notification.max.retry.interval";
|
||||||
|
|
||||||
|
String MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS =
|
||||||
|
"mapreduce.job.end-notification.custom-notifier-class";
|
||||||
|
|
||||||
public static final int DEFAULT_MR_JOB_END_NOTIFICATION_TIMEOUT =
|
public static final int DEFAULT_MR_JOB_END_NOTIFICATION_TIMEOUT =
|
||||||
5000;
|
5000;
|
||||||
|
|
||||||
|
|
|
@ -1373,6 +1373,23 @@
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>mapreduce.job.end-notification.custom-notifier-class</name>
|
||||||
|
<description>A class to be invoked in order to send a notification after the
|
||||||
|
job has completed (success/failure). The class must implement
|
||||||
|
org.apache.hadoop.mapreduce.CustomJobEndNotifier. A notification
|
||||||
|
url still has to be set which will be passed to the notifyOnce
|
||||||
|
method of your implementation along with the Job's configuration.
|
||||||
|
If this is set instead of using a simple HttpURLConnection we'll
|
||||||
|
create a new instance of this class. For now this still only works
|
||||||
|
with HTTP/HTTPS URLs, but by implementing this class you can choose
|
||||||
|
how you want to make the notification itself. For example you can
|
||||||
|
choose to use a custom HTTP library, or do a delegation token
|
||||||
|
authentication, maybe set a custom SSL context on the connection, etc.
|
||||||
|
The class needs to have a no-arg constructor.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>mapreduce.job.log4j-properties-file</name>
|
<name>mapreduce.job.log4j-properties-file</name>
|
||||||
<value></value>
|
<value></value>
|
||||||
|
|
Loading…
Reference in New Issue