From 197217c95d5da1235f252962b1e3a24d063e85cf Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Wed, 12 Mar 2014 21:18:55 +0000 Subject: [PATCH] YARN-1816. Fixed ResourceManager to get RMApp correctly handle ATTEMPT_FINISHED event at ACCEPTED state that can happen after RM restarts. Contributed by Jian He. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1576911 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 4 ++ .../resourcemanager/rmapp/RMAppImpl.java | 26 ++++++--- .../rmapp/attempt/RMAppAttemptImpl.java | 7 ++- .../server/resourcemanager/TestRMRestart.java | 56 +++++++++++++++++++ 4 files changed, 84 insertions(+), 9 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 4427a7979b6..f8798cad159 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -457,6 +457,10 @@ Release 2.4.0 - UNRELEASED and thus recover app itself synchronously and avoid races with resyncing NodeManagers. (Jian He via vinodkv) + YARN-1816. Fixed ResourceManager to get RMApp correctly handle + ATTEMPT_FINISHED event at ACCEPTED state that can happen after RM restarts. + (Jian He via vinodkv) + Release 2.3.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 8814100ac1b..30af77de1d0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -189,11 +189,14 @@ public class RMAppImpl implements RMApp, Recoverable { RMAppEventType.ATTEMPT_REGISTERED) .addTransition(RMAppState.ACCEPTED, EnumSet.of(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING), - // ACCEPTED state is possible to receive ATTEMPT_FAILED event because - // RMAppRecoveredTransition is returning ACCEPTED state directly and - // waiting for the previous AM to exit. + // ACCEPTED state is possible to receive ATTEMPT_FAILED/ATTEMPT_FINISHED + // event because RMAppRecoveredTransition is returning ACCEPTED state + // directly and waiting for the previous AM to exit. RMAppEventType.ATTEMPT_FAILED, new AttemptFailedTransition(RMAppState.ACCEPTED)) + .addTransition(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING, + RMAppEventType.ATTEMPT_FINISHED, + new FinalSavingTransition(FINISHED_TRANSITION, RMAppState.FINISHED)) .addTransition(RMAppState.ACCEPTED, RMAppState.KILLING, RMAppEventType.KILL, new KillAttemptTransition()) // ACCECPTED state can once again receive APP_ACCEPTED event, because on @@ -725,11 +728,7 @@ public class RMAppImpl implements RMApp, Recoverable { @Override public RMAppState transition(RMAppImpl app, RMAppEvent event) { - /* - * If last attempt recovered final state is null .. it means attempt was - * started but AM container may or may not have started / finished. - * Therefore we should wait for it to finish. - */ + for (RMAppAttempt attempt : app.getAppAttempts().values()) { // synchronously recover attempt to ensure any incoming external events // to be processed after the attempt processes the recover event. @@ -744,6 +743,17 @@ public class RMAppImpl implements RMApp, Recoverable { return app.recoveredFinalState; } + // Last attempt is in final state, do not add to scheduler and just return + // ACCEPTED waiting for last RMAppAttempt to send finished or failed event + // back. + if (app.currentAttempt != null + && (app.currentAttempt.getState() == RMAppAttemptState.KILLED + || app.currentAttempt.getState() == RMAppAttemptState.FINISHED + || (app.currentAttempt.getState() == RMAppAttemptState.FAILED + && app.attempts.size() == app.maxAppAttempts))) { + return RMAppState.ACCEPTED; + } + // Notify scheduler about the app on recovery new AddApplicationToSchedulerTransition().transition(app, event); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 3b845b1b8b3..6b947891129 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -872,6 +872,11 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { @Override public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { + /* + * If last attempt recovered final state is null .. it means attempt was + * started but AM container may or may not have started / finished. + * Therefore we should wait for it to finish. + */ if (appAttempt.recoveredFinalState != null) { appAttempt.progress = 1.0f; RMApp rmApp =appAttempt.rmContext.getRMApps().get( @@ -1598,7 +1603,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { ExitUtil.terminate(1, storeEvent.getStoredException()); } } - + private void storeAttempt() { // store attempt data in a non-blocking manner to prevent dispatcher // thread starvation and wait for state to be saved diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 484f231927d..9bedb34524d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -601,7 +601,63 @@ public class TestRMRestart { RMAppAttemptState.SCHEDULED); Assert.assertEquals(RMAppAttemptState.SCHEDULED, app2 .getCurrentAppAttempt().getAppAttemptState()); + } + // Test RM restarts after previous attempt succeeded and was saved into state + // store but before the RMAppAttempt notifies RMApp that it has succeeded. On + // recovery, RMAppAttempt should send the AttemptFinished event to RMApp so + // that RMApp can recover its state. + @Test + public void testRMRestartWaitForPreviousSucceededAttempt() throws Exception { + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); + MemoryRMStateStore memStore = new MemoryRMStateStore() { + int count = 0; + + @Override + public void updateApplicationStateInternal(ApplicationId appId, + ApplicationStateDataPBImpl appStateData) throws Exception { + if (count == 0) { + // do nothing; simulate app final state is not saved. + LOG.info(appId + " final state is not saved."); + count++; + } else { + super.updateApplicationStateInternal(appId, appStateData); + } + } + }; + memStore.init(conf); + RMState rmState = memStore.getState(); + Map rmAppState = + rmState.getApplicationState(); + + // start RM + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 15120); + RMApp app0 = rm1.submitApp(200); + MockAM am0 = MockRM.launchAndRegisterAM(app0, rm1, nm1); + + FinishApplicationMasterRequest req = + FinishApplicationMasterRequest.newInstance( + FinalApplicationStatus.SUCCEEDED, "", ""); + am0.unregisterAppAttempt(req, true); + am0.waitForState(RMAppAttemptState.FINISHING); + // app final state is not saved. This guarantees that RMApp cannot be + // recovered via its own saved state, but only via the event notification + // from the RMAppAttempt on recovery. + Assert.assertNull(rmAppState.get(app0.getApplicationId()).getState()); + + // start RM + MockRM rm2 = new MockRM(conf, memStore); + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + rm2.start(); + + rm2.waitForState(app0.getCurrentAppAttempt().getAppAttemptId(), + RMAppAttemptState.FINISHED); + rm2.waitForState(app0.getApplicationId(), RMAppState.FINISHED); + // app final state is saved via the finish event from attempt. + Assert.assertEquals(RMAppState.FINISHED, + rmAppState.get(app0.getApplicationId()).getState()); } @Test