YARN-5476. Non existent application reported as ACCEPTED by YarnClientImpl (Junping Du via Varun Saxena)

This commit is contained in:
Varun Saxena 2016-08-13 00:19:00 +05:30
parent b63d3dcac9
commit 918a0f12fa
2 changed files with 22 additions and 15 deletions

View File

@ -833,8 +833,9 @@ public class RMAppImpl implements RMApp, Recoverable {
this.nextAttemptId = firstAttemptIdInStateStore;
}
// send the ATS create Event
sendATSCreateEvent(this, this.startTime);
// send the ATS create Event during RM recovery.
// NOTE: it could be duplicated with events sent before RM get restarted.
sendATSCreateEvent();
RMAppAttemptImpl preAttempt = null;
for (ApplicationAttemptId attemptId :
@ -1049,6 +1050,8 @@ public class RMAppImpl implements RMApp, Recoverable {
public void transition(RMAppImpl app, RMAppEvent event) {
app.handler.handle(new AppAddedSchedulerEvent(app.user,
app.submissionContext, false));
// send the ATS create Event
app.sendATSCreateEvent();
}
}
@ -1127,9 +1130,6 @@ public class RMAppImpl implements RMApp, Recoverable {
// communication
LOG.info("Storing application with id " + app.applicationId);
app.rmContext.getStateStore().storeNewApplication(app);
// send the ATS create Event
app.sendATSCreateEvent(app, app.startTime);
}
}
@ -1827,9 +1827,9 @@ public class RMAppImpl implements RMApp, Recoverable {
return callerContext;
}
private void sendATSCreateEvent(RMApp app, long startTime) {
rmContext.getRMApplicationHistoryWriter().applicationStarted(app);
rmContext.getSystemMetricsPublisher().appCreated(app, startTime);
private void sendATSCreateEvent() {
rmContext.getRMApplicationHistoryWriter().applicationStarted(this);
rmContext.getSystemMetricsPublisher().appCreated(this, this.startTime);
}
@Private

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
@ -376,18 +377,25 @@ public class TestRMAppTransitions {
application.handle(event);
assertStartTimeSet(application);
assertAppState(RMAppState.NEW_SAVING, application);
// verify sendATSCreateEvent() is not get called during
// RMAppNewlySavingTransition.
verify(publisher, times(0)).appCreated(eq(application), anyLong());
return application;
}
protected RMApp testCreateAppSubmittedNoRecovery(
ApplicationSubmissionContext submissionContext) throws IOException {
RMApp application = testCreateAppNewSaving(submissionContext);
// NEW_SAVING => SUBMITTED event RMAppEventType.APP_SAVED
// NEW_SAVING => SUBMITTED event RMAppEventType.APP_NEW_SAVED
RMAppEvent event =
new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_NEW_SAVED);
new RMAppEvent(application.getApplicationId(),
RMAppEventType.APP_NEW_SAVED);
application.handle(event);
assertStartTimeSet(application);
assertAppState(RMAppState.SUBMITTED, application);
// verify sendATSCreateEvent() is get called during
// AddApplicationToSchedulerTransition.
verify(publisher).appCreated(eq(application), anyLong());
return application;
}
@ -402,7 +410,6 @@ public class TestRMAppTransitions {
RMAppEvent event =
new RMAppRecoverEvent(application.getApplicationId(), state);
application.handle(event);
assertStartTimeSet(application);
assertAppState(RMAppState.SUBMITTED, application);