YARN-9438. launchTime not written to state store for running applications
(cherry picked from commit 9568656cd21d9c02168e18ce35c6726077bbf3a1) (cherry picked from commit 0c498de6e87c6bdc959afa31deb03d0907e0e1a1)
This commit is contained in:
parent
79364a9c53
commit
f73842780e
|
@ -650,6 +650,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
|
||||||
app.getStartTime(), app.getApplicationSubmissionContext(),
|
app.getStartTime(), app.getApplicationSubmissionContext(),
|
||||||
app.getUser(), app.getCallerContext());
|
app.getUser(), app.getCallerContext());
|
||||||
appState.setApplicationTimeouts(currentExpireTimeouts);
|
appState.setApplicationTimeouts(currentExpireTimeouts);
|
||||||
|
appState.setLaunchTime(app.getLaunchTime());
|
||||||
|
|
||||||
// update to state store. Though it synchronous call, update via future to
|
// update to state store. Though it synchronous call, update via future to
|
||||||
// know any exception has been set. It is required because in non-HA mode,
|
// know any exception has been set. It is required because in non-HA mode,
|
||||||
|
@ -775,6 +776,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
|
||||||
app.getApplicationSubmissionContext(), app.getUser(),
|
app.getApplicationSubmissionContext(), app.getUser(),
|
||||||
app.getCallerContext());
|
app.getCallerContext());
|
||||||
appState.setApplicationTimeouts(app.getApplicationTimeouts());
|
appState.setApplicationTimeouts(app.getApplicationTimeouts());
|
||||||
|
appState.setLaunchTime(app.getLaunchTime());
|
||||||
rmContext.getStateStore().updateApplicationStateSynchronously(appState,
|
rmContext.getStateStore().updateApplicationStateSynchronously(appState,
|
||||||
false, future);
|
false, future);
|
||||||
|
|
||||||
|
|
|
@ -1031,6 +1031,12 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
app.getApplicationId()+", attemptId: "+
|
app.getApplicationId()+", attemptId: "+
|
||||||
app.getCurrentAppAttempt().getAppAttemptId()+
|
app.getCurrentAppAttempt().getAppAttemptId()+
|
||||||
"launchTime: "+event.getTimestamp());
|
"launchTime: "+event.getTimestamp());
|
||||||
|
ApplicationStateData appState = ApplicationStateData.newInstance(
|
||||||
|
app.submitTime, app.startTime, app.submissionContext, app.user,
|
||||||
|
app.callerContext);
|
||||||
|
appState.setApplicationTimeouts(app.getApplicationTimeouts());
|
||||||
|
appState.setLaunchTime(event.getTimestamp());
|
||||||
|
app.rmContext.getStateStore().updateApplicationState(appState);
|
||||||
app.launchTime = event.getTimestamp();
|
app.launchTime = event.getTimestamp();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -2563,6 +2563,7 @@ public class CapacityScheduler extends
|
||||||
rmApp.getApplicationSubmissionContext(), rmApp.getUser(),
|
rmApp.getApplicationSubmissionContext(), rmApp.getUser(),
|
||||||
rmApp.getCallerContext());
|
rmApp.getCallerContext());
|
||||||
appState.setApplicationTimeouts(rmApp.getApplicationTimeouts());
|
appState.setApplicationTimeouts(rmApp.getApplicationTimeouts());
|
||||||
|
appState.setLaunchTime(rmApp.getLaunchTime());
|
||||||
rmContext.getStateStore().updateApplicationStateSynchronously(appState,
|
rmContext.getStateStore().updateApplicationStateSynchronously(appState,
|
||||||
false, future);
|
false, future);
|
||||||
|
|
||||||
|
|
|
@ -754,13 +754,14 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||||
@Override
|
@Override
|
||||||
public void updateApplicationStateInternal(ApplicationId appId,
|
public void updateApplicationStateInternal(ApplicationId appId,
|
||||||
ApplicationStateData appStateData) throws Exception {
|
ApplicationStateData appStateData) throws Exception {
|
||||||
if (count == 0) {
|
if (count == 1) {
|
||||||
// do nothing; simulate app final state is not saved.
|
// Application state is updated on attempt launch.
|
||||||
|
// After that, do nothing; simulate app final state is not saved.
|
||||||
LOG.info(appId + " final state is not saved.");
|
LOG.info(appId + " final state is not saved.");
|
||||||
count++;
|
|
||||||
} else {
|
} else {
|
||||||
super.updateApplicationStateInternal(appId, appStateData);
|
super.updateApplicationStateInternal(appId, appStateData);
|
||||||
}
|
}
|
||||||
|
count++;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
memStore.init(conf);
|
memStore.init(conf);
|
||||||
|
@ -774,7 +775,6 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||||
MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 15120);
|
MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 15120);
|
||||||
RMApp app0 = rm1.submitApp(200);
|
RMApp app0 = rm1.submitApp(200);
|
||||||
MockAM am0 = MockRM.launchAndRegisterAM(app0, rm1, nm1);
|
MockAM am0 = MockRM.launchAndRegisterAM(app0, rm1, nm1);
|
||||||
|
|
||||||
FinishApplicationMasterRequest req =
|
FinishApplicationMasterRequest req =
|
||||||
FinishApplicationMasterRequest.newInstance(
|
FinishApplicationMasterRequest.newInstance(
|
||||||
FinalApplicationStatus.SUCCEEDED, "", "");
|
FinalApplicationStatus.SUCCEEDED, "", "");
|
||||||
|
@ -1797,8 +1797,11 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||||
|
|
||||||
rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.KILLED);
|
rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.KILLED);
|
||||||
rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED);
|
rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED);
|
||||||
Assert.assertEquals(1, ((TestMemoryRMStateStore) memStore).updateAttempt);
|
// count = 1 on storing RMApp launchTime
|
||||||
Assert.assertEquals(2, ((TestMemoryRMStateStore) memStore).updateApp);
|
// count = 2 on storing attempt state on kill
|
||||||
|
// count = 3 on storing app state on kill
|
||||||
|
Assert.assertEquals(2, ((TestMemoryRMStateStore) memStore).updateAttempt);
|
||||||
|
Assert.assertEquals(3, ((TestMemoryRMStateStore) memStore).updateApp);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test Application that fails on submission is saved in state store.
|
// Test Application that fails on submission is saved in state store.
|
||||||
|
@ -2547,8 +2550,6 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||||
|
|
||||||
// create an app and finish the app.
|
// create an app and finish the app.
|
||||||
RMApp app0 = rm1.submitApp(200);
|
RMApp app0 = rm1.submitApp(200);
|
||||||
ApplicationStateData app0State = memStore.getState().getApplicationState()
|
|
||||||
.get(app0.getApplicationId());
|
|
||||||
|
|
||||||
MockAM am0 = launchAndFailAM(app0, rm1, nm1);
|
MockAM am0 = launchAndFailAM(app0, rm1, nm1);
|
||||||
MockAM am1 = launchAndFailAM(app0, rm1, nm1);
|
MockAM am1 = launchAndFailAM(app0, rm1, nm1);
|
||||||
|
@ -2557,6 +2558,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
||||||
|
|
||||||
// am1 is missed from MemoryRMStateStore
|
// am1 is missed from MemoryRMStateStore
|
||||||
memStore.removeApplicationAttemptInternal(am1.getApplicationAttemptId());
|
memStore.removeApplicationAttemptInternal(am1.getApplicationAttemptId());
|
||||||
|
ApplicationStateData app0State = memStore.getState().getApplicationState()
|
||||||
|
.get(app0.getApplicationId());
|
||||||
ApplicationAttemptStateData am2State = app0State.getAttempt(
|
ApplicationAttemptStateData am2State = app0State.getAttempt(
|
||||||
am2.getApplicationAttemptId());
|
am2.getApplicationAttemptId());
|
||||||
// am2's state is not consistent: MemoryRMStateStore just saved its initial
|
// am2's state is not consistent: MemoryRMStateStore just saved its initial
|
||||||
|
|
|
@ -469,6 +469,13 @@ public class TestRMAppTransitions {
|
||||||
any(ApplicationStateData.class));
|
any(ApplicationStateData.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void assertAppStateLaunchTimeSaved(long expectedLaunchTime) {
|
||||||
|
ArgumentCaptor<ApplicationStateData> state =
|
||||||
|
ArgumentCaptor.forClass(ApplicationStateData.class);
|
||||||
|
verify(store, times(1)).updateApplicationState(state.capture());
|
||||||
|
assertEquals(expectedLaunchTime, state.getValue().getLaunchTime());
|
||||||
|
}
|
||||||
|
|
||||||
private void assertKilled(RMApp application) {
|
private void assertKilled(RMApp application) {
|
||||||
assertTimesAtFinish(application);
|
assertTimesAtFinish(application);
|
||||||
assertAppState(RMAppState.KILLED, application);
|
assertAppState(RMAppState.KILLED, application);
|
||||||
|
@ -902,6 +909,21 @@ public class TestRMAppTransitions {
|
||||||
verifyRMAppFieldsForFinalTransitions(application);
|
verifyRMAppFieldsForFinalTransitions(application);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAppAcceptedAccepted() throws IOException {
|
||||||
|
LOG.info("--- START: testAppAcceptedAccepted ---");
|
||||||
|
|
||||||
|
RMApp application = testCreateAppAccepted(null);
|
||||||
|
// ACCEPTED => ACCEPTED event RMAppEventType.ATTEMPT_LAUNCHED
|
||||||
|
RMAppEvent appAttemptLaunched =
|
||||||
|
new RMAppEvent(application.getApplicationId(),
|
||||||
|
RMAppEventType.ATTEMPT_LAUNCHED, 1234L);
|
||||||
|
application.handle(appAttemptLaunched);
|
||||||
|
rmDispatcher.await();
|
||||||
|
assertAppState(RMAppState.ACCEPTED, application);
|
||||||
|
assertAppStateLaunchTimeSaved(1234L);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAppAcceptedAttemptKilled() throws IOException,
|
public void testAppAcceptedAttemptKilled() throws IOException,
|
||||||
InterruptedException {
|
InterruptedException {
|
||||||
|
|
Loading…
Reference in New Issue