diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index a60e4e0dcda..94b8848bbb8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -477,12 +477,25 @@ public class FairScheduler extends writeLock.lock(); try { RMApp rmApp = rmContext.getRMApps().get(applicationId); + // This will re-create the queue on restore, however this could fail if + // the config was changed. FSLeafQueue queue = assignToQueue(rmApp, queueName, user); if (queue == null) { - return; + if (!isAppRecovering) { + return; + } + // app is recovering we do not want to fail the app now as it was there + // before we started the recovery. Add it to the recovery queue: + // dynamic queue directly under root, no ACL needed (auto clean up) + queueName = "root.recovery"; + queue = queueMgr.getLeafQueue(queueName, true); } - if (rmApp != null && rmApp.getAMResourceRequests() != null) { + // when recovering the NMs might not have registered and we could have + // no resources in the queue, the app is already running and has thus + // passed all these checks, skip them now. + if (!isAppRecovering && rmApp != null && + rmApp.getAMResourceRequests() != null) { // Resources.fitsIn would always return false when queueMaxShare is 0 // for any resource, but only using Resources.fitsIn is not enough // is it would return false for such cases when the requested @@ -507,17 +520,21 @@ public class FairScheduler extends } } - // Enforce ACLs - UserGroupInformation userUgi = UserGroupInformation.createRemoteUser( - user); + // Skip ACL check for recovering applications: they have been accepted + // in the queue already recovery should not break that. + if (!isAppRecovering) { + // Enforce ACLs + UserGroupInformation userUgi = UserGroupInformation.createRemoteUser( + user); - if (!queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi) && !queue - .hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) { - String msg = "User " + userUgi.getUserName() - + " cannot submit applications to queue " + queue.getName() - + "(requested queuename is " + queueName + ")"; - rejectApplicationWithMessage(applicationId, msg); - return; + if (!queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi) && !queue + .hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) { + String msg = "User " + userUgi.getUserName() + + " cannot submit applications to queue " + queue.getName() + + "(requested queuename is " + queueName + ")"; + rejectApplicationWithMessage(applicationId, msg); + return; + } } SchedulerApplication application = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java index 4f1f20b942b..493386cc2ce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java @@ -260,6 +260,26 @@ public class FairSchedulerTestBase { scheduler.update(); } + protected ApplicationAttemptId createRecoveringApplication( + Resource amResource, String queueId, String userId) { + ApplicationAttemptId id = + createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); + + // On restore the app is already created but we need to check the AM + // resource, make sure it is set for test + ResourceRequest amRequest = createResourceRequest( + // cast to int as we're not testing large values so it is safe + (int)amResource.getMemorySize(), amResource.getVirtualCores(), + ResourceRequest.ANY, 1, 1, true); + List amReqs = new ArrayList<>(); + amReqs.add(amRequest); + createApplicationWithAMResourceInternal(id, queueId, userId, amResource, + amReqs); + + scheduler.addApplication(id.getApplicationId(), queueId, userId, true); + return id; + } + protected void createApplicationWithAMResource(ApplicationAttemptId attId, String queue, String user, Resource amResource) { createApplicationWithAMResourceInternal(attId, queue, user, amResource, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 5166c368f4c..9cb4ffd3505 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -5766,4 +5766,131 @@ public class TestFairScheduler extends FairSchedulerTestBase { createSchedulingRequest(memory, vCores, "queueA", "user1", 1, 2); } + + @Test + public void testRestoreToExistingQueue() throws IOException { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + generateAllocationFilePercentageResource(); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // no nodes so the resource total should be zero for all queues + // AM is using resources + Resource amResource = Resources.createResource(1024, 1); + // Add the app and the attempt + ApplicationAttemptId appAttemptId = null; + String queueId = "root.parent.queueA"; + try { + appAttemptId = createRecoveringApplication(amResource, queueId, "user1"); + } catch (Exception e) { + fail("The exception is not expected. Exception message: " + + e.getMessage()); + } + scheduler.addApplicationAttempt(appAttemptId, false, true); + + List appsInQueue = + scheduler.getAppsInQueue(queueId); + assertEquals("Number of apps in queue 'root.parent.queueA' should be one!", + 1, appsInQueue.size()); + + appAttemptId = scheduler.getAppsInQueue(queueId).get(0); + assertNotNull("Scheduler app for appAttemptId " + appAttemptId + + " should not be null!", scheduler.getSchedulerApp(appAttemptId)); + + FSAppAttempt schedulerApp = scheduler.getSchedulerApp(appAttemptId); + assertNotNull("Scheduler app queueInfo for appAttemptId " + appAttemptId + + " should not be null!", schedulerApp.getAppSchedulingInfo()); + + assertTrue("There should be no requests accepted", schedulerApp + .getAppSchedulingInfo().getAllResourceRequests().isEmpty()); + + // Restore an applications with a user that has no access to the queue + try { + appAttemptId = createRecoveringApplication(amResource, queueId, + "usernotinacl"); + } catch (Exception e) { + fail("The exception is not expected. Exception message: " + + e.getMessage()); + } + scheduler.addApplicationAttempt(appAttemptId, false, true); + + appsInQueue = scheduler.getAppsInQueue(queueId); + assertEquals("Number of apps in queue 'root.parent.queueA' should be two!", + 2, appsInQueue.size()); + + appAttemptId = scheduler.getAppsInQueue(queueId).get(1); + assertNotNull("Scheduler app for appAttemptId " + appAttemptId + + " should not be null!", scheduler.getSchedulerApp(appAttemptId)); + + schedulerApp = scheduler.getSchedulerApp(appAttemptId); + assertNotNull("Scheduler app queueInfo for appAttemptId " + appAttemptId + + " should not be null!", schedulerApp.getAppSchedulingInfo()); + + assertTrue("There should be no requests accepted", schedulerApp + .getAppSchedulingInfo().getAllResourceRequests().isEmpty()); + } + + @Test + public void testRestoreToParentQueue() throws IOException { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + generateAllocationFilePercentageResource(); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // no nodes so the resource total should be zero for all queues + // AM is using resources + Resource amResource = Resources.createResource(1024, 1); + // Add the app and the attempt, use a queue that is now a parent + ApplicationAttemptId appAttemptId = null; + String queueId = "root.parent"; + try { + appAttemptId = createRecoveringApplication(amResource, queueId, "user1"); + } catch (Exception e) { + fail("The exception is not expected. Exception message: " + + e.getMessage()); + } + scheduler.addApplicationAttempt(appAttemptId, false, true); + + String recoveredQueue = "root.recovery"; + List appsInQueue = + scheduler.getAppsInQueue(recoveredQueue); + assertEquals("Number of apps in queue 'root.recovery' should be one!", + 1, appsInQueue.size()); + + appAttemptId = + scheduler.getAppsInQueue(recoveredQueue).get(0); + assertNotNull("Scheduler app for appAttemptId " + appAttemptId + + " should not be null!", scheduler.getSchedulerApp(appAttemptId)); + + FSAppAttempt schedulerApp = scheduler.getSchedulerApp(appAttemptId); + assertNotNull("Scheduler app queueInfo for appAttemptId " + appAttemptId + + " should not be null!", schedulerApp.getAppSchedulingInfo()); + + assertTrue("There should be no requests accepted", schedulerApp + .getAppSchedulingInfo().getAllResourceRequests().isEmpty()); + } + + private void generateAllocationFilePercentageResource() + throws IOException { + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println(" "); + out.println(" "); + out.println(""); + out.println("memory-mb=15.0%, vcores=15.0%" + + ""); + out.println(""); + out.println("user1"); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.close(); + } }