YARN-1816. Fixed ResourceManager to get RMApp correctly handle ATTEMPT_FINISHED event at ACCEPTED state that can happen after RM restarts. Contributed by Jian He.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1576911 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
4de17c6052
commit
197217c95d
|
@ -457,6 +457,10 @@ Release 2.4.0 - UNRELEASED
|
|||
and thus recover app itself synchronously and avoid races with resyncing
|
||||
NodeManagers. (Jian He via vinodkv)
|
||||
|
||||
YARN-1816. Fixed ResourceManager to get RMApp correctly handle
|
||||
ATTEMPT_FINISHED event at ACCEPTED state that can happen after RM restarts.
|
||||
(Jian He via vinodkv)
|
||||
|
||||
Release 2.3.1 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -189,11 +189,14 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
RMAppEventType.ATTEMPT_REGISTERED)
|
||||
.addTransition(RMAppState.ACCEPTED,
|
||||
EnumSet.of(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING),
|
||||
// ACCEPTED state is possible to receive ATTEMPT_FAILED event because
|
||||
// RMAppRecoveredTransition is returning ACCEPTED state directly and
|
||||
// waiting for the previous AM to exit.
|
||||
// ACCEPTED state is possible to receive ATTEMPT_FAILED/ATTEMPT_FINISHED
|
||||
// event because RMAppRecoveredTransition is returning ACCEPTED state
|
||||
// directly and waiting for the previous AM to exit.
|
||||
RMAppEventType.ATTEMPT_FAILED,
|
||||
new AttemptFailedTransition(RMAppState.ACCEPTED))
|
||||
.addTransition(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING,
|
||||
RMAppEventType.ATTEMPT_FINISHED,
|
||||
new FinalSavingTransition(FINISHED_TRANSITION, RMAppState.FINISHED))
|
||||
.addTransition(RMAppState.ACCEPTED, RMAppState.KILLING,
|
||||
RMAppEventType.KILL, new KillAttemptTransition())
|
||||
// ACCECPTED state can once again receive APP_ACCEPTED event, because on
|
||||
|
@ -725,11 +728,7 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
|
||||
@Override
|
||||
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
|
||||
/*
|
||||
* If last attempt recovered final state is null .. it means attempt was
|
||||
* started but AM container may or may not have started / finished.
|
||||
* Therefore we should wait for it to finish.
|
||||
*/
|
||||
|
||||
for (RMAppAttempt attempt : app.getAppAttempts().values()) {
|
||||
// synchronously recover attempt to ensure any incoming external events
|
||||
// to be processed after the attempt processes the recover event.
|
||||
|
@ -744,6 +743,17 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
return app.recoveredFinalState;
|
||||
}
|
||||
|
||||
// Last attempt is in final state, do not add to scheduler and just return
|
||||
// ACCEPTED waiting for last RMAppAttempt to send finished or failed event
|
||||
// back.
|
||||
if (app.currentAttempt != null
|
||||
&& (app.currentAttempt.getState() == RMAppAttemptState.KILLED
|
||||
|| app.currentAttempt.getState() == RMAppAttemptState.FINISHED
|
||||
|| (app.currentAttempt.getState() == RMAppAttemptState.FAILED
|
||||
&& app.attempts.size() == app.maxAppAttempts))) {
|
||||
return RMAppState.ACCEPTED;
|
||||
}
|
||||
|
||||
// Notify scheduler about the app on recovery
|
||||
new AddApplicationToSchedulerTransition().transition(app, event);
|
||||
|
||||
|
|
|
@ -872,6 +872,11 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||
@Override
|
||||
public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
|
||||
RMAppAttemptEvent event) {
|
||||
/*
|
||||
* If last attempt recovered final state is null .. it means attempt was
|
||||
* started but AM container may or may not have started / finished.
|
||||
* Therefore we should wait for it to finish.
|
||||
*/
|
||||
if (appAttempt.recoveredFinalState != null) {
|
||||
appAttempt.progress = 1.0f;
|
||||
RMApp rmApp =appAttempt.rmContext.getRMApps().get(
|
||||
|
|
|
@ -601,7 +601,63 @@ public class TestRMRestart {
|
|||
RMAppAttemptState.SCHEDULED);
|
||||
Assert.assertEquals(RMAppAttemptState.SCHEDULED, app2
|
||||
.getCurrentAppAttempt().getAppAttemptState());
|
||||
}
|
||||
|
||||
// Test RM restarts after previous attempt succeeded and was saved into state
|
||||
// store but before the RMAppAttempt notifies RMApp that it has succeeded. On
|
||||
// recovery, RMAppAttempt should send the AttemptFinished event to RMApp so
|
||||
// that RMApp can recover its state.
|
||||
@Test
|
||||
public void testRMRestartWaitForPreviousSucceededAttempt() throws Exception {
|
||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore() {
|
||||
int count = 0;
|
||||
|
||||
@Override
|
||||
public void updateApplicationStateInternal(ApplicationId appId,
|
||||
ApplicationStateDataPBImpl appStateData) throws Exception {
|
||||
if (count == 0) {
|
||||
// do nothing; simulate app final state is not saved.
|
||||
LOG.info(appId + " final state is not saved.");
|
||||
count++;
|
||||
} else {
|
||||
super.updateApplicationStateInternal(appId, appStateData);
|
||||
}
|
||||
}
|
||||
};
|
||||
memStore.init(conf);
|
||||
RMState rmState = memStore.getState();
|
||||
Map<ApplicationId, ApplicationState> rmAppState =
|
||||
rmState.getApplicationState();
|
||||
|
||||
// start RM
|
||||
MockRM rm1 = new MockRM(conf, memStore);
|
||||
rm1.start();
|
||||
MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 15120);
|
||||
RMApp app0 = rm1.submitApp(200);
|
||||
MockAM am0 = MockRM.launchAndRegisterAM(app0, rm1, nm1);
|
||||
|
||||
FinishApplicationMasterRequest req =
|
||||
FinishApplicationMasterRequest.newInstance(
|
||||
FinalApplicationStatus.SUCCEEDED, "", "");
|
||||
am0.unregisterAppAttempt(req, true);
|
||||
am0.waitForState(RMAppAttemptState.FINISHING);
|
||||
// app final state is not saved. This guarantees that RMApp cannot be
|
||||
// recovered via its own saved state, but only via the event notification
|
||||
// from the RMAppAttempt on recovery.
|
||||
Assert.assertNull(rmAppState.get(app0.getApplicationId()).getState());
|
||||
|
||||
// start RM
|
||||
MockRM rm2 = new MockRM(conf, memStore);
|
||||
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
||||
rm2.start();
|
||||
|
||||
rm2.waitForState(app0.getCurrentAppAttempt().getAppAttemptId(),
|
||||
RMAppAttemptState.FINISHED);
|
||||
rm2.waitForState(app0.getApplicationId(), RMAppState.FINISHED);
|
||||
// app final state is saved via the finish event from attempt.
|
||||
Assert.assertEquals(RMAppState.FINISHED,
|
||||
rmAppState.get(app0.getApplicationId()).getState());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue