YARN-5476. Non existent application reported as ACCEPTED by YarnClientImpl (Junping Du via Varun Saxena)
This commit is contained in:
parent
9019606b69
commit
23c6e3c4e4
|
@ -902,8 +902,9 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
//TODO recover collector address.
|
||||
//this.collectorAddr = appState.getCollectorAddr();
|
||||
|
||||
// send the ATS create Event
|
||||
sendATSCreateEvent(this, this.startTime);
|
||||
// send the ATS create Event during RM recovery.
|
||||
// NOTE: it could be duplicated with events sent before RM get restarted.
|
||||
sendATSCreateEvent();
|
||||
RMAppAttemptImpl preAttempt = null;
|
||||
for (ApplicationAttemptId attemptId :
|
||||
new TreeSet<>(appState.attempts.keySet())) {
|
||||
|
@ -1134,6 +1135,8 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
public void transition(RMAppImpl app, RMAppEvent event) {
|
||||
app.handler.handle(new AppAddedSchedulerEvent(app.user,
|
||||
app.submissionContext, false));
|
||||
// send the ATS create Event
|
||||
app.sendATSCreateEvent();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1212,9 +1215,6 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
// communication
|
||||
LOG.info("Storing application with id " + app.applicationId);
|
||||
app.rmContext.getStateStore().storeNewApplication(app);
|
||||
|
||||
// send the ATS create Event
|
||||
app.sendATSCreateEvent(app, app.startTime);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1912,9 +1912,9 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
return callerContext;
|
||||
}
|
||||
|
||||
private void sendATSCreateEvent(RMApp app, long startTime) {
|
||||
rmContext.getRMApplicationHistoryWriter().applicationStarted(app);
|
||||
rmContext.getSystemMetricsPublisher().appCreated(app, startTime);
|
||||
private void sendATSCreateEvent() {
|
||||
rmContext.getRMApplicationHistoryWriter().applicationStarted(this);
|
||||
rmContext.getSystemMetricsPublisher().appCreated(this, this.startTime);
|
||||
}
|
||||
|
||||
@Private
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
|
|||
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyLong;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.reset;
|
||||
|
@ -373,25 +374,32 @@ public class TestRMAppTransitions {
|
|||
|
||||
protected RMApp testCreateAppNewSaving(
|
||||
ApplicationSubmissionContext submissionContext) throws IOException {
|
||||
RMApp application = createNewTestApp(submissionContext);
|
||||
RMApp application = createNewTestApp(submissionContext);
|
||||
// NEW => NEW_SAVING event RMAppEventType.START
|
||||
RMAppEvent event =
|
||||
new RMAppEvent(application.getApplicationId(), RMAppEventType.START);
|
||||
application.handle(event);
|
||||
assertStartTimeSet(application);
|
||||
assertAppState(RMAppState.NEW_SAVING, application);
|
||||
// verify sendATSCreateEvent() is not get called during
|
||||
// RMAppNewlySavingTransition.
|
||||
verify(publisher, times(0)).appCreated(eq(application), anyLong());
|
||||
return application;
|
||||
}
|
||||
|
||||
protected RMApp testCreateAppSubmittedNoRecovery(
|
||||
ApplicationSubmissionContext submissionContext) throws IOException {
|
||||
RMApp application = testCreateAppNewSaving(submissionContext);
|
||||
// NEW_SAVING => SUBMITTED event RMAppEventType.APP_SAVED
|
||||
RMApp application = testCreateAppNewSaving(submissionContext);
|
||||
// NEW_SAVING => SUBMITTED event RMAppEventType.APP_NEW_SAVED
|
||||
RMAppEvent event =
|
||||
new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_NEW_SAVED);
|
||||
new RMAppEvent(application.getApplicationId(),
|
||||
RMAppEventType.APP_NEW_SAVED);
|
||||
application.handle(event);
|
||||
assertStartTimeSet(application);
|
||||
assertAppState(RMAppState.SUBMITTED, application);
|
||||
// verify sendATSCreateEvent() is get called during
|
||||
// AddApplicationToSchedulerTransition.
|
||||
verify(publisher).appCreated(eq(application), anyLong());
|
||||
return application;
|
||||
}
|
||||
|
||||
|
@ -406,7 +414,6 @@ public class TestRMAppTransitions {
|
|||
RMAppEvent event =
|
||||
new RMAppRecoverEvent(application.getApplicationId(), state);
|
||||
|
||||
|
||||
application.handle(event);
|
||||
assertStartTimeSet(application);
|
||||
assertAppState(RMAppState.SUBMITTED, application);
|
||||
|
@ -416,7 +423,7 @@ public class TestRMAppTransitions {
|
|||
protected RMApp testCreateAppAccepted(
|
||||
ApplicationSubmissionContext submissionContext) throws IOException {
|
||||
RMApp application = testCreateAppSubmittedNoRecovery(submissionContext);
|
||||
// SUBMITTED => ACCEPTED event RMAppEventType.APP_ACCEPTED
|
||||
// SUBMITTED => ACCEPTED event RMAppEventType.APP_ACCEPTED
|
||||
RMAppEvent event =
|
||||
new RMAppEvent(application.getApplicationId(),
|
||||
RMAppEventType.APP_ACCEPTED);
|
||||
|
@ -428,7 +435,7 @@ public class TestRMAppTransitions {
|
|||
|
||||
protected RMApp testCreateAppRunning(
|
||||
ApplicationSubmissionContext submissionContext) throws IOException {
|
||||
RMApp application = testCreateAppAccepted(submissionContext);
|
||||
RMApp application = testCreateAppAccepted(submissionContext);
|
||||
// ACCEPTED => RUNNING event RMAppEventType.ATTEMPT_REGISTERED
|
||||
RMAppEvent event =
|
||||
new RMAppEvent(application.getApplicationId(),
|
||||
|
|
Loading…
Reference in New Issue