From 23c6e3c4e41fecc61d062542cb61e68898235006 Mon Sep 17 00:00:00 2001 From: Varun Saxena Date: Fri, 12 Aug 2016 20:37:58 +0530 Subject: [PATCH] YARN-5476. Non existent application reported as ACCEPTED by YarnClientImpl (Junping Du via Varun Saxena) --- .../resourcemanager/rmapp/RMAppImpl.java | 16 +++++++------- .../rmapp/TestRMAppTransitions.java | 21 ++++++++++++------- 2 files changed, 22 insertions(+), 15 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 82e669abc95..e5bde32b282 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -902,8 +902,9 @@ public class RMAppImpl implements RMApp, Recoverable { //TODO recover collector address. //this.collectorAddr = appState.getCollectorAddr(); - // send the ATS create Event - sendATSCreateEvent(this, this.startTime); + // send the ATS create Event during RM recovery. + // NOTE: it could be duplicated with events sent before RM get restarted. + sendATSCreateEvent(); RMAppAttemptImpl preAttempt = null; for (ApplicationAttemptId attemptId : new TreeSet<>(appState.attempts.keySet())) { @@ -1134,6 +1135,8 @@ public class RMAppImpl implements RMApp, Recoverable { public void transition(RMAppImpl app, RMAppEvent event) { app.handler.handle(new AppAddedSchedulerEvent(app.user, app.submissionContext, false)); + // send the ATS create Event + app.sendATSCreateEvent(); } } @@ -1212,9 +1215,6 @@ public class RMAppImpl implements RMApp, Recoverable { // communication LOG.info("Storing application with id " + app.applicationId); app.rmContext.getStateStore().storeNewApplication(app); - - // send the ATS create Event - app.sendATSCreateEvent(app, app.startTime); } } @@ -1912,9 +1912,9 @@ public class RMAppImpl implements RMApp, Recoverable { return callerContext; } - private void sendATSCreateEvent(RMApp app, long startTime) { - rmContext.getRMApplicationHistoryWriter().applicationStarted(app); - rmContext.getSystemMetricsPublisher().appCreated(app, startTime); + private void sendATSCreateEvent() { + rmContext.getRMApplicationHistoryWriter().applicationStarted(this); + rmContext.getSystemMetricsPublisher().appCreated(this, this.startTime); } @Private diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index 9d2f89d7981..6bf5ecf1d8a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; @@ -373,25 +374,32 @@ public class TestRMAppTransitions { protected RMApp testCreateAppNewSaving( ApplicationSubmissionContext submissionContext) throws IOException { - RMApp application = createNewTestApp(submissionContext); + RMApp application = createNewTestApp(submissionContext); // NEW => NEW_SAVING event RMAppEventType.START RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.START); application.handle(event); assertStartTimeSet(application); assertAppState(RMAppState.NEW_SAVING, application); + // verify sendATSCreateEvent() is not get called during + // RMAppNewlySavingTransition. + verify(publisher, times(0)).appCreated(eq(application), anyLong()); return application; } protected RMApp testCreateAppSubmittedNoRecovery( ApplicationSubmissionContext submissionContext) throws IOException { - RMApp application = testCreateAppNewSaving(submissionContext); - // NEW_SAVING => SUBMITTED event RMAppEventType.APP_SAVED + RMApp application = testCreateAppNewSaving(submissionContext); + // NEW_SAVING => SUBMITTED event RMAppEventType.APP_NEW_SAVED RMAppEvent event = - new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_NEW_SAVED); + new RMAppEvent(application.getApplicationId(), + RMAppEventType.APP_NEW_SAVED); application.handle(event); assertStartTimeSet(application); assertAppState(RMAppState.SUBMITTED, application); + // verify sendATSCreateEvent() is get called during + // AddApplicationToSchedulerTransition. + verify(publisher).appCreated(eq(application), anyLong()); return application; } @@ -406,7 +414,6 @@ public class TestRMAppTransitions { RMAppEvent event = new RMAppRecoverEvent(application.getApplicationId(), state); - application.handle(event); assertStartTimeSet(application); assertAppState(RMAppState.SUBMITTED, application); @@ -416,7 +423,7 @@ public class TestRMAppTransitions { protected RMApp testCreateAppAccepted( ApplicationSubmissionContext submissionContext) throws IOException { RMApp application = testCreateAppSubmittedNoRecovery(submissionContext); - // SUBMITTED => ACCEPTED event RMAppEventType.APP_ACCEPTED + // SUBMITTED => ACCEPTED event RMAppEventType.APP_ACCEPTED RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.APP_ACCEPTED); @@ -428,7 +435,7 @@ public class TestRMAppTransitions { protected RMApp testCreateAppRunning( ApplicationSubmissionContext submissionContext) throws IOException { - RMApp application = testCreateAppAccepted(submissionContext); + RMApp application = testCreateAppAccepted(submissionContext); // ACCEPTED => RUNNING event RMAppEventType.ATTEMPT_REGISTERED RMAppEvent event = new RMAppEvent(application.getApplicationId(),