MAPREDUCE-7304. Enhance the map-reduce Job end notifier to be able to notify the given URL via a custom class. Contributed by Zoltan Erdmann

This commit is contained in:
Peter Bacsko 2020-11-20 13:13:51 +01:00
parent f3c629c27e
commit fb92aa4012
6 changed files with 215 additions and 1 deletions

View File

@ -25,9 +25,11 @@ import java.net.MalformedURLException;
import java.net.Proxy;
import java.net.URL;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapreduce.CustomJobEndNotifier;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.eclipse.jetty.util.log.Log;
@ -57,6 +59,9 @@ public class JobEndNotifier implements Configurable {
protected int timeout; // Timeout (ms) on the connection and notification
protected URL urlToNotify; //URL to notify read from the config
protected Proxy proxyToUse = Proxy.NO_PROXY; //Proxy to use for notification
// A custom notifier implementation
// (see org.apache.hadoop.mapreduce.CustomJobEndNotifier)
private String customJobEndNotifierClassName;
/**
* Parse the URL that needs to be notified of the end of the job, along
@ -84,6 +89,9 @@ public class JobEndNotifier implements Configurable {
proxyConf = conf.get(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY);
customJobEndNotifierClassName = StringUtils.stripToNull(
conf.get(MRJobConfig.MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS));
//Configure the proxy to use if its set. It should be set like
//proxyType@proxyHostname:port
if(proxyConf != null && !proxyConf.equals("") &&
@ -115,11 +123,22 @@ public class JobEndNotifier implements Configurable {
public Configuration getConf() {
return conf;
}
/**
* Notify the URL just once. Use best effort.
*/
protected boolean notifyURLOnce() {
if (customJobEndNotifierClassName == null) {
return notifyViaBuiltInNotifier();
} else {
return notifyViaCustomNotifier();
}
}
/**
* Uses a simple HttpURLConnection to do the Job end notification.
*/
private boolean notifyViaBuiltInNotifier() {
boolean success = false;
try {
Log.getLog().info("Job end notification trying " + urlToNotify);
@ -145,6 +164,36 @@ public class JobEndNotifier implements Configurable {
return success;
}
/**
* Uses the custom Job end notifier class to do the Job end notification.
*/
private boolean notifyViaCustomNotifier() {
try {
Log.getLog().info("Will be using " + customJobEndNotifierClassName
+ " for Job end notification");
final Class<? extends CustomJobEndNotifier> customJobEndNotifierClass =
Class.forName(customJobEndNotifierClassName)
.asSubclass(CustomJobEndNotifier.class);
final CustomJobEndNotifier customJobEndNotifier =
customJobEndNotifierClass.getDeclaredConstructor().newInstance();
boolean success = customJobEndNotifier.notifyOnce(urlToNotify, conf);
if (success) {
Log.getLog().info("Job end notification to " + urlToNotify
+ " succeeded");
} else {
Log.getLog().warn("Job end notification to " + urlToNotify
+ " failed");
}
return success;
} catch (Exception e) {
Log.getLog().warn("Job end notification to " + urlToNotify
+ " failed", e);
return false;
}
}
/**
* Notify a server of the completion of a submitted job. The user must have
* configured MRJobConfig.MR_JOB_END_NOTIFICATION_URL

View File

@ -31,6 +31,7 @@ import java.io.PrintStream;
import java.net.Proxy;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.channels.ClosedChannelException;
import javax.servlet.ServletException;
@ -42,7 +43,9 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpServer2;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapreduce.CustomJobEndNotifier;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
@ -299,6 +302,45 @@ public class TestJobEndNotifier extends JobEndNotifier {
server.stop();
}
@Test
public void testCustomNotifierClass() throws InterruptedException {
JobConf conf = new JobConf();
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_URL,
"http://example.com?jobId=$jobId&jobStatus=$jobStatus");
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS,
CustomNotifier.class.getName());
this.setConf(conf);
JobReport jobReport = mock(JobReport.class);
JobId jobId = mock(JobId.class);
when(jobId.toString()).thenReturn("mock-Id");
when(jobReport.getJobId()).thenReturn(jobId);
when(jobReport.getJobState()).thenReturn(JobState.SUCCEEDED);
CustomNotifier.urlToNotify = null;
this.notify(jobReport);
final URL urlToNotify = CustomNotifier.urlToNotify;
Assert.assertEquals("http://example.com?jobId=mock-Id&jobStatus=SUCCEEDED",
urlToNotify.toString());
}
public static final class CustomNotifier implements CustomJobEndNotifier {
/**
* Once notifyOnce was invoked we'll store the URL in this variable
* so we can assert on it.
*/
private static URL urlToNotify = null;
@Override
public boolean notifyOnce(final URL url, final Configuration jobConf) {
urlToNotify = url;
return true;
}
}
private static HttpServer2 startHttpServer() throws Exception {
new File(System.getProperty(
"build.webapps", "build/webapps") + "/test").mkdirs();

View File

@ -1886,6 +1886,52 @@ public class JobConf extends Configuration {
set(JobContext.MR_JOB_END_NOTIFICATION_URL, uri);
}
/**
* Returns the class to be invoked in order to send a notification
* after the job has completed (success/failure).
*
* @return the fully-qualified name of the class which implements
* {@link org.apache.hadoop.mapreduce.CustomJobEndNotifier} set through the
* {@link org.apache.hadoop.mapreduce.MRJobConfig#MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS}
* property
*
* @see JobConf#setJobEndNotificationCustomNotifierClass(java.lang.String)
* @see org.apache.hadoop.mapreduce.MRJobConfig#MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS
*/
public String getJobEndNotificationCustomNotifierClass() {
return get(JobContext.MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS);
}
/**
* Sets the class to be invoked in order to send a notification after the job
* has completed (success/failure).
*
* A notification url still has to be set which will be passed to
* {@link org.apache.hadoop.mapreduce.CustomJobEndNotifier#notifyOnce(
* java.net.URL, org.apache.hadoop.conf.Configuration)}
* along with the Job's conf.
*
* If this is set instead of using a simple HttpURLConnection
* we'll create a new instance of this class
* which should be an implementation of
* {@link org.apache.hadoop.mapreduce.CustomJobEndNotifier},
* and we'll invoke that.
*
* @param customNotifierClassName the fully-qualified name of the class
* which implements
* {@link org.apache.hadoop.mapreduce.CustomJobEndNotifier}
*
* @see JobConf#setJobEndNotificationURI(java.lang.String)
* @see
* org.apache.hadoop.mapreduce.MRJobConfig#MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS
*/
public void setJobEndNotificationCustomNotifierClass(
String customNotifierClassName) {
set(JobContext.MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS,
customNotifierClassName);
}
/**
* Get job-specific shared directory for use as scratch space
*

View File

@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce;
import org.apache.hadoop.conf.Configuration;
import java.net.URL;
/**
* An interface for implementing a custom Job end notifier. The built-in
* Job end notifier uses a simple HTTP connection to notify the Job end status.
* By implementing this interface and setting the
* {@link MRJobConfig#MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS} property
* in the map-reduce Job configuration you can have your own
* notification mechanism. For now this still only works with HTTP/HTTPS URLs,
* but by implementing this class you can choose how you want to make the
* notification itself. For example you can choose to use a custom
* HTTP library, or do a delegation token authentication, maybe set a
* custom SSL context on the connection, etc. This means you still have to set
* the {@link MRJobConfig#MR_JOB_END_NOTIFICATION_URL} property
* in the Job's conf.
*/
public interface CustomJobEndNotifier {
/**
* The implementation should try to do a Job end notification only once.
*
* See {@link MRJobConfig#MR_JOB_END_RETRY_ATTEMPTS},
* {@link MRJobConfig#MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS}
* and org.apache.hadoop.mapreduce.v2.app.JobEndNotifier on how exactly
* this method will be invoked.
*
* @param url the URL which needs to be notified
* (see {@link MRJobConfig#MR_JOB_END_NOTIFICATION_URL})
* @param jobConf the map-reduce Job's configuration
*
* @return true if the notification was successful
*/
boolean notifyOnce(URL url, Configuration jobConf) throws Exception;
}

View File

@ -1105,6 +1105,9 @@ public interface MRJobConfig {
public static final String MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL =
"mapreduce.job.end-notification.max.retry.interval";
String MR_JOB_END_NOTIFICATION_CUSTOM_NOTIFIER_CLASS =
"mapreduce.job.end-notification.custom-notifier-class";
public static final int DEFAULT_MR_JOB_END_NOTIFICATION_TIMEOUT =
5000;

View File

@ -1415,6 +1415,23 @@
</description>
</property>
<property>
<name>mapreduce.job.end-notification.custom-notifier-class</name>
<description>A class to be invoked in order to send a notification after the
job has completed (success/failure). The class must implement
org.apache.hadoop.mapreduce.CustomJobEndNotifier. A notification
url still has to be set which will be passed to the notifyOnce
method of your implementation along with the Job's configuration.
If this is set instead of using a simple HttpURLConnection we'll
create a new instance of this class. For now this still only works
with HTTP/HTTPS URLs, but by implementing this class you can choose
how you want to make the notification itself. For example you can
choose to use a custom HTTP library, or do a delegation token
authentication, maybe set a custom SSL context on the connection, etc.
The class needs to have a no-arg constructor.
</description>
</property>
<property>
<name>mapreduce.job.log4j-properties-file</name>
<value></value>