From ee5dd2c388d2e0d328c35953cbde460d3a737f30 Mon Sep 17 00:00:00 2001 From: Sunil Date: Sat, 5 Nov 2016 13:06:31 +0530 Subject: [PATCH] YARN-5802. updateApplicationPriority api in scheduler should ensure to re-insert app to correct ordering policy. Contributed by Bibin A Chundatt --- .../scheduler/capacity/CapacityScheduler.java | 17 +++-- .../resourcemanager/TestClientRMService.java | 26 +++++++ .../capacity/TestApplicationPriority.java | 74 +++++++++++++++++++ 3 files changed, 111 insertions(+), 6 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 7601e2ed786..93b8733a924 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -2067,14 +2067,19 @@ public class CapacityScheduler extends // priority then reinsert back to make order correct. LeafQueue queue = (LeafQueue) getQueue(rmApp.getQueue()); synchronized (queue) { - queue.getOrderingPolicy().removeSchedulableEntity( - application.getCurrentAppAttempt()); - + FiCaSchedulerApp attempt = application.getCurrentAppAttempt(); + boolean isActive = + queue.getOrderingPolicy().removeSchedulableEntity(attempt); + if (!isActive) { + queue.getPendingAppsOrderingPolicy().removeSchedulableEntity(attempt); + } // Update new priority in SchedulerApplication application.setPriority(appPriority); - - queue.getOrderingPolicy().addSchedulableEntity( - application.getCurrentAppAttempt()); + if (isActive) { + queue.getOrderingPolicy().addSchedulableEntity(attempt); + } else { + queue.getPendingAppsOrderingPolicy().addSchedulableEntity(attempt); + } } // Update the changed application state to timeline server diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index 21656d47e86..b7f0a8a8108 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -1617,6 +1617,32 @@ public class TestClientRMService { rm.close(); } + @Test(timeout = 120000) + public void testUpdatePriorityAndKillAppWithZeroClusterResource() + throws Exception { + int maxPriority = 10; + int appPriority = 5; + YarnConfiguration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, + maxPriority); + MockRM rm = new MockRM(conf); + rm.init(conf); + rm.start(); + RMApp app1 = rm.submitApp(1024, Priority.newInstance(appPriority)); + ClientRMService rmService = rm.getClientRMService(); + // Update application priority + UpdateApplicationPriorityRequest updateRequest = + UpdateApplicationPriorityRequest.newInstance(app1.getApplicationId(), + Priority.newInstance(appPriority)); + rmService.updateApplicationPriority(updateRequest); + Assert.assertEquals( + "Application priority should be updated to " + appPriority, appPriority, + app1.getApplicationSubmissionContext().getPriority().getPriority()); + rm.killApp(app1.getApplicationId()); + rm.waitForState(app1.getApplicationId(), RMAppState.KILLED); + rm.stop(); + } + @Test(timeout = 120000) public void testUpdateApplicationPriorityRequest() throws Exception { int maxPriority = 10; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java index d7e12e645ee..8dcfa383954 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java @@ -23,6 +23,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.util.ArrayList; +import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -36,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; @@ -54,6 +56,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeRepo import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -733,4 +736,75 @@ public class TestApplicationPriority { rm2.stop(); rm1.stop(); } + + @Test(timeout = 120000) + public void testUpdatePriorityOnPendingAppAndKillAttempt() throws Exception { + int maxPriority = 10; + int appPriority = 5; + YarnConfiguration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, + maxPriority); + MockRM rm = new MockRM(conf); + rm.init(conf); + rm.start(); + + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + CSQueue defaultQueue = (LeafQueue) cs.getQueue("default"); + + // Update priority and kill application with no resource + RMApp app1 = rm.submitApp(1024, Priority.newInstance(appPriority)); + Collection appsPending = + ((LeafQueue) defaultQueue).getPendingApplications(); + Collection activeApps = + ((LeafQueue) defaultQueue).getOrderingPolicy().getSchedulableEntities(); + + // Verify app is in pending state + Assert.assertEquals("Pending apps should be 1", 1, appsPending.size()); + Assert.assertEquals("Active apps should be 0", 0, activeApps.size()); + + // kill app1 which is pending + killAppAndVerifyOrderingPolicy(rm, defaultQueue, 0, 0, app1); + + // Check ordering policy size when resource is added + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8096, rm.getResourceTrackerService()); + nm1.registerNode(); + RMApp app2 = rm.submitApp(1024, Priority.newInstance(appPriority)); + Assert.assertEquals("Pending apps should be 0", 0, appsPending.size()); + Assert.assertEquals("Active apps should be 1", 1, activeApps.size()); + RMApp app3 = rm.submitApp(1024, Priority.newInstance(appPriority)); + RMApp app4 = rm.submitApp(1024, Priority.newInstance(appPriority)); + Assert.assertEquals("Pending apps should be 2", 2, appsPending.size()); + Assert.assertEquals("Active apps should be 1", 1, activeApps.size()); + // kill app3, pending apps should reduce to 1 + killAppAndVerifyOrderingPolicy(rm, defaultQueue, 1, 1, app3); + // kill app2, running apps is killed and pending added to running + killAppAndVerifyOrderingPolicy(rm, defaultQueue, 0, 1, app2); + // kill app4, all apps are killed and both policy size should be zero + killAppAndVerifyOrderingPolicy(rm, defaultQueue, 0, 0, app4); + rm.stop(); + } + + private void killAppAndVerifyOrderingPolicy(MockRM rm, CSQueue defaultQueue, + int appsPendingExpected, int activeAppsExpected, RMApp app) + throws YarnException { + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + cs.updateApplicationPriority(Priority.newInstance(2), + app.getApplicationId()); + SchedulerEvent removeAttempt; + removeAttempt = new AppAttemptRemovedSchedulerEvent( + app.getCurrentAppAttempt().getAppAttemptId(), RMAppAttemptState.KILLED, + false); + cs.handle(removeAttempt); + rm.drainEvents(); + Collection appsPending = + ((LeafQueue) defaultQueue).getPendingApplications(); + Collection activeApps = + ((LeafQueue) defaultQueue).getApplications(); + Assert.assertEquals("Pending apps should be " + appsPendingExpected, + appsPendingExpected, appsPending.size()); + Assert.assertEquals("Active apps should be " + activeAppsExpected, + activeAppsExpected, activeApps.size()); + } + }