From 581072a8f04f7568d3560f105fd1988d3acc9e54 Mon Sep 17 00:00:00 2001 From: Szilard Nemeth Date: Mon, 20 Jan 2020 13:10:32 +0100 Subject: [PATCH] YARN-7913. Improve error handling when application recovery fails with exception. Contributed by Wilfred Spiegelenburg --- .../scheduler/fair/FairScheduler.java | 50 ++++--- .../scheduler/fair/FairSchedulerTestBase.java | 24 ++++ .../scheduler/fair/TestFairScheduler.java | 124 ++++++++++++++++++ 3 files changed, 181 insertions(+), 17 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index c493847ebce..3c9dcb155ab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -480,27 +480,39 @@ public class FairScheduler extends writeLock.lock(); try { // Assign the app to the queue creating and prevent queue delete. + // This will re-create the queue on restore, however this could fail if + // the config was changed. FSLeafQueue queue = queueMgr.getLeafQueue(queueName, true, applicationId); if (queue == null) { - rejectApplicationWithMessage(applicationId, - queueName + " is not a leaf queue"); - return; + if (!isAppRecovering) { + rejectApplicationWithMessage(applicationId, + queueName + " is not a leaf queue"); + return; + } + // app is recovering we do not want to fail the app now as it was there + // before we started the recovery. 
Add it to the recovery queue: + // dynamic queue directly under root, no ACL needed (auto clean up) + queueName = "root.recovery"; + queue = queueMgr.getLeafQueue(queueName, true, applicationId); } - // Enforce ACLs: 2nd check, there could be a time laps between the app - // creation in the RMAppManager and getting here. That means we could - // have a configuration change (prevent race condition) - UserGroupInformation userUgi = UserGroupInformation.createRemoteUser( - user); - - if (!queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi) && - !queue.hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) { - String msg = "User " + user + " does not have permission to submit " + - applicationId + " to queue " + queueName; - rejectApplicationWithMessage(applicationId, msg); - queue.removeAssignedApp(applicationId); - return; + // Skip ACL check for recovering applications: they have been accepted + // in the queue already; recovery should not break that. + if (!isAppRecovering) { + // Enforce ACLs: 2nd check, there could be a time lapse between the app + // creation in the RMAppManager and getting here. 
That means we could + // have a configuration change (prevent race condition) + UserGroupInformation userUgi = UserGroupInformation.createRemoteUser( + user); + if (!queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi) && + !queue.hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) { + String msg = "User " + user + " does not have permission to submit " + + applicationId + " to queue " + queueName; + rejectApplicationWithMessage(applicationId, msg); + queue.removeAssignedApp(applicationId); + return; + } } RMApp rmApp = rmContext.getRMApps().get(applicationId); @@ -511,7 +523,11 @@ public class FairScheduler extends " to set queue name on"); } - if (rmApp != null && rmApp.getAMResourceRequests() != null) { + // when recovering the NMs might not have registered and we could have + // no resources in the queue, the app is already running and has thus + // passed all these checks, skip them now. + if (!isAppRecovering && rmApp != null && + rmApp.getAMResourceRequests() != null) { // Resources.fitsIn would always return false when queueMaxShare is 0 // for any resource, but only using Resources.fitsIn is not enough // is it would return false for such cases when the requested diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java index b9316181728..818fcc9e386 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java @@ -270,6 +270,30 @@ public class FairSchedulerTestBase { scheduler.update(); } + protected ApplicationAttemptId createRecoveringApplication( + Resource amResource, String queueId, String userId) { + ApplicationAttemptId id = + createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); + + // On restore the app is already created but we need to check the AM + // resource, make sure it is set for test + ResourceRequest amRequest = createResourceRequest( + // cast to int as we're not testing large values so it is safe + (int)amResource.getMemorySize(), amResource.getVirtualCores(), + ResourceRequest.ANY, 1, 1, true); + List amReqs = new ArrayList<>(); + amReqs.add(amRequest); + createApplicationWithAMResourceInternal(id, queueId, userId, amResource, + amReqs); + + // This fakes the placement which is not part of the scheduler anymore + ApplicationPlacementContext placementCtx = + new ApplicationPlacementContext(queueId); + scheduler.addApplication(id.getApplicationId(), queueId, userId, true, + placementCtx); + return id; + } + protected void createApplicationWithAMResource(ApplicationAttemptId attId, String queue, String user, Resource amResource) { createApplicationWithAMResourceInternal(attId, queue, user, amResource, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index e6fe83a4718..2e043fb0481 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -5525,4 +5525,128 @@ public class TestFairScheduler extends FairSchedulerTestBase { createSchedulingRequest(memory, vCores, "queueA", "user1", 1, 2); } + + @Test + public void testRestoreToExistingQueue() throws IOException { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + generateAllocationFilePercentageResource(); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // no nodes so the resource total should be zero for all queues + // AM is using resources + Resource amResource = Resources.createResource(1024, 1); + // Add the app and the attempt + ApplicationAttemptId appAttemptId = null; + String queueId = "root.parent.queueA"; + try { + appAttemptId = createRecoveringApplication(amResource, queueId, "user1"); + } catch (Exception e) { + fail("The exception is not expected. 
Exception message: " + + e.getMessage()); + } + scheduler.addApplicationAttempt(appAttemptId, false, true); + + List appsInQueue = + scheduler.getAppsInQueue(queueId); + assertEquals("Number of apps in queue 'root.parent.queueA' should be one!", + 1, appsInQueue.size()); + + appAttemptId = scheduler.getAppsInQueue(queueId).get(0); + assertNotNull("Scheduler app for appAttemptId " + appAttemptId + + " should not be null!", scheduler.getSchedulerApp(appAttemptId)); + + FSAppAttempt schedulerApp = scheduler.getSchedulerApp(appAttemptId); + assertNotNull("Scheduler app queueInfo for appAttemptId " + appAttemptId + + " should not be null!", schedulerApp.getAppSchedulingInfo()); + + assertTrue("There should be no requests accepted", schedulerApp + .getAppSchedulingInfo().getAllResourceRequests().isEmpty()); + + // Restore an application with a user that has no access to the queue + try { + appAttemptId = createRecoveringApplication(amResource, queueId, + "usernotinacl"); + } catch (Exception e) { + fail("The exception is not expected. 
Exception message: " + + e.getMessage()); + } + scheduler.addApplicationAttempt(appAttemptId, false, true); + + appsInQueue = scheduler.getAppsInQueue(queueId); + assertEquals("Number of apps in queue 'root.parent.queueA' should be two!", + 2, appsInQueue.size()); + + appAttemptId = scheduler.getAppsInQueue(queueId).get(1); + assertNotNull("Scheduler app for appAttemptId " + appAttemptId + + " should not be null!", scheduler.getSchedulerApp(appAttemptId)); + + schedulerApp = scheduler.getSchedulerApp(appAttemptId); + assertNotNull("Scheduler app queueInfo for appAttemptId " + appAttemptId + + " should not be null!", schedulerApp.getAppSchedulingInfo()); + + assertTrue("There should be no requests accepted", schedulerApp + .getAppSchedulingInfo().getAllResourceRequests().isEmpty()); + } + + @Test + public void testRestoreToParentQueue() throws IOException { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + generateAllocationFilePercentageResource(); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // no nodes so the resource total should be zero for all queues + // AM is using resources + Resource amResource = Resources.createResource(1024, 1); + // Add the app and the attempt, use a queue that is now a parent + ApplicationAttemptId appAttemptId = null; + String queueId = "root.parent"; + try { + appAttemptId = createRecoveringApplication(amResource, queueId, "user1"); + } catch (Exception e) { + fail("The exception is not expected. 
Exception message: " + + e.getMessage()); + } + scheduler.addApplicationAttempt(appAttemptId, false, true); + + String recoveredQueue = "root.recovery"; + List appsInQueue = + scheduler.getAppsInQueue(recoveredQueue); + assertEquals("Number of apps in queue 'root.recovery' should be one!", + 1, appsInQueue.size()); + + appAttemptId = + scheduler.getAppsInQueue(recoveredQueue).get(0); + assertNotNull("Scheduler app for appAttemptId " + appAttemptId + + " should not be null!", scheduler.getSchedulerApp(appAttemptId)); + + FSAppAttempt schedulerApp = scheduler.getSchedulerApp(appAttemptId); + assertNotNull("Scheduler app queueInfo for appAttemptId " + appAttemptId + + " should not be null!", schedulerApp.getAppSchedulingInfo()); + + assertTrue("There should be no requests accepted", schedulerApp + .getAppSchedulingInfo().getAllResourceRequests().isEmpty()); + } + + private void generateAllocationFilePercentageResource() { + AllocationFileWriter.create() + .addQueue(new AllocationFileQueue.Builder("root") + .parent(true) + .aclSubmitApps(" ") + .aclAdministerApps(" ") + .subQueue(new AllocationFileQueue.Builder("parent") + .parent(true) + .maxChildResources("memory-mb=15.0%, vcores=15.0%") + .subQueue(new AllocationFileQueue.Builder("queueA") + .aclSubmitApps("user1") + .build()) + .build()) + .build()) + .writeToFile(ALLOC_FILE); + } }