diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 971b1afb94d..5fe348b3584 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -351,8 +351,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { RMAppAttemptState.FAILED)) // Transitions from RUNNING State - .addTransition(RMAppAttemptState.RUNNING, - EnumSet.of(RMAppAttemptState.FINAL_SAVING, RMAppAttemptState.FINISHED), + .addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING, RMAppAttemptEventType.UNREGISTERED, new AMUnregisteredTransition()) .addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING, RMAppAttemptEventType.STATUS_UPDATE, new StatusUpdateTransition()) @@ -1711,25 +1710,26 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { } } - private static final class AMUnregisteredTransition implements - MultipleArcTransition { + private static final class AMUnregisteredTransition extends BaseTransition { @Override - public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, + public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { // Tell the app if (appAttempt.getSubmissionContext().getUnmanagedAM()) { + // YARN-1815: Saving the attempt final state so that we do not recover + // the finished Unmanaged AM post RM failover // Unmanaged AMs have no container to wait for, so they skip // the FINISHING state and go straight to FINISHED. - appAttempt.updateInfoOnAMUnregister(event); - new FinalTransition(RMAppAttemptState.FINISHED).transition( - appAttempt, event); - return RMAppAttemptState.FINISHED; + appAttempt.rememberTargetTransitionsAndStoreState(event, + new AMFinishedAfterFinalSavingTransition(event), + RMAppAttemptState.FINISHED, RMAppAttemptState.FINISHED); + } else { + // Saving the attempt final state + appAttempt.rememberTargetTransitionsAndStoreState(event, + new FinalStateSavedAfterAMUnregisterTransition(), + RMAppAttemptState.FINISHING, RMAppAttemptState.FINISHED); } - // Saving the attempt final state - appAttempt.rememberTargetTransitionsAndStoreState(event, - new FinalStateSavedAfterAMUnregisterTransition(), - RMAppAttemptState.FINISHING, RMAppAttemptState.FINISHED); ApplicationId applicationId = appAttempt.getAppAttemptId().getApplicationId(); @@ -1740,7 +1740,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { // AppAttempt to App after this point of time is AM/AppAttempt Finished. appAttempt.eventHandler.handle(new RMAppEvent(applicationId, RMAppEventType.ATTEMPT_UNREGISTERED)); - return RMAppAttemptState.FINAL_SAVING; + return; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 851a8a8dab9..358802b6ebd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -374,14 +374,6 @@ public abstract class AbstractYarnScheduler continue; } - // Unmanaged AM recovery is addressed in YARN-1815 - if (rmApp.getApplicationSubmissionContext().getUnmanagedAM()) { - LOG.info("Skip recovering container " + container + " for unmanaged AM." - + rmApp.getApplicationId()); - killOrphanContainerOnNode(nm, container); - continue; - } - SchedulerApplication schedulerApp = applications.get(appId); if (schedulerApp == null) { LOG.info("Skip recovering container " + container diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index c5e6994d749..28406133027 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -874,6 +874,20 @@ public class MockRM extends ResourceManager { return am; } + public static MockAM launchUAM(RMApp app, MockRM rm, MockNM nm) + throws Exception { + // UAMs go directly to LAUNCHED state + rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED); + RMAppAttempt attempt = app.getCurrentAppAttempt(); + waitForSchedulerAppAttemptAdded(attempt.getAppAttemptId(), rm); + System.out.println("Launch AM " + attempt.getAppAttemptId()); + nm.nodeHeartbeat(true); + MockAM am = new MockAM(rm.getRMContext(), rm.masterService, + attempt.getAppAttemptId()); + rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED); + return am; + } + public static RMAppAttempt waitForAttemptScheduled(RMApp app, MockRM rm) throws Exception { rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java index 05713877c25..57728061793 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -1408,4 +1408,96 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase // check that attempt state is recovered correctly. assertEquals(RMAppAttemptState.FINISHED, recoveredApp1.getCurrentAppAttempt().getState()); } + + @Test(timeout = 600000) + public void testUAMRecoveryOnRMWorkPreservingRestart() throws Exception { + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + + // start RM + rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // create app and launch the UAM + RMApp app0 = rm1.submitApp(200, true); + MockAM am0 = MockRM.launchUAM(app0, rm1, nm1); + am0.registerAppAttempt(); + + // Allocate containers to UAM + int numContainers = 2; + am0.allocate("127.0.0.1", 1000, numContainers, + new ArrayList()); + nm1.nodeHeartbeat(true); + List conts = am0.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers(); + Assert.assertTrue(conts.isEmpty()); + while (conts.size() == 0) { + nm1.nodeHeartbeat(true); + conts.addAll(am0.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers()); + Thread.sleep(500); + } + Assert.assertFalse(conts.isEmpty()); + + // start new RM + rm2 = new MockRM(conf, memStore); + rm2.start(); + rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED); + rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.LAUNCHED); + + // recover app + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + RMApp recoveredApp = + rm2.getRMContext().getRMApps().get(app0.getApplicationId()); + NMContainerStatus container1 = TestRMRestart + .createNMContainerStatus(am0.getApplicationAttemptId(), 1, + ContainerState.RUNNING); + NMContainerStatus container2 = TestRMRestart + .createNMContainerStatus(am0.getApplicationAttemptId(), 2, + ContainerState.RUNNING); + nm1.registerNode(Arrays.asList(container1, container2), null); + + // Wait for RM to settle down on recovering containers; + waitForNumContainersToRecover(2, rm2, am0.getApplicationAttemptId()); + + // retry registerApplicationMaster() after RM restart. + am0.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext()); + am0.registerAppAttempt(true); + + // Check if UAM is correctly recovered on restart + rm2.waitForState(app0.getApplicationId(), RMAppState.RUNNING); + rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.RUNNING); + + // Check if containers allocated to UAM are recovered + Map schedulerApps = + ((AbstractYarnScheduler) rm2.getResourceScheduler()) + .getSchedulerApplications(); + SchedulerApplication schedulerApp = + schedulerApps.get(recoveredApp.getApplicationId()); + SchedulerApplicationAttempt schedulerAttempt = + schedulerApp.getCurrentAppAttempt(); + Assert.assertEquals(numContainers, + schedulerAttempt.getLiveContainers().size()); + + // Check if UAM is able to heart beat + Assert.assertNotNull(am0.doHeartbeat()); + + // Complete the UAM + am0.unregisterAppAttempt(false); + rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.FINISHED); + rm2.waitForState(app0.getApplicationId(), RMAppState.FINISHED); + Assert.assertEquals(FinalApplicationStatus.SUCCEEDED, + recoveredApp.getFinalApplicationStatus()); + + // Restart RM once more to check UAM is not re-run + MockRM rm3 = new MockRM(conf, memStore); + rm3.start(); + recoveredApp = rm3.getRMContext().getRMApps().get(app0.getApplicationId()); + Assert.assertEquals(RMAppState.FINISHED, recoveredApp.getState()); + + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java index 93f17e7fa22..3143b9479b3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java @@ -596,8 +596,8 @@ public class TestRMAppAttemptTransitions { } else { assertEquals(getProxyUrl(applicationAttempt), applicationAttempt.getTrackingUrl()); - verifyAttemptFinalStateSaved(); } + verifyAttemptFinalStateSaved(); assertEquals(finishedContainerCount, applicationAttempt .getJustFinishedContainers().size()); Assert.assertEquals(0, getFinishedContainersSentToAM(applicationAttempt) @@ -735,6 +735,7 @@ public class TestRMAppAttemptTransitions { applicationAttempt.handle(new RMAppAttemptUnregistrationEvent( applicationAttempt.getAppAttemptId(), url, finalStatus, diagnostics)); + sendAttemptUpdateSavedEvent(applicationAttempt); testAppAttemptFinishedState(null, finalStatus, url, diagnostics, 1, true); assertFalse(transferStateFromPreviousAttempt);