YARN-9438. launchTime not written to state store for running applications

(cherry picked from commit 9568656cd21d9c02168e18ce35c6726077bbf3a1)
This commit is contained in:
Jonathan Hung 2019-08-27 15:35:17 -07:00
parent 2d8799f4bc
commit f36ccf0ac1
5 changed files with 42 additions and 8 deletions

View File

@ -695,6 +695,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
app.getStartTime(), app.getApplicationSubmissionContext(), app.getStartTime(), app.getApplicationSubmissionContext(),
app.getUser(), app.getCallerContext()); app.getUser(), app.getCallerContext());
appState.setApplicationTimeouts(currentExpireTimeouts); appState.setApplicationTimeouts(currentExpireTimeouts);
appState.setLaunchTime(app.getLaunchTime());
// update to state store. Though it synchronous call, update via future to // update to state store. Though it synchronous call, update via future to
// know any exception has been set. It is required because in non-HA mode, // know any exception has been set. It is required because in non-HA mode,
@ -820,6 +821,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
app.getApplicationSubmissionContext(), app.getUser(), app.getApplicationSubmissionContext(), app.getUser(),
app.getCallerContext()); app.getCallerContext());
appState.setApplicationTimeouts(app.getApplicationTimeouts()); appState.setApplicationTimeouts(app.getApplicationTimeouts());
appState.setLaunchTime(app.getLaunchTime());
rmContext.getStateStore().updateApplicationStateSynchronously(appState, rmContext.getStateStore().updateApplicationStateSynchronously(appState,
false, future); false, future);

View File

@ -1032,6 +1032,12 @@ public class RMAppImpl implements RMApp, Recoverable {
app.getApplicationId()+", attemptId: "+ app.getApplicationId()+", attemptId: "+
app.getCurrentAppAttempt().getAppAttemptId()+ app.getCurrentAppAttempt().getAppAttemptId()+
"launchTime: "+event.getTimestamp()); "launchTime: "+event.getTimestamp());
ApplicationStateData appState = ApplicationStateData.newInstance(
app.submitTime, app.startTime, app.submissionContext, app.user,
app.callerContext);
appState.setApplicationTimeouts(app.getApplicationTimeouts());
appState.setLaunchTime(event.getTimestamp());
app.rmContext.getStateStore().updateApplicationState(appState);
app.launchTime = event.getTimestamp(); app.launchTime = event.getTimestamp();
} }
} }

View File

@ -2704,6 +2704,7 @@ public class CapacityScheduler extends
rmApp.getApplicationSubmissionContext(), rmApp.getUser(), rmApp.getApplicationSubmissionContext(), rmApp.getUser(),
rmApp.getCallerContext()); rmApp.getCallerContext());
appState.setApplicationTimeouts(rmApp.getApplicationTimeouts()); appState.setApplicationTimeouts(rmApp.getApplicationTimeouts());
appState.setLaunchTime(rmApp.getLaunchTime());
rmContext.getStateStore().updateApplicationStateSynchronously(appState, rmContext.getStateStore().updateApplicationStateSynchronously(appState,
false, future); false, future);

View File

