From 3c3b52b80b7d97d5fcfaa031c589f3efbe9f60a2 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Sat, 28 Sep 2013 19:14:46 +0000 Subject: [PATCH] MAPREDUCE-5538. Fixed MR AppMaster to send job-notification URL only after the job is really done - a bug caused by MAPREDUCE-5505. Contributed by Zhijie Shen. svn merge --ignore-ancestry -c 1527219 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1527221 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 4 + .../hadoop/mapreduce/v2/app/MRAppMaster.java | 39 +++--- .../mapreduce/v2/app/job/impl/JobImpl.java | 2 + .../mapreduce/v2/app/TestJobEndNotifier.java | 122 +++++++++++++++++- 4 files changed, 148 insertions(+), 19 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 114d1feb244..609fdcc7398 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -108,6 +108,10 @@ Release 2.1.2 - UNRELEASED by re-introducing (get,set)PartitionFile which takes in JobConf. (Robert Kanter via acmurthy) + MAPREDUCE-5538. Fixed MR AppMaster to send job-notification URL only after + the job is really done - a bug caused by MAPREDUCE-5505. 
(Zhijie Shen via + vinodkv) + Release 2.1.1-beta - 2013-09-23 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index 230719511d3..8d548ac30be 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -531,19 +531,6 @@ public class MRAppMaster extends CompositeService { // this is the only job, so shut down the Appmaster // note in a workflow scenario, this may lead to creation of a new // job (FIXME?) - // Send job-end notification - if (getConfig().get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL) != null) { - try { - LOG.info("Job end notification started for jobID : " - + job.getReport().getJobId()); - JobEndNotifier notifier = new JobEndNotifier(); - notifier.setConf(getConfig()); - notifier.notify(job.getReport()); - } catch (InterruptedException ie) { - LOG.warn("Job end notification interrupted for jobID : " - + job.getReport().getJobId(), ie); - } - } try { //if isLastAMRetry comes as true, should never set it to false @@ -559,10 +546,28 @@ public class MRAppMaster extends CompositeService { LOG.info("Calling stop for all the services"); MRAppMaster.this.stop(); - // Except ClientService, other services are already stopped, it is safe to - // let clients know the final states. ClientService should wait for some - // time so clients have enough time to know the final states. 
- safeToReportTerminationToUser.set(true); + if (isLastAMRetry) { + // Except ClientService, other services are already stopped, it is safe to + // let clients know the final states. ClientService should wait for some + // time so clients have enough time to know the final states. + safeToReportTerminationToUser.set(true); + + // Send job-end notification when it is safe to report termination to + // users and it is the last AM retry + if (getConfig().get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL) != null) { + try { + LOG.info("Job end notification started for jobID : " + + job.getReport().getJobId()); + JobEndNotifier notifier = new JobEndNotifier(); + notifier.setConf(getConfig()); + notifier.notify(job.getReport()); + } catch (InterruptedException ie) { + LOG.warn("Job end notification interrupted for jobID : " + + job.getReport().getJobId(), ie); + } + } + } + try { Thread.sleep(5000); } catch (InterruptedException e) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index 19eddf3c82c..6241df905fa 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -128,6 +128,8 @@ import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.util.Clock; +import com.google.common.annotations.VisibleForTesting; + /** Implementation of Job interface. Maintains the state machines of Job. * The read and write calls use ReadWriteLock for concurrency. 
*/ diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java index 9f755713173..bd8baf400fc 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java @@ -18,19 +18,41 @@ package org.apache.hadoop.mapreduce.v2.app; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; + +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.PrintStream; import java.net.Proxy; +import java.net.URI; +import java.net.URISyntaxException; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.http.HttpServer; +import org.apache.hadoop.mapred.JobContext; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.v2.api.records.JobReport; +import org.apache.hadoop.mapreduce.v2.api.records.JobState; +import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal; +import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType; +import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; import org.junit.Assert; import org.junit.Test; -import org.mockito.Mockito; /** * Tests job end notification * */ +@SuppressWarnings("unchecked") public class TestJobEndNotifier extends JobEndNotifier { 
//Test maximum retries is capped by MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS @@ -133,7 +155,7 @@ public class TestJobEndNotifier extends JobEndNotifier { public void testNotifyRetries() throws InterruptedException { Configuration conf = new Configuration(); conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_URL, "http://nonexistent"); - JobReport jobReport = Mockito.mock(JobReport.class); + JobReport jobReport = mock(JobReport.class); long startTime = System.currentTimeMillis(); this.notificationCount = 0; @@ -162,4 +184,100 @@ public class TestJobEndNotifier extends JobEndNotifier { } + @Test + public void testNotificationOnNormalShutdown() throws Exception { + HttpServer server = startHttpServer(); + // Act like it is the second attempt. Default max attempts is 2 + MRApp app = spy(new MRApp(2, 2, true, this.getClass().getName(), true, 2)); + // Make use of safeToReportflag so that we can look at final job-state as + // seen by real users. + app.safeToReportTerminationToUser.set(false); + doNothing().when(app).sysexit(); + Configuration conf = new Configuration(); + conf.set(JobContext.MR_JOB_END_NOTIFICATION_URL, + JobEndServlet.baseUrl + "jobend?jobid=$jobId&status=$jobStatus"); + JobImpl job = (JobImpl)app.submit(conf); + // Even though auto-complete is true, because app is not shut-down yet, user + // will only see RUNNING state. + app.waitForInternalState(job, JobStateInternal.SUCCEEDED); + app.waitForState(job, JobState.RUNNING); + // Now shutdown. User should see SUCCEEDED state. 
+ app.shutDownJob();
+ app.waitForState(job, JobState.SUCCEEDED);
+ Assert.assertEquals(true, app.isLastAMRetry());
+ Assert.assertEquals(1, JobEndServlet.calledTimes);
+ Assert.assertEquals("jobid=" + job.getID() + "&status=SUCCEEDED",
+ JobEndServlet.requestUri.getQuery());
+ Assert.assertEquals(JobState.SUCCEEDED.toString(),
+ JobEndServlet.foundJobState);
+ server.stop();
+ }
+
+ @Test
+ public void testNotificationOnNonLastRetryShutdown() throws Exception {
+ HttpServer server = startHttpServer();
+ MRApp app = spy(new MRApp(2, 2, false, this.getClass().getName(), true));
+ doNothing().when(app).sysexit();
+ // Make use of safeToReport flag so that we can look at final job-state as
+ // seen by real users.
+ app.safeToReportTerminationToUser.set(false);
+ Configuration conf = new Configuration();
+ conf.set(JobContext.MR_JOB_END_NOTIFICATION_URL,
+ JobEndServlet.baseUrl + "jobend?jobid=$jobId&status=$jobStatus");
+ JobImpl job = (JobImpl)app.submit(new Configuration());
+ app.waitForState(job, JobState.RUNNING);
+ app.getContext().getEventHandler()
+ .handle(new JobEvent(app.getJobId(), JobEventType.JOB_AM_REBOOT));
+ app.waitForInternalState(job, JobStateInternal.REBOOT);
+ // Not the last AM attempt. So user should see that the job is still running.
+ app.waitForState(job, JobState.RUNNING); + app.shutDownJob(); + Assert.assertEquals(false, app.isLastAMRetry()); + Assert.assertEquals(0, JobEndServlet.calledTimes); + Assert.assertEquals(null, JobEndServlet.requestUri); + Assert.assertEquals(null, JobEndServlet.foundJobState); + server.stop(); + } + + private static HttpServer startHttpServer() throws Exception { + new File(System.getProperty( + "build.webapps", "build/webapps") + "/test").mkdirs(); + HttpServer server = new HttpServer.Builder().setName("test") + .setBindAddress("0.0.0.0").setPort(0).setFindPort(true).build(); + server.addServlet("jobend", "/jobend", JobEndServlet.class); + server.start(); + + JobEndServlet.calledTimes = 0; + JobEndServlet.requestUri = null; + JobEndServlet.baseUrl = "http://localhost:" + server.getPort() + "/"; + JobEndServlet.foundJobState = null; + return server; + } + + @SuppressWarnings("serial") + public static class JobEndServlet extends HttpServlet { + public static volatile int calledTimes = 0; + public static URI requestUri; + public static String baseUrl; + public static String foundJobState; + + @Override + public void doGet(HttpServletRequest request, HttpServletResponse response) + throws ServletException, IOException { + InputStreamReader in = new InputStreamReader(request.getInputStream()); + PrintStream out = new PrintStream(response.getOutputStream()); + + calledTimes++; + try { + requestUri = new URI(null, null, + request.getRequestURI(), request.getQueryString(), null); + foundJobState = request.getParameter("status"); + } catch (URISyntaxException e) { + } + + in.close(); + out.close(); + } + } + }