From a5ecb4de4192b349fb14a162b00c1bfb756948c3 Mon Sep 17 00:00:00 2001 From: Bikas Saha Date: Wed, 25 Sep 2013 00:53:10 +0000 Subject: [PATCH] Merge r1526071 from trunk to branch-2 for MAPREDUCE-5505. Clients should be notified job finished only after job successfully unregistered (Zhijie Shen via bikas) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1526072 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 ++ .../hadoop/mapreduce/v2/app/AppContext.java | 3 ++ .../hadoop/mapreduce/v2/app/MRAppMaster.java | 16 ++++++- .../mapreduce/v2/app/job/impl/JobImpl.java | 21 +++++++++- .../apache/hadoop/mapreduce/v2/app/MRApp.java | 42 +++++++++++++++++-- .../mapreduce/v2/app/MockAppContext.java | 7 ++++ .../hadoop/mapreduce/v2/app/TestMRApp.java | 14 +++++++ .../v2/app/TestRuntimeEstimators.java | 7 ++++ .../v2/app/job/impl/TestJobImpl.java | 28 +++++++++---- .../hadoop/mapreduce/v2/hs/JobHistory.java | 7 ++++ 10 files changed, 134 insertions(+), 14 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 250f032b019..5be017dd23b 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -79,6 +79,9 @@ Release 2.1.2 - UNRELEASED needs to set up its own certificates etc and not depend on clusters'. (Omkar Vinit Joshi via vinodkv) + MAPREDUCE-5505. Clients should be notified job finished only after job + successfully unregistered (Zhijie Shen via bikas) + Release 2.1.1-beta - 2013-09-23 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java index 885534313b0..36482aebe31 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java @@ -63,4 +63,7 @@ public interface AppContext { ClientToAMTokenSecretManager getClientToAMTokenSecretManager(); boolean isLastAMRetry(); + + boolean safeToReportTerminationToUser(); + } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index e2afdce6d13..230719511d3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; @@ -209,6 +210,10 @@ public class MRAppMaster extends CompositeService { private long recoveredJobStartTime = 0; + @VisibleForTesting + protected AtomicBoolean safeToReportTerminationToUser = + new AtomicBoolean(false); + public MRAppMaster(ApplicationAttemptId applicationAttemptId, ContainerId containerId, String nmHost, int nmPort, int nmHttpPort, long appSubmitTime, int maxAppAttempts) { @@ -554,8 +559,10 @@ public class MRAppMaster extends CompositeService { LOG.info("Calling stop for all the services"); MRAppMaster.this.stop(); - // TODO: Stop ClientService last, since only ClientService should wait for - // some time so clients can know the final states. Will be removed once RM come on. + // Except ClientService, other services are already stopped, it is safe to + // let clients know the final states. ClientService should wait for some + // time so clients have enough time to know the final states. + safeToReportTerminationToUser.set(true); try { Thread.sleep(5000); } catch (InterruptedException e) { @@ -964,6 +971,11 @@ public class MRAppMaster extends CompositeService { public boolean isLastAMRetry(){ return isLastAMRetry; } + + @Override + public boolean safeToReportTerminationToUser() { + return safeToReportTerminationToUser.get(); + } } @SuppressWarnings("unchecked") diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index a146a0cee4b..19eddf3c82c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -641,6 +641,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, private ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); private ScheduledFuture failWaitTriggerScheduledFuture; + private JobState lastNonFinalState = JobState.NEW; + public JobImpl(JobId jobId, ApplicationAttemptId applicationAttemptId, Configuration conf, EventHandler eventHandler, TaskAttemptListener taskAttemptListener, @@ -928,7 +930,14 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, public JobState getState() { readLock.lock(); try { - return getExternalState(getInternalState()); + JobState state = getExternalState(getInternalState()); + if (!appContext.safeToReportTerminationToUser() + && (state == JobState.SUCCEEDED || state == JobState.FAILED + || state == JobState.KILLED || state == JobState.ERROR)) { + return lastNonFinalState; + } else { + return state; + } } finally { readLock.unlock(); } @@ -972,6 +981,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, if (oldState != getInternalState()) { LOG.info(jobId + "Job Transitioned from " + oldState + " to " + getInternalState()); + rememberLastNonFinalState(oldState); } } @@ -980,6 +990,15 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, } } + private void rememberLastNonFinalState(JobStateInternal stateInternal) { + JobState state = getExternalState(stateInternal); + // if state is not the final state, set lastNonFinalState + if (state != JobState.SUCCEEDED && state != JobState.FAILED + && state != JobState.KILLED && state != JobState.ERROR) { + lastNonFinalState = state; + } + } + @Private public JobStateInternal getInternalState() { readLock.lock(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java index 76fd00ad848..2a009955e3d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java @@ -135,11 +135,22 @@ public class MRApp extends MRAppMaster { this(maps, reduces, autoComplete, testName, cleanOnStart, 1, clock); } + public MRApp(int maps, int reduces, boolean autoComplete, String testName, + boolean cleanOnStart, Clock clock, boolean shutdown) { + this(maps, reduces, autoComplete, testName, cleanOnStart, 1, clock, + shutdown); + } + public MRApp(int maps, int reduces, boolean autoComplete, String testName, boolean cleanOnStart) { this(maps, reduces, autoComplete, testName, cleanOnStart, 1); } - + + public MRApp(int maps, int reduces, boolean autoComplete, String testName, + boolean cleanOnStart, boolean shutdown) { + this(maps, reduces, autoComplete, testName, cleanOnStart, 1, shutdown); + } + @Override protected void initJobCredentialsAndUGI(Configuration conf) { // Fake a shuffle secret that normally is provided by the job client. @@ -169,23 +180,43 @@ public class MRApp extends MRAppMaster { new SystemClock()); } + public MRApp(int maps, int reduces, boolean autoComplete, String testName, + boolean cleanOnStart, int startCount, boolean shutdown) { + this(maps, reduces, autoComplete, testName, cleanOnStart, startCount, + new SystemClock(), shutdown); + } + + public MRApp(int maps, int reduces, boolean autoComplete, String testName, + boolean cleanOnStart, int startCount, Clock clock, boolean shutdown) { + this(getApplicationAttemptId(applicationId, startCount), getContainerId( + applicationId, startCount), maps, reduces, autoComplete, testName, + cleanOnStart, startCount, clock, shutdown); + } + public MRApp(int maps, int reduces, boolean autoComplete, String testName, boolean cleanOnStart, int startCount, Clock clock) { this(getApplicationAttemptId(applicationId, startCount), getContainerId( applicationId, startCount), maps, reduces, autoComplete, testName, - cleanOnStart, startCount, clock); + cleanOnStart, startCount, clock, true); + } + + public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId, + int maps, int reduces, boolean autoComplete, String testName, + boolean cleanOnStart, int startCount, boolean shutdown) { + this(appAttemptId, amContainerId, maps, reduces, autoComplete, testName, + cleanOnStart, startCount, new SystemClock(), shutdown); } public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId, int maps, int reduces, boolean autoComplete, String testName, boolean cleanOnStart, int startCount) { this(appAttemptId, amContainerId, maps, reduces, autoComplete, testName, - cleanOnStart, startCount, new SystemClock()); + cleanOnStart, startCount, new SystemClock(), true); } public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId, int maps, int reduces, boolean autoComplete, String testName, - boolean cleanOnStart, int startCount, Clock clock) { + boolean cleanOnStart, int startCount, Clock clock, boolean shutdown) { super(appAttemptId, amContainerId, NM_HOST, NM_PORT, NM_HTTP_PORT, clock, System .currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS); this.testWorkDir = new File("target", testName); @@ -204,6 +235,9 @@ public class MRApp extends MRAppMaster { this.maps = maps; this.reduces = reduces; this.autoComplete = autoComplete; + // If safeToReportTerminationToUser is set to true, we can verify whether + // the job can reaches the final state when MRAppMaster shuts down. + this.safeToReportTerminationToUser.set(shutdown); } @Override diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java index 02a3209f594..0496072986e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java @@ -135,4 +135,11 @@ public class MockAppContext implements AppContext { public boolean isLastAMRetry() { return false; } + + @Override + public boolean safeToReportTerminationToUser() { + // bogus - Not Required + return true; + } + } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java index 53b5aaa5c89..1987d706139 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java @@ -44,6 +44,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType; +import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; @@ -374,6 +375,19 @@ public class TestMRApp { app.waitForState(job, JobState.ERROR); } + @SuppressWarnings("resource") + @Test + public void testJobSuccess() throws Exception { + MRApp app = new MRApp(2, 2, true, this.getClass().getName(), true, false); + JobImpl job = (JobImpl) app.submit(new Configuration()); + app.waitForInternalState(job, JobStateInternal.SUCCEEDED); + // AM is not unregistered + Assert.assertEquals(JobState.RUNNING, job.getState()); + // imitate that AM is unregistered + app.safeToReportTerminationToUser.set(true); + app.waitForState(job, JobState.SUCCEEDED); + } + @Test public void testJobRebootNotLastRetry() throws Exception { MRApp app = new MRApp(1, 0, false, this.getClass().getName(), true); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java index 8ba57bbc248..7f968ca70f6 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java @@ -867,5 +867,12 @@ public class TestRuntimeEstimators { public boolean isLastAMRetry() { return false; } + + @Override + public boolean safeToReportTerminationToUser() { + // bogus - Not Required + return true; + } + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java index 72fc4f4471e..8fb7f1a7b93 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java @@ -275,6 +275,7 @@ public class TestJobImpl { AppContext mockContext = mock(AppContext.class); when(mockContext.isLastAMRetry()).thenReturn(true); + when(mockContext.safeToReportTerminationToUser()).thenReturn(false); JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, mockContext); completeJobTasks(job); assertJobState(job, JobStateInternal.COMMITTING); @@ -282,7 +283,9 @@ public class TestJobImpl { syncBarrier.await(); job.handle(new JobEvent(job.getID(), JobEventType.JOB_AM_REBOOT)); assertJobState(job, JobStateInternal.REBOOT); - // return the external state as FAILED since this is last retry. + // return the external state as ERROR since this is last retry. + Assert.assertEquals(JobState.RUNNING, job.getState()); + when(mockContext.safeToReportTerminationToUser()).thenReturn(true); Assert.assertEquals(JobState.ERROR, job.getState()); dispatcher.stop(); @@ -590,12 +593,14 @@ public class TestJobImpl { final JobDiagnosticsUpdateEvent diagUpdateEvent = new JobDiagnosticsUpdateEvent(jobId, diagMsg); MRAppMetrics mrAppMetrics = MRAppMetrics.create(); + AppContext mockContext = mock(AppContext.class); + when(mockContext.safeToReportTerminationToUser()).thenReturn(true); JobImpl job = new JobImpl(jobId, Records .newRecord(ApplicationAttemptId.class), new Configuration(), mock(EventHandler.class), null, mock(JobTokenSecretManager.class), null, new SystemClock(), null, - mrAppMetrics, null, true, null, 0, null, null, null, null); + mrAppMetrics, null, true, null, 0, null, mockContext, null, null); job.handle(diagUpdateEvent); String diagnostics = job.getReport().getDiagnostics(); Assert.assertNotNull(diagnostics); @@ -606,7 +611,7 @@ public class TestJobImpl { mock(EventHandler.class), null, mock(JobTokenSecretManager.class), null, new SystemClock(), null, - mrAppMetrics, null, true, null, 0, null, null, null, null); + mrAppMetrics, null, true, null, 0, null, mockContext, null, null); job.handle(new JobEvent(jobId, JobEventType.JOB_KILL)); job.handle(diagUpdateEvent); diagnostics = job.getReport().getDiagnostics(); @@ -699,7 +704,9 @@ public class TestJobImpl { commitHandler.init(conf); commitHandler.start(); - JobImpl job = createStubbedJob(conf, dispatcher, 2, null); + AppContext mockContext = mock(AppContext.class); + when(mockContext.safeToReportTerminationToUser()).thenReturn(false); + JobImpl job = createStubbedJob(conf, dispatcher, 2, mockContext); JobId jobId = job.getID(); job.handle(new JobEvent(jobId, JobEventType.JOB_INIT)); assertJobState(job, JobStateInternal.INITED); @@ -707,12 +714,15 @@ public class TestJobImpl { assertJobState(job, JobStateInternal.FAILED); job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_COMPLETED)); - Assert.assertEquals(JobState.FAILED, job.getState()); + assertJobState(job, JobStateInternal.FAILED); job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_ATTEMPT_COMPLETED)); - Assert.assertEquals(JobState.FAILED, job.getState()); + assertJobState(job, JobStateInternal.FAILED); job.handle(new JobEvent(jobId, JobEventType.JOB_MAP_TASK_RESCHEDULED)); - Assert.assertEquals(JobState.FAILED, job.getState()); + assertJobState(job, JobStateInternal.FAILED); job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE)); + assertJobState(job, JobStateInternal.FAILED); + Assert.assertEquals(JobState.RUNNING, job.getState()); + when(mockContext.safeToReportTerminationToUser()).thenReturn(true); Assert.assertEquals(JobState.FAILED, job.getState()); dispatcher.stop(); @@ -750,6 +760,10 @@ public class TestJobImpl { Dispatcher dispatcher, int numSplits, AppContext appContext) { JobID jobID = JobID.forName("job_1234567890000_0001"); JobId jobId = TypeConverter.toYarn(jobID); + if (appContext == null) { + appContext = mock(AppContext.class); + when(appContext.safeToReportTerminationToUser()).thenReturn(true); + } StubbedJob job = new StubbedJob(jobId, ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0), conf,dispatcher.getEventHandler(), true, "somebody", numSplits, appContext); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java index 4ca4786e9d4..7de35ff319e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java @@ -387,4 +387,11 @@ public class JobHistory extends AbstractService implements HistoryContext { // bogus - Not Required return false; } + + @Override + public boolean safeToReportTerminationToUser() { + // bogus - Not Required + return true; + } + }