@ -754,13 +754,14 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
@Override @Override
public void updateApplicationStateInternal(ApplicationId appId, public void updateApplicationStateInternal(ApplicationId appId,
ApplicationStateData appStateData) throws Exception { ApplicationStateData appStateData) throws Exception {
if (count == 0) { if (count == 1) {
// do nothing; simulate app final state is not saved. // Application state is updated on attempt launch.
// After that, do nothing; simulate app final state is not saved.
LOG.info(appId + " final state is not saved."); LOG.info(appId + " final state is not saved.");
count++;
} else { } else {
super.updateApplicationStateInternal(appId, appStateData); super.updateApplicationStateInternal(appId, appStateData);
} }
count++;
} }
}; };
memStore.init(conf); memStore.init(conf);
@ -774,7 +775,6 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 15120); MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 15120);
RMApp app0 = rm1.submitApp(200); RMApp app0 = rm1.submitApp(200);
MockAM am0 = MockRM.launchAndRegisterAM(app0, rm1, nm1); MockAM am0 = MockRM.launchAndRegisterAM(app0, rm1, nm1);
FinishApplicationMasterRequest req = FinishApplicationMasterRequest req =
FinishApplicationMasterRequest.newInstance( FinishApplicationMasterRequest.newInstance(
FinalApplicationStatus.SUCCEEDED, "", ""); FinalApplicationStatus.SUCCEEDED, "", "");
@ -1797,8 +1797,11 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.KILLED); rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.KILLED);
rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED); rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED);
Assert.assertEquals(1, ((TestMemoryRMStateStore) memStore).updateAttempt); // count = 1 on storing RMApp launchTime
Assert.assertEquals(2, ((TestMemoryRMStateStore) memStore).updateApp); // count = 2 on storing attempt state on kill
// count = 3 on storing app state on kill
Assert.assertEquals(2, ((TestMemoryRMStateStore) memStore).updateAttempt);
Assert.assertEquals(3, ((TestMemoryRMStateStore) memStore).updateApp);
} }
// Test Application that fails on submission is saved in state store. // Test Application that fails on submission is saved in state store.
@ -2547,8 +2550,6 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
// create an app and finish the app. // create an app and finish the app.
RMApp app0 = rm1.submitApp(200); RMApp app0 = rm1.submitApp(200);
ApplicationStateData app0State = memStore.getState().getApplicationState()
.get(app0.getApplicationId());
MockAM am0 = launchAndFailAM(app0, rm1, nm1); MockAM am0 = launchAndFailAM(app0, rm1, nm1);
MockAM am1 = launchAndFailAM(app0, rm1, nm1); MockAM am1 = launchAndFailAM(app0, rm1, nm1);
@ -2557,6 +2558,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
// am1 is missed from MemoryRMStateStore // am1 is missed from MemoryRMStateStore
memStore.removeApplicationAttemptInternal(am1.getApplicationAttemptId()); memStore.removeApplicationAttemptInternal(am1.getApplicationAttemptId());
ApplicationStateData app0State = memStore.getState().getApplicationState()
.get(app0.getApplicationId());
ApplicationAttemptStateData am2State = app0State.getAttempt( ApplicationAttemptStateData am2State = app0State.getAttempt(
am2.getApplicationAttemptId()); am2.getApplicationAttemptId());
// am2's state is not consistent: MemoryRMStateStore just saved its initial // am2's state is not consistent: MemoryRMStateStore just saved its initial

View File

@ -469,6 +469,13 @@ public class TestRMAppTransitions {
any(ApplicationStateData.class)); any(ApplicationStateData.class));
} }
private void assertAppStateLaunchTimeSaved(long expectedLaunchTime) {
ArgumentCaptor<ApplicationStateData> state =
ArgumentCaptor.forClass(ApplicationStateData.class);
verify(store, times(1)).updateApplicationState(state.capture());
assertEquals(expectedLaunchTime, state.getValue().getLaunchTime());
}
private void assertKilled(RMApp application) { private void assertKilled(RMApp application) {
assertTimesAtFinish(application); assertTimesAtFinish(application);
assertAppState(RMAppState.KILLED, application); assertAppState(RMAppState.KILLED, application);
@ -902,6 +909,21 @@ public class TestRMAppTransitions {
verifyRMAppFieldsForFinalTransitions(application); verifyRMAppFieldsForFinalTransitions(application);
} }
@Test
public void testAppAcceptedAccepted() throws IOException {
LOG.info("--- START: testAppAcceptedAccepted ---");
RMApp application = testCreateAppAccepted(null);
// ACCEPTED => ACCEPTED event RMAppEventType.ATTEMPT_LAUNCHED
RMAppEvent appAttemptLaunched =
new RMAppEvent(application.getApplicationId(),
RMAppEventType.ATTEMPT_LAUNCHED, 1234L);
application.handle(appAttemptLaunched);
rmDispatcher.await();
assertAppState(RMAppState.ACCEPTED, application);
assertAppStateLaunchTimeSaved(1234L);
}
@Test @Test
public void testAppAcceptedAttemptKilled() throws IOException, public void testAppAcceptedAttemptKilled() throws IOException,
InterruptedException { InterruptedException {