From 63e374060e74bc30cb0867ae7a99447c62a1a9f8 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Thu, 17 Jul 2014 00:15:28 +0000 Subject: [PATCH] YARN-2219. Changed ResourceManager to avoid AMs and NMs getting exceptions after RM recovery but before scheduler learns about apps and app-attempts. Contributed by Jian He. svn merge --ignore-ancestry -c 1611222 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1611223 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 4 +++ .../resourcemanager/rmapp/RMAppImpl.java | 26 ++++++++-------- .../rmapp/attempt/RMAppAttemptImpl.java | 6 ++-- .../scheduler/capacity/CapacityScheduler.java | 30 ++++++++++++------- .../event/AppAddedSchedulerEvent.java | 10 +++++++ .../event/AppAttemptAddedSchedulerEvent.java | 12 ++++---- .../scheduler/fair/FairScheduler.java | 30 ++++++++++++------- .../scheduler/fifo/FifoScheduler.java | 30 ++++++++++++------- .../resourcemanager/TestFifoScheduler.java | 2 +- .../TestWorkPreservingRMRestart.java | 30 +++++++++++++++++++ .../scheduler/fair/FairSchedulerTestBase.java | 2 +- .../scheduler/fair/TestFairScheduler.java | 12 ++++---- 12 files changed, 131 insertions(+), 63 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 9f4f3bfd582..12ae8f36240 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -44,6 +44,10 @@ Release 2.6.0 - UNRELEASED YARN-2264. Fixed a race condition in DrainDispatcher which may cause random test failures. (Li Lu via jianhe) + YARN-2219. Changed ResourceManager to avoid AMs and NMs getting exceptions + after RM recovery but before scheduler learns about apps and app-attempts. + (Jian He via vinodkv) + Release 2.5.0 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 523e6beb512..efa1ee72808 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -205,12 +205,6 @@ public class RMAppImpl implements RMApp, Recoverable { .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, RMAppEventType.APP_RUNNING_ON_NODE, new AppRunningOnNodeTransition()) - // ACCECPTED state can once again receive APP_ACCEPTED event, because on - // recovery the app returns ACCEPTED state and the app once again go - // through the scheduler and triggers one more APP_ACCEPTED event at - // ACCEPTED state. - .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, - RMAppEventType.APP_ACCEPTED) // Transitions from RUNNING state .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, @@ -789,8 +783,18 @@ public class RMAppImpl implements RMApp, Recoverable { return app.recoveredFinalState; } - // Notify scheduler about the app on recovery - new AddApplicationToSchedulerTransition().transition(app, event); + // No existent attempts means the attempt associated with this app was not + // started or started but not yet saved. + if (app.attempts.isEmpty()) { + app.scheduler.handle(new AppAddedSchedulerEvent(app.applicationId, + app.submissionContext.getQueue(), app.user)); + return RMAppState.SUBMITTED; + } + + // Add application to scheduler synchronously to guarantee scheduler + // knows applications before AM or NM re-registers. + app.scheduler.handle(new AppAddedSchedulerEvent(app.applicationId, + app.submissionContext.getQueue(), app.user, true)); // recover attempts app.recoverAppAttempts(); @@ -805,12 +809,6 @@ public class RMAppImpl implements RMApp, Recoverable { return RMAppState.ACCEPTED; } - // No existent attempts means the attempt associated with this app was not - // started or started but not yet saved. - if (app.attempts.isEmpty()) { - return RMAppState.SUBMITTED; - } - // YARN-1507 is saving the application state after the application is // accepted. So after YARN-1507, an app is saved meaning it is accepted. // Thus we return ACCECPTED state on recovery. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 000227a8183..dbcf64fc391 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -926,8 +926,10 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { appAttempt.masterService .registerAppAttempt(appAttempt.applicationAttemptId); - appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent( - appAttempt.getAppAttemptId(), false, false)); + // Add attempt to scheduler synchronously to guarantee scheduler + // knows attempts before AM or NM re-registers. + appAttempt.scheduler.handle(new AppAttemptAddedSchedulerEvent( + appAttempt.getAppAttemptId(), false, true)); } /* diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 649eb92019e..6d26519ff98 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -521,7 +521,7 @@ public class CapacityScheduler extends } private synchronized void addApplication(ApplicationId applicationId, - String queueName, String user) { + String queueName, String user, boolean isAppRecovering) { // santiy checks. CSQueue queue = getQueue(queueName); if (queue == null) { @@ -553,14 +553,20 @@ public class CapacityScheduler extends applications.put(applicationId, application); LOG.info("Accepted application " + applicationId + " from user: " + user + ", in queue: " + queueName); - rmContext.getDispatcher().getEventHandler() + if (isAppRecovering) { + if (LOG.isDebugEnabled()) { + LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED"); + } + } else { + rmContext.getDispatcher().getEventHandler() .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED)); + } } private synchronized void addApplicationAttempt( ApplicationAttemptId applicationAttemptId, boolean transferStateFromPreviousAttempt, - boolean shouldNotifyAttemptAdded) { + boolean isAttemptRecovering) { SchedulerApplication application = applications.get(applicationAttemptId.getApplicationId()); CSQueue queue = (CSQueue) application.getQueue(); @@ -578,14 +584,15 @@ public class CapacityScheduler extends LOG.info("Added Application Attempt " + applicationAttemptId + " to scheduler from user " + application.getUser() + " in queue " + queue.getQueueName()); - if (shouldNotifyAttemptAdded) { - rmContext.getDispatcher().getEventHandler().handle( - new RMAppAttemptEvent(applicationAttemptId, - RMAppAttemptEventType.ATTEMPT_ADDED)); - } else { + if (isAttemptRecovering) { if (LOG.isDebugEnabled()) { - LOG.debug("Skipping notifying ATTEMPT_ADDED"); + LOG.debug(applicationAttemptId + + " is recovering. Skipping notifying ATTEMPT_ADDED"); } + } else { + rmContext.getDispatcher().getEventHandler().handle( + new RMAppAttemptEvent(applicationAttemptId, + RMAppAttemptEventType.ATTEMPT_ADDED)); } } @@ -905,7 +912,8 @@ public class CapacityScheduler extends { AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event; addApplication(appAddedEvent.getApplicationId(), - appAddedEvent.getQueue(), appAddedEvent.getUser()); + appAddedEvent.getQueue(), appAddedEvent.getUser(), + appAddedEvent.getIsAppRecovering()); } break; case APP_REMOVED: @@ -921,7 +929,7 @@ public class CapacityScheduler extends (AppAttemptAddedSchedulerEvent) event; addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(), appAttemptAddedEvent.getTransferStateFromPreviousAttempt(), - appAttemptAddedEvent.getShouldNotifyAttemptAdded()); + appAttemptAddedEvent.getIsAttemptRecovering()); } break; case APP_ATTEMPT_REMOVED: diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java index d6fb36df78b..7e0b89e20f3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java @@ -25,13 +25,20 @@ public class AppAddedSchedulerEvent extends SchedulerEvent { private final ApplicationId applicationId; private final String queue; private final String user; + private final boolean isAppRecovering; public AppAddedSchedulerEvent( ApplicationId applicationId, String queue, String user) { + this(applicationId, queue, user, false); + } + + public AppAddedSchedulerEvent(ApplicationId applicationId, String queue, + String user, boolean isAppRecovering) { super(SchedulerEventType.APP_ADDED); this.applicationId = applicationId; this.queue = queue; this.user = user; + this.isAppRecovering = isAppRecovering; } public ApplicationId getApplicationId() { @@ -46,4 +53,7 @@ public class AppAddedSchedulerEvent extends SchedulerEvent { return user; } + public boolean getIsAppRecovering() { + return isAppRecovering; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java index 64d308adea1..6e66d2a6601 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java @@ -24,22 +24,22 @@ public class AppAttemptAddedSchedulerEvent extends SchedulerEvent { private final ApplicationAttemptId applicationAttemptId; private final boolean transferStateFromPreviousAttempt; - private final boolean shouldNotifyAttemptAdded; + private final boolean isAttemptRecovering; public AppAttemptAddedSchedulerEvent( ApplicationAttemptId applicationAttemptId, boolean transferStateFromPreviousAttempt) { - this(applicationAttemptId, transferStateFromPreviousAttempt, true); + this(applicationAttemptId, transferStateFromPreviousAttempt, false); } public AppAttemptAddedSchedulerEvent( ApplicationAttemptId applicationAttemptId, boolean transferStateFromPreviousAttempt, - boolean shouldNotifyAttemptAdded) { + boolean isAttemptRecovering) { super(SchedulerEventType.APP_ATTEMPT_ADDED); this.applicationAttemptId = applicationAttemptId; this.transferStateFromPreviousAttempt = transferStateFromPreviousAttempt; - this.shouldNotifyAttemptAdded = shouldNotifyAttemptAdded; + this.isAttemptRecovering = isAttemptRecovering; } public ApplicationAttemptId getApplicationAttemptId() { @@ -50,7 +50,7 @@ public class AppAttemptAddedSchedulerEvent extends SchedulerEvent { return transferStateFromPreviousAttempt; } - public boolean getShouldNotifyAttemptAdded() { - return shouldNotifyAttemptAdded; + public boolean getIsAttemptRecovering() { + return isAttemptRecovering; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 7e867554f9b..3a847ce7589 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -566,7 +566,7 @@ public class FairScheduler extends * configured limits, but the app will not be marked as runnable. */ protected synchronized void addApplication(ApplicationId applicationId, - String queueName, String user) { + String queueName, String user, boolean isAppRecovering) { if (queueName == null || queueName.isEmpty()) { String message = "Reject application " + applicationId + " submitted by user " + user + " with an empty queue name."; @@ -603,8 +603,14 @@ public class FairScheduler extends LOG.info("Accepted application " + applicationId + " from user: " + user + ", in queue: " + queueName + ", currently num of applications: " + applications.size()); - rmContext.getDispatcher().getEventHandler() + if (isAppRecovering) { + if (LOG.isDebugEnabled()) { + LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED"); + } + } else { + rmContext.getDispatcher().getEventHandler() .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED)); + } } /** @@ -613,7 +619,7 @@ public class FairScheduler extends protected synchronized void addApplicationAttempt( ApplicationAttemptId applicationAttemptId, boolean transferStateFromPreviousAttempt, - boolean shouldNotifyAttemptAdded) { + boolean isAttemptRecovering) { SchedulerApplication application = applications.get(applicationAttemptId.getApplicationId()); String user = application.getUser(); @@ -642,14 +648,15 @@ public class FairScheduler extends LOG.info("Added Application Attempt " + applicationAttemptId + " to scheduler from user: " + user); - if (shouldNotifyAttemptAdded) { - rmContext.getDispatcher().getEventHandler().handle( - new RMAppAttemptEvent(applicationAttemptId, - RMAppAttemptEventType.ATTEMPT_ADDED)); - } else { + if (isAttemptRecovering) { if (LOG.isDebugEnabled()) { - LOG.debug("Skipping notifying ATTEMPT_ADDED"); + LOG.debug(applicationAttemptId + + " is recovering. Skipping notifying ATTEMPT_ADDED"); } + } else { + rmContext.getDispatcher().getEventHandler().handle( + new RMAppAttemptEvent(applicationAttemptId, + RMAppAttemptEventType.ATTEMPT_ADDED)); } } @@ -1136,7 +1143,8 @@ public class FairScheduler extends } AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event; addApplication(appAddedEvent.getApplicationId(), - appAddedEvent.getQueue(), appAddedEvent.getUser()); + appAddedEvent.getQueue(), appAddedEvent.getUser(), + appAddedEvent.getIsAppRecovering()); break; case APP_REMOVED: if (!(event instanceof AppRemovedSchedulerEvent)) { @@ -1154,7 +1162,7 @@ public class FairScheduler extends (AppAttemptAddedSchedulerEvent) event; addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(), appAttemptAddedEvent.getTransferStateFromPreviousAttempt(), - appAttemptAddedEvent.getShouldNotifyAttemptAdded()); + appAttemptAddedEvent.getIsAttemptRecovering()); break; case APP_ATTEMPT_REMOVED: if (!(event instanceof AppAttemptRemovedSchedulerEvent)) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index b017db7321e..571d0558c04 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -356,22 +356,28 @@ public class FifoScheduler extends @VisibleForTesting public synchronized void addApplication(ApplicationId applicationId, - String queue, String user) { + String queue, String user, boolean isAppRecovering) { SchedulerApplication application = new SchedulerApplication(DEFAULT_QUEUE, user); applications.put(applicationId, application); metrics.submitApp(user); LOG.info("Accepted application " + applicationId + " from user: " + user + ", currently num of applications: " + applications.size()); - rmContext.getDispatcher().getEventHandler() + if (isAppRecovering) { + if (LOG.isDebugEnabled()) { + LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED"); + } + } else { + rmContext.getDispatcher().getEventHandler() .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED)); + } } @VisibleForTesting public synchronized void addApplicationAttempt(ApplicationAttemptId appAttemptId, boolean transferStateFromPreviousAttempt, - boolean shouldNotifyAttemptAdded) { + boolean isAttemptRecovering) { SchedulerApplication application = applications.get(appAttemptId.getApplicationId()); String user = application.getUser(); @@ -389,14 +395,15 @@ public class FifoScheduler extends metrics.submitAppAttempt(user); LOG.info("Added Application Attempt " + appAttemptId + " to scheduler from user " + application.getUser()); - if (shouldNotifyAttemptAdded) { - rmContext.getDispatcher().getEventHandler().handle( - new RMAppAttemptEvent(appAttemptId, - RMAppAttemptEventType.ATTEMPT_ADDED)); - } else { + if (isAttemptRecovering) { if (LOG.isDebugEnabled()) { - LOG.debug("Skipping notifying ATTEMPT_ADDED"); + LOG.debug(appAttemptId + + " is recovering. Skipping notifying ATTEMPT_ADDED"); } + } else { + rmContext.getDispatcher().getEventHandler().handle( + new RMAppAttemptEvent(appAttemptId, + RMAppAttemptEventType.ATTEMPT_ADDED)); } } @@ -772,7 +779,8 @@ public class FifoScheduler extends { AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event; addApplication(appAddedEvent.getApplicationId(), - appAddedEvent.getQueue(), appAddedEvent.getUser()); + appAddedEvent.getQueue(), appAddedEvent.getUser(), + appAddedEvent.getIsAppRecovering()); } break; case APP_REMOVED: @@ -788,7 +796,7 @@ public class FifoScheduler extends (AppAttemptAddedSchedulerEvent) event; addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(), appAttemptAddedEvent.getTransferStateFromPreviousAttempt(), - appAttemptAddedEvent.getShouldNotifyAttemptAdded()); + appAttemptAddedEvent.getIsAttemptRecovering()); } break; case APP_ATTEMPT_REMOVED: diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java index aa7f63144eb..b8b41333e66 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java @@ -228,7 +228,7 @@ public class TestFifoScheduler { scheduler.handle(new NodeAddedSchedulerEvent(node)); ApplicationId appId = ApplicationId.newInstance(0, 1); - scheduler.addApplication(appId, "queue1", "user1"); + scheduler.addApplication(appId, "queue1", "user1", true); NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node); try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java index 59b11ef8228..24a2f437953 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -610,6 +610,36 @@ public class TestWorkPreservingRMRestart { attempt0.getMasterContainer().getId()).isAMContainer()); } + @Test (timeout = 20000) + public void testRecoverSchedulerAppAndAttemptSynchronously() throws Exception { + // start RM + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // create app and launch the AM + RMApp app0 = rm1.submitApp(200); + MockAM am0 = MockRM.launchAndRegisterAM(app0, rm1, nm1); + + rm2 = new MockRM(conf, memStore); + rm2.start(); + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + // scheduler app/attempt is immediately available after RM is re-started. + Assert.assertNotNull(rm2.getResourceScheduler().getSchedulerAppInfo( + am0.getApplicationAttemptId())); + + // getTransferredContainers should not throw NPE. + ((AbstractYarnScheduler) rm2.getResourceScheduler()) + .getTransferredContainers(am0.getApplicationAttemptId()); + + List containers = createNMContainerStatusForApp(am0); + nm1.registerNode(containers, null); + waitForNumContainersToRecover(2, rm2, am0.getApplicationAttemptId()); + } private void asserteMetrics(QueueMetrics qm, int appsSubmitted, int appsPending, int appsRunning, int appsCompleted, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java index 50f61d8d592..7d666dadb60 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java @@ -147,7 +147,7 @@ public class FairSchedulerTestBase { int memory, int vcores, String queueId, String userId, int numContainers, int priority) { ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); - scheduler.addApplication(id.getApplicationId(), queueId, userId); + scheduler.addApplication(id.getApplicationId(), queueId, userId, true); // This conditional is for testAclSubmitApplication where app is rejected // and no app is added. if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 9e2d836014f..eda87cd7875 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -793,13 +793,13 @@ public class TestFairScheduler extends FairSchedulerTestBase { scheduler.reinitialize(conf, resourceManager.getRMContext()); ApplicationAttemptId id11 = createAppAttemptId(1, 1); - scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1"); + scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1", true); scheduler.addApplicationAttempt(id11, false, true); ApplicationAttemptId id21 = createAppAttemptId(2, 1); - scheduler.addApplication(id21.getApplicationId(), "root.queue2", "user1"); + scheduler.addApplication(id21.getApplicationId(), "root.queue2", "user1", true); scheduler.addApplicationAttempt(id21, false, true); ApplicationAttemptId id22 = createAppAttemptId(2, 2); - scheduler.addApplication(id22.getApplicationId(), "root.queue2", "user1"); + scheduler.addApplication(id22.getApplicationId(), "root.queue2", "user1", true); scheduler.addApplicationAttempt(id22, false, true); int minReqSize = @@ -1561,7 +1561,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { scheduler.handle(nodeEvent2); ApplicationAttemptId appId = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); - scheduler.addApplication(appId.getApplicationId(), "queue1", "user1"); + scheduler.addApplication(appId.getApplicationId(), "queue1", "user1", true); scheduler.addApplicationAttempt(appId, false, true); // 1 request with 2 nodes on the same rack. another request with 1 node on @@ -1843,7 +1843,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { ApplicationAttemptId attId = ApplicationAttemptId.newInstance(applicationId, this.ATTEMPT_ID++); - scheduler.addApplication(attId.getApplicationId(), queue, user); + scheduler.addApplication(attId.getApplicationId(), queue, user, true); numTries = 0; while (application.getFinishTime() == 0 && numTries < MAX_TRIES) { @@ -2720,7 +2720,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { // send application request ApplicationAttemptId appAttemptId = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); - fs.addApplication(appAttemptId.getApplicationId(), "queue11", "user11"); + fs.addApplication(appAttemptId.getApplicationId(), "queue11", "user11", true); fs.addApplicationAttempt(appAttemptId, false, true); List ask = new ArrayList(); ResourceRequest request =