MAPREDUCE-5538. Fixed MR AppMaster to send job-notification URL only after the job is really done - a bug caused by MAPREDUCE-5505. Contributed by Zhijie Shen.
svn merge --ignore-ancestry -c 1527219 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1527221 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
83ef3a1ab5
commit
3c3b52b80b
|
@ -108,6 +108,10 @@ Release 2.1.2 - UNRELEASED
|
||||||
by re-introducing (get,set)PartitionFile which takes in JobConf. (Robert
|
by re-introducing (get,set)PartitionFile which takes in JobConf. (Robert
|
||||||
Kanter via acmurthy)
|
Kanter via acmurthy)
|
||||||
|
|
||||||
|
MAPREDUCE-5538. Fixed MR AppMaster to send job-notification URL only after
|
||||||
|
the job is really done - a bug caused by MAPREDUCE-5505. (Zhijie Shen via
|
||||||
|
vinodkv)
|
||||||
|
|
||||||
Release 2.1.1-beta - 2013-09-23
|
Release 2.1.1-beta - 2013-09-23
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -531,19 +531,6 @@ public class MRAppMaster extends CompositeService {
|
||||||
// this is the only job, so shut down the Appmaster
|
// this is the only job, so shut down the Appmaster
|
||||||
// note in a workflow scenario, this may lead to creation of a new
|
// note in a workflow scenario, this may lead to creation of a new
|
||||||
// job (FIXME?)
|
// job (FIXME?)
|
||||||
// Send job-end notification
|
|
||||||
if (getConfig().get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL) != null) {
|
|
||||||
try {
|
|
||||||
LOG.info("Job end notification started for jobID : "
|
|
||||||
+ job.getReport().getJobId());
|
|
||||||
JobEndNotifier notifier = new JobEndNotifier();
|
|
||||||
notifier.setConf(getConfig());
|
|
||||||
notifier.notify(job.getReport());
|
|
||||||
} catch (InterruptedException ie) {
|
|
||||||
LOG.warn("Job end notification interrupted for jobID : "
|
|
||||||
+ job.getReport().getJobId(), ie);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
//if isLastAMRetry comes as true, should never set it to false
|
//if isLastAMRetry comes as true, should never set it to false
|
||||||
|
@ -559,10 +546,28 @@ public class MRAppMaster extends CompositeService {
|
||||||
LOG.info("Calling stop for all the services");
|
LOG.info("Calling stop for all the services");
|
||||||
MRAppMaster.this.stop();
|
MRAppMaster.this.stop();
|
||||||
|
|
||||||
// Except ClientService, other services are already stopped, it is safe to
|
if (isLastAMRetry) {
|
||||||
// let clients know the final states. ClientService should wait for some
|
// Except ClientService, other services are already stopped, it is safe to
|
||||||
// time so clients have enough time to know the final states.
|
// let clients know the final states. ClientService should wait for some
|
||||||
safeToReportTerminationToUser.set(true);
|
// time so clients have enough time to know the final states.
|
||||||
|
safeToReportTerminationToUser.set(true);
|
||||||
|
|
||||||
|
// Send job-end notification when it is safe to report termination to
|
||||||
|
// users and it is the last AM retry
|
||||||
|
if (getConfig().get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL) != null) {
|
||||||
|
try {
|
||||||
|
LOG.info("Job end notification started for jobID : "
|
||||||
|
+ job.getReport().getJobId());
|
||||||
|
JobEndNotifier notifier = new JobEndNotifier();
|
||||||
|
notifier.setConf(getConfig());
|
||||||
|
notifier.notify(job.getReport());
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
LOG.warn("Job end notification interrupted for jobID : "
|
||||||
|
+ job.getReport().getJobId(), ie);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
Thread.sleep(5000);
|
Thread.sleep(5000);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
|
|
|
@ -128,6 +128,8 @@ import org.apache.hadoop.yarn.state.StateMachine;
|
||||||
import org.apache.hadoop.yarn.state.StateMachineFactory;
|
import org.apache.hadoop.yarn.state.StateMachineFactory;
|
||||||
import org.apache.hadoop.yarn.util.Clock;
|
import org.apache.hadoop.yarn.util.Clock;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
/** Implementation of Job interface. Maintains the state machines of Job.
|
/** Implementation of Job interface. Maintains the state machines of Job.
|
||||||
* The read and write calls use ReadWriteLock for concurrency.
|
* The read and write calls use ReadWriteLock for concurrency.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -18,19 +18,41 @@
|
||||||
|
|
||||||
package org.apache.hadoop.mapreduce.v2.app;
|
package org.apache.hadoop.mapreduce.v2.app;
|
||||||
|
|
||||||
|
import static org.mockito.Mockito.doNothing;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.spy;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStreamReader;
|
||||||
|
import java.io.PrintStream;
|
||||||
import java.net.Proxy;
|
import java.net.Proxy;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.net.URISyntaxException;
|
||||||
|
|
||||||
|
import javax.servlet.ServletException;
|
||||||
|
import javax.servlet.http.HttpServlet;
|
||||||
|
import javax.servlet.http.HttpServletRequest;
|
||||||
|
import javax.servlet.http.HttpServletResponse;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.http.HttpServer;
|
||||||
|
import org.apache.hadoop.mapred.JobContext;
|
||||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
|
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.Mockito;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests job end notification
|
* Tests job end notification
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
public class TestJobEndNotifier extends JobEndNotifier {
|
public class TestJobEndNotifier extends JobEndNotifier {
|
||||||
|
|
||||||
//Test maximum retries is capped by MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS
|
//Test maximum retries is capped by MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS
|
||||||
|
@ -133,7 +155,7 @@ public class TestJobEndNotifier extends JobEndNotifier {
|
||||||
public void testNotifyRetries() throws InterruptedException {
|
public void testNotifyRetries() throws InterruptedException {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_URL, "http://nonexistent");
|
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_URL, "http://nonexistent");
|
||||||
JobReport jobReport = Mockito.mock(JobReport.class);
|
JobReport jobReport = mock(JobReport.class);
|
||||||
|
|
||||||
long startTime = System.currentTimeMillis();
|
long startTime = System.currentTimeMillis();
|
||||||
this.notificationCount = 0;
|
this.notificationCount = 0;
|
||||||
|
@ -162,4 +184,100 @@ public class TestJobEndNotifier extends JobEndNotifier {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNotificationOnNormalShutdown() throws Exception {
|
||||||
|
HttpServer server = startHttpServer();
|
||||||
|
// Act like it is the second attempt. Default max attempts is 2
|
||||||
|
MRApp app = spy(new MRApp(2, 2, true, this.getClass().getName(), true, 2));
|
||||||
|
// Make use of the safeToReport flag so that we can look at final job-state as
|
||||||
|
// seen by real users.
|
||||||
|
app.safeToReportTerminationToUser.set(false);
|
||||||
|
doNothing().when(app).sysexit();
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.set(JobContext.MR_JOB_END_NOTIFICATION_URL,
|
||||||
|
JobEndServlet.baseUrl + "jobend?jobid=$jobId&status=$jobStatus");
|
||||||
|
JobImpl job = (JobImpl)app.submit(conf);
|
||||||
|
// Even though auto-complete is true, because app is not shut-down yet, user
|
||||||
|
// will only see RUNNING state.
|
||||||
|
app.waitForInternalState(job, JobStateInternal.SUCCEEDED);
|
||||||
|
app.waitForState(job, JobState.RUNNING);
|
||||||
|
// Now shutdown. User should see SUCCEEDED state.
|
||||||
|
app.shutDownJob();
|
||||||
|
app.waitForState(job, JobState.SUCCEEDED);
|
||||||
|
Assert.assertEquals(true, app.isLastAMRetry());
|
||||||
|
Assert.assertEquals(1, JobEndServlet.calledTimes);
|
||||||
|
Assert.assertEquals("jobid=" + job.getID() + "&status=SUCCEEDED",
|
||||||
|
JobEndServlet.requestUri.getQuery());
|
||||||
|
Assert.assertEquals(JobState.SUCCEEDED.toString(),
|
||||||
|
JobEndServlet.foundJobState);
|
||||||
|
server.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNotificationOnNonLastRetryShutdown() throws Exception {
|
||||||
|
HttpServer server = startHttpServer();
|
||||||
|
MRApp app = spy(new MRApp(2, 2, false, this.getClass().getName(), true));
|
||||||
|
doNothing().when(app).sysexit();
|
||||||
|
// Make use of the safeToReport flag so that we can look at final job-state as
|
||||||
|
// seen by real users.
|
||||||
|
app.safeToReportTerminationToUser.set(false);
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.set(JobContext.MR_JOB_END_NOTIFICATION_URL,
|
||||||
|
JobEndServlet.baseUrl + "jobend?jobid=$jobId&status=$jobStatus");
|
||||||
|
JobImpl job = (JobImpl)app.submit(new Configuration());
|
||||||
|
app.waitForState(job, JobState.RUNNING);
|
||||||
|
app.getContext().getEventHandler()
|
||||||
|
.handle(new JobEvent(app.getJobId(), JobEventType.JOB_AM_REBOOT));
|
||||||
|
app.waitForInternalState(job, JobStateInternal.REBOOT);
|
||||||
|
// Not the last AM attempt. So user should see that the job is still running.
|
||||||
|
app.waitForState(job, JobState.RUNNING);
|
||||||
|
app.shutDownJob();
|
||||||
|
Assert.assertEquals(false, app.isLastAMRetry());
|
||||||
|
Assert.assertEquals(0, JobEndServlet.calledTimes);
|
||||||
|
Assert.assertEquals(null, JobEndServlet.requestUri);
|
||||||
|
Assert.assertEquals(null, JobEndServlet.foundJobState);
|
||||||
|
server.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static HttpServer startHttpServer() throws Exception {
|
||||||
|
new File(System.getProperty(
|
||||||
|
"build.webapps", "build/webapps") + "/test").mkdirs();
|
||||||
|
HttpServer server = new HttpServer.Builder().setName("test")
|
||||||
|
.setBindAddress("0.0.0.0").setPort(0).setFindPort(true).build();
|
||||||
|
server.addServlet("jobend", "/jobend", JobEndServlet.class);
|
||||||
|
server.start();
|
||||||
|
|
||||||
|
JobEndServlet.calledTimes = 0;
|
||||||
|
JobEndServlet.requestUri = null;
|
||||||
|
JobEndServlet.baseUrl = "http://localhost:" + server.getPort() + "/";
|
||||||
|
JobEndServlet.foundJobState = null;
|
||||||
|
return server;
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("serial")
|
||||||
|
public static class JobEndServlet extends HttpServlet {
|
||||||
|
public static volatile int calledTimes = 0;
|
||||||
|
public static URI requestUri;
|
||||||
|
public static String baseUrl;
|
||||||
|
public static String foundJobState;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void doGet(HttpServletRequest request, HttpServletResponse response)
|
||||||
|
throws ServletException, IOException {
|
||||||
|
InputStreamReader in = new InputStreamReader(request.getInputStream());
|
||||||
|
PrintStream out = new PrintStream(response.getOutputStream());
|
||||||
|
|
||||||
|
calledTimes++;
|
||||||
|
try {
|
||||||
|
requestUri = new URI(null, null,
|
||||||
|
request.getRequestURI(), request.getQueryString(), null);
|
||||||
|
foundJobState = request.getParameter("status");
|
||||||
|
} catch (URISyntaxException e) {
|
||||||
|
}
|
||||||
|
|
||||||
|
in.close();
|
||||||
|
out.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue