YARN-1815. Work preserving recovery of Unmanged AMs. Contributed by Subru Krishnan

(cherry picked from commit 097baaaeba)
This commit is contained in:
Jian He 2016-06-03 10:49:30 -07:00
parent b8216c10d8
commit 01a3f7899c
5 changed files with 122 additions and 23 deletions

View File

@ -351,8 +351,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
RMAppAttemptState.FAILED)) RMAppAttemptState.FAILED))
// Transitions from RUNNING State // Transitions from RUNNING State
.addTransition(RMAppAttemptState.RUNNING, .addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING,
EnumSet.of(RMAppAttemptState.FINAL_SAVING, RMAppAttemptState.FINISHED),
RMAppAttemptEventType.UNREGISTERED, new AMUnregisteredTransition()) RMAppAttemptEventType.UNREGISTERED, new AMUnregisteredTransition())
.addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING, .addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING,
RMAppAttemptEventType.STATUS_UPDATE, new StatusUpdateTransition()) RMAppAttemptEventType.STATUS_UPDATE, new StatusUpdateTransition())
@ -1711,25 +1710,26 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
} }
} }
private static final class AMUnregisteredTransition implements private static final class AMUnregisteredTransition extends BaseTransition {
MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
@Override @Override
public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, public void transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) { RMAppAttemptEvent event) {
// Tell the app // Tell the app
if (appAttempt.getSubmissionContext().getUnmanagedAM()) { if (appAttempt.getSubmissionContext().getUnmanagedAM()) {
// YARN-1815: Saving the attempt final state so that we do not recover
// the finished Unmanaged AM post RM failover
// Unmanaged AMs have no container to wait for, so they skip // Unmanaged AMs have no container to wait for, so they skip
// the FINISHING state and go straight to FINISHED. // the FINISHING state and go straight to FINISHED.
appAttempt.updateInfoOnAMUnregister(event); appAttempt.rememberTargetTransitionsAndStoreState(event,
new FinalTransition(RMAppAttemptState.FINISHED).transition( new AMFinishedAfterFinalSavingTransition(event),
appAttempt, event); RMAppAttemptState.FINISHED, RMAppAttemptState.FINISHED);
return RMAppAttemptState.FINISHED; } else {
// Saving the attempt final state
appAttempt.rememberTargetTransitionsAndStoreState(event,
new FinalStateSavedAfterAMUnregisterTransition(),
RMAppAttemptState.FINISHING, RMAppAttemptState.FINISHED);
} }
// Saving the attempt final state
appAttempt.rememberTargetTransitionsAndStoreState(event,
new FinalStateSavedAfterAMUnregisterTransition(),
RMAppAttemptState.FINISHING, RMAppAttemptState.FINISHED);
ApplicationId applicationId = ApplicationId applicationId =
appAttempt.getAppAttemptId().getApplicationId(); appAttempt.getAppAttemptId().getApplicationId();
@ -1740,7 +1740,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
// AppAttempt to App after this point of time is AM/AppAttempt Finished. // AppAttempt to App after this point of time is AM/AppAttempt Finished.
appAttempt.eventHandler.handle(new RMAppEvent(applicationId, appAttempt.eventHandler.handle(new RMAppEvent(applicationId,
RMAppEventType.ATTEMPT_UNREGISTERED)); RMAppEventType.ATTEMPT_UNREGISTERED));
return RMAppAttemptState.FINAL_SAVING; return;
} }
} }

View File

@ -374,14 +374,6 @@ public abstract class AbstractYarnScheduler
continue; continue;
} }
// Unmanaged AM recovery is addressed in YARN-1815
if (rmApp.getApplicationSubmissionContext().getUnmanagedAM()) {
LOG.info("Skip recovering container " + container + " for unmanaged AM."
+ rmApp.getApplicationId());
killOrphanContainerOnNode(nm, container);
continue;
}
SchedulerApplication<T> schedulerApp = applications.get(appId); SchedulerApplication<T> schedulerApp = applications.get(appId);
if (schedulerApp == null) { if (schedulerApp == null) {
LOG.info("Skip recovering container " + container LOG.info("Skip recovering container " + container

View File

@ -874,6 +874,20 @@ public class MockRM extends ResourceManager {
return am; return am;
} }
public static MockAM launchUAM(RMApp app, MockRM rm, MockNM nm)
throws Exception {
// UAMs go directly to LAUNCHED state
rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
RMAppAttempt attempt = app.getCurrentAppAttempt();
waitForSchedulerAppAttemptAdded(attempt.getAppAttemptId(), rm);
System.out.println("Launch AM " + attempt.getAppAttemptId());
nm.nodeHeartbeat(true);
MockAM am = new MockAM(rm.getRMContext(), rm.masterService,
attempt.getAppAttemptId());
rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED);
return am;
}
public static RMAppAttempt waitForAttemptScheduled(RMApp app, MockRM rm) public static RMAppAttempt waitForAttemptScheduled(RMApp app, MockRM rm)
throws Exception { throws Exception {
rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED); rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);

View File

@ -1408,4 +1408,96 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
// check that attempt state is recovered correctly. // check that attempt state is recovered correctly.
assertEquals(RMAppAttemptState.FINISHED, recoveredApp1.getCurrentAppAttempt().getState()); assertEquals(RMAppAttemptState.FINISHED, recoveredApp1.getCurrentAppAttempt().getState());
} }
@Test(timeout = 600000)
public void testUAMRecoveryOnRMWorkPreservingRestart() throws Exception {
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
// start RM
rm1 = new MockRM(conf, memStore);
rm1.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
nm1.registerNode();
// create app and launch the UAM
RMApp app0 = rm1.submitApp(200, true);
MockAM am0 = MockRM.launchUAM(app0, rm1, nm1);
am0.registerAppAttempt();
// Allocate containers to UAM
int numContainers = 2;
am0.allocate("127.0.0.1", 1000, numContainers,
new ArrayList<ContainerId>());
nm1.nodeHeartbeat(true);
List<Container> conts = am0.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
Assert.assertTrue(conts.isEmpty());
while (conts.size() == 0) {
nm1.nodeHeartbeat(true);
conts.addAll(am0.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers());
Thread.sleep(500);
}
Assert.assertFalse(conts.isEmpty());
// start new RM
rm2 = new MockRM(conf, memStore);
rm2.start();
rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED);
rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.LAUNCHED);
// recover app
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
RMApp recoveredApp =
rm2.getRMContext().getRMApps().get(app0.getApplicationId());
NMContainerStatus container1 = TestRMRestart
.createNMContainerStatus(am0.getApplicationAttemptId(), 1,
ContainerState.RUNNING);
NMContainerStatus container2 = TestRMRestart
.createNMContainerStatus(am0.getApplicationAttemptId(), 2,
ContainerState.RUNNING);
nm1.registerNode(Arrays.asList(container1, container2), null);
// Wait for RM to settle down on recovering containers;
waitForNumContainersToRecover(2, rm2, am0.getApplicationAttemptId());
// retry registerApplicationMaster() after RM restart.
am0.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
am0.registerAppAttempt(true);
// Check if UAM is correctly recovered on restart
rm2.waitForState(app0.getApplicationId(), RMAppState.RUNNING);
rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.RUNNING);
// Check if containers allocated to UAM are recovered
Map<ApplicationId, SchedulerApplication> schedulerApps =
((AbstractYarnScheduler) rm2.getResourceScheduler())
.getSchedulerApplications();
SchedulerApplication schedulerApp =
schedulerApps.get(recoveredApp.getApplicationId());
SchedulerApplicationAttempt schedulerAttempt =
schedulerApp.getCurrentAppAttempt();
Assert.assertEquals(numContainers,
schedulerAttempt.getLiveContainers().size());
// Check if UAM is able to heart beat
Assert.assertNotNull(am0.doHeartbeat());
// Complete the UAM
am0.unregisterAppAttempt(false);
rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.FINISHED);
rm2.waitForState(app0.getApplicationId(), RMAppState.FINISHED);
Assert.assertEquals(FinalApplicationStatus.SUCCEEDED,
recoveredApp.getFinalApplicationStatus());
// Restart RM once more to check UAM is not re-run
MockRM rm3 = new MockRM(conf, memStore);
rm3.start();
recoveredApp = rm3.getRMContext().getRMApps().get(app0.getApplicationId());
Assert.assertEquals(RMAppState.FINISHED, recoveredApp.getState());
}
} }

View File

@ -596,8 +596,8 @@ public class TestRMAppAttemptTransitions {
} else { } else {
assertEquals(getProxyUrl(applicationAttempt), assertEquals(getProxyUrl(applicationAttempt),
applicationAttempt.getTrackingUrl()); applicationAttempt.getTrackingUrl());
verifyAttemptFinalStateSaved();
} }
verifyAttemptFinalStateSaved();
assertEquals(finishedContainerCount, applicationAttempt assertEquals(finishedContainerCount, applicationAttempt
.getJustFinishedContainers().size()); .getJustFinishedContainers().size());
Assert.assertEquals(0, getFinishedContainersSentToAM(applicationAttempt) Assert.assertEquals(0, getFinishedContainersSentToAM(applicationAttempt)
@ -735,6 +735,7 @@ public class TestRMAppAttemptTransitions {
applicationAttempt.handle(new RMAppAttemptUnregistrationEvent( applicationAttempt.handle(new RMAppAttemptUnregistrationEvent(
applicationAttempt.getAppAttemptId(), url, finalStatus, applicationAttempt.getAppAttemptId(), url, finalStatus,
diagnostics)); diagnostics));
sendAttemptUpdateSavedEvent(applicationAttempt);
testAppAttemptFinishedState(null, finalStatus, url, diagnostics, 1, testAppAttemptFinishedState(null, finalStatus, url, diagnostics, 1,
true); true);
assertFalse(transferStateFromPreviousAttempt); assertFalse(transferStateFromPreviousAttempt);