From fa1d84ae2739a1e76f58b9c96d1378f9453cc0d2 Mon Sep 17 00:00:00 2001 From: Jian He Date: Mon, 10 Aug 2015 20:51:54 -0700 Subject: [PATCH] YARN-3887. Support changing Application priority during runtime. Contributed by Sunil G --- hadoop-yarn-project/CHANGES.txt | 3 + .../recovery/RMStateStore.java | 5 + .../scheduler/AbstractYarnScheduler.java | 7 + .../SchedulerApplicationAttempt.java | 2 +- .../scheduler/YarnScheduler.java | 11 + .../scheduler/capacity/CapacityScheduler.java | 49 ++++ .../AbstractComparatorOrderingPolicy.java | 6 + .../capacity/TestApplicationPriority.java | 260 ++++++++++++++++++ 8 files changed, 342 insertions(+), 1 deletion(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 5e27a2f281c..ada105604fa 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -164,6 +164,9 @@ Release 2.8.0 - UNRELEASED YARN-3873. PendingApplications in LeafQueue should also use OrderingPolicy. (Sunil G via wangda) + YARN-3887. Support changing Application priority during runtime. (Sunil G + via jianhe) + IMPROVEMENTS YARN-644. Basic null check is not performed on passed in arguments before diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index 50364501236..affbee12490 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -706,6 +706,11 @@ public abstract class RMStateStore extends AbstractService { dispatcher.getEventHandler().handle(new RMStateUpdateAppEvent(appState)); } + public void updateApplicationStateSynchronously( + ApplicationStateData appState) { + handleStoreEvent(new RMStateUpdateAppEvent(appState)); + } + public void updateFencedState() { handleStoreEvent(new RMStateStoreEvent(RMStateStoreEventType.FENCED)); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index d69600ab62e..ed051899baf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -701,4 +701,11 @@ public abstract class AbstractYarnScheduler // specific scheduler. return Priority.newInstance(0); } + + @Override + public void updateApplicationPriority(Priority newPriority, + ApplicationId applicationId) throws YarnException { + // Dummy Implementation till Application Priority changes are done in + // specific scheduler. + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 317e61c279d..48725435c09 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -98,7 +98,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { private boolean amRunning = false; private LogAggregationContext logAggregationContext; - private Priority appPriority = null; + private volatile Priority appPriority = null; protected ResourceUsage attemptResourceUsage = new ResourceUsage(); private AtomicLong firstAllocationRequestSentTime = new AtomicLong(0); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java index f6295794c72..0fa23e173d8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java @@ -306,4 +306,15 @@ public interface YarnScheduler extends EventHandler { public Priority checkAndGetApplicationPriority(Priority priorityFromContext, String user, String queueName, ApplicationId applicationId) throws YarnException; + + /** + * + * Change application priority of a submitted application at runtime + * + * @param newPriority Submitted Application priority. + * + * @param applicationId Application ID + */ + public void updateApplicationPriority(Priority newPriority, + ApplicationId applicationId) throws YarnException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index b4d00954019..b4b13838cf4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; @@ -1850,4 +1851,52 @@ public class CapacityScheduler extends public Priority getMaxClusterLevelAppPriority() { return maxClusterLevelAppPriority; } + + @Override + public synchronized void updateApplicationPriority(Priority newPriority, + ApplicationId applicationId) throws YarnException { + Priority appPriority = null; + SchedulerApplication application = applications + .get(applicationId); + + if (application == null) { + throw new YarnException("Application '" + applicationId + + "' is not present, hence could not change priority."); + } + + if (application.getPriority().equals(newPriority)) { + return; + } + + RMApp rmApp = rmContext.getRMApps().get(applicationId); + appPriority = checkAndGetApplicationPriority(newPriority, rmApp.getUser(), + rmApp.getQueue(), applicationId); + + // Update new priority in Submission Context to keep track in HA + rmApp.getApplicationSubmissionContext().setPriority(appPriority); + + // Update to state store + ApplicationStateData appState = ApplicationStateData.newInstance( + rmApp.getSubmitTime(), rmApp.getStartTime(), + rmApp.getApplicationSubmissionContext(), rmApp.getUser()); + rmContext.getStateStore().updateApplicationStateSynchronously(appState); + + // As we use iterator over a TreeSet for OrderingPolicy, once we change + // priority then reinsert back to make order correct. + LeafQueue queue = (LeafQueue) getQueue(rmApp.getQueue()); + synchronized (queue) { + queue.getOrderingPolicy().removeSchedulableEntity( + application.getCurrentAppAttempt()); + + // Update new priority in SchedulerApplication + application.setPriority(appPriority); + + queue.getOrderingPolicy().addSchedulableEntity( + application.getCurrentAppAttempt()); + } + + LOG.info("Priority '" + appPriority + "' is updated in queue :" + + rmApp.getQueue() + "for application:" + applicationId + + "for the user: " + rmApp.getUser()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java index c4d2aae318b..7bec03a17c0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java @@ -94,11 +94,17 @@ public abstract class AbstractComparatorOrderingPolicy rmAppState = rmState + .getApplicationState(); + + // PHASE 1: create state in an RM + + // start RM + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + + MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, + rm1.getResourceTrackerService()); + nm1.registerNode(); + + Priority appPriority1 = Priority.newInstance(5); + RMApp app1 = rm1.submitApp(1 * GB, appPriority1); + + // kick the scheduler, 1 GB given to AM1, remaining 15GB on nm1 + MockAM am1 = MockRM.launchAM(app1, rm1, nm1); + am1.registerAppAttempt(); + + // get scheduler + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + + // Change the priority of App1 to 8 + Priority appPriority2 = Priority.newInstance(8); + cs.updateApplicationPriority(appPriority2, app1.getApplicationId()); + + // let things settle down + Thread.sleep(1000); + + // create new RM to represent restart and recover state + MockRM rm2 = new MockRM(conf, memStore); + + // start new RM + rm2.start(); + // change NM to point to new RM + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + + // Verify RM Apps after this restart + Assert.assertEquals(1, rm2.getRMContext().getRMApps().size()); + + // get scheduler app + RMApp loadedApp = rm2.getRMContext().getRMApps() + .get(app1.getApplicationId()); + + // Verify whether priority 15 is reset to 10 + Assert.assertEquals(appPriority2, loadedApp.getCurrentAppAttempt() + .getSubmissionContext().getPriority()); + + rm2.stop(); + rm1.stop(); + } + + @Test + public void testApplicationPriorityAllocationWithChangeInPriority() + throws Exception { + + Configuration conf = new Configuration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + // Set Max Application Priority as 10 + conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10); + MockRM rm = new MockRM(conf); + rm.start(); + + Priority appPriority1 = Priority.newInstance(5); + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 16 * GB); + RMApp app1 = rm.submitApp(1 * GB, appPriority1); + + // kick the scheduler, 1 GB given to AM1, remaining 15GB on nm1 + MockAM am1 = MockRM.launchAM(app1, rm, nm1); + am1.registerAppAttempt(); + + // add request for containers and wait for containers to be allocated. + int NUM_CONTAINERS = 7; + List allocated1 = am1.allocateAndWaitForContainers("127.0.0.1", + NUM_CONTAINERS, 2 * GB, nm1); + + Assert.assertEquals(7, allocated1.size()); + Assert.assertEquals(2 * GB, allocated1.get(0).getResource().getMemory()); + + // check node report, 15 GB used (1 AM and 7 containers) and 1 GB available + SchedulerNodeReport report_nm1 = rm.getResourceScheduler().getNodeReport( + nm1.getNodeId()); + Assert.assertEquals(15 * GB, report_nm1.getUsedResource().getMemory()); + Assert.assertEquals(1 * GB, report_nm1.getAvailableResource().getMemory()); + + // Submit the second app App2 with priority 8 (Higher than App1) + Priority appPriority2 = Priority.newInstance(8); + RMApp app2 = rm.submitApp(1 * GB, appPriority2); + + // kick the scheduler, 1 GB which was free is given to AM of App2 + nm1.nodeHeartbeat(true); + MockAM am2 = MockRM.launchAM(app2, rm, nm1); + am2.registerAppAttempt(); + + // check node report, 16 GB used and 0 GB available + report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); + Assert.assertEquals(16 * GB, report_nm1.getUsedResource().getMemory()); + Assert.assertEquals(0 * GB, report_nm1.getAvailableResource().getMemory()); + + // get scheduler + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + + // get scheduler app + FiCaSchedulerApp schedulerAppAttemptApp1 = cs.getSchedulerApplications() + .get(app1.getApplicationId()).getCurrentAppAttempt(); + // kill 2 containers to free up some space + int counter = 0; + for (Iterator iterator = allocated1.iterator(); iterator + .hasNext();) { + Container c = iterator.next(); + if (++counter > 2) { + break; + } + cs.killContainer(schedulerAppAttemptApp1.getRMContainer(c.getId())); + iterator.remove(); + } + + // check node report, 12 GB used and 4 GB available + report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); + Assert.assertEquals(12 * GB, report_nm1.getUsedResource().getMemory()); + Assert.assertEquals(4 * GB, report_nm1.getAvailableResource().getMemory()); + + // add request for containers App1 + am1.allocate("127.0.0.1", 2 * GB, 10, new ArrayList()); + + // add request for containers App2 and wait for containers to get allocated + List allocated2 = am2.allocateAndWaitForContainers("127.0.0.1", + 2, 2 * GB, nm1); + + Assert.assertEquals(2, allocated2.size()); + // check node report, 16 GB used and 0 GB available + report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); + Assert.assertEquals(16 * GB, report_nm1.getUsedResource().getMemory()); + Assert.assertEquals(0 * GB, report_nm1.getAvailableResource().getMemory()); + + // kill 1 more + counter = 0; + for (Iterator iterator = allocated1.iterator(); iterator + .hasNext();) { + Container c = iterator.next(); + if (++counter > 1) { + break; + } + cs.killContainer(schedulerAppAttemptApp1.getRMContainer(c.getId())); + iterator.remove(); + } + + // check node report, 14 GB used and 2 GB available + report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); + Assert.assertEquals(14 * GB, report_nm1.getUsedResource().getMemory()); + Assert.assertEquals(2 * GB, report_nm1.getAvailableResource().getMemory()); + + // Change the priority of App1 to 3 (lowest) + Priority appPriority3 = Priority.newInstance(3); + cs.updateApplicationPriority(appPriority3, app2.getApplicationId()); + + // add request for containers App2 + am2.allocate("127.0.0.1", 2 * GB, 3, new ArrayList()); + + // add request for containers App1 and wait for containers to get allocated + // since priority is more for App1 now, App1 will get a container. + List allocated3 = am1.allocateAndWaitForContainers("127.0.0.1", + 1, 2 * GB, nm1); + + Assert.assertEquals(1, allocated3.size()); + // Now App1 will have 5 containers and 1 AM. App2 will have 2 containers. + Assert.assertEquals(6, schedulerAppAttemptApp1.getLiveContainers().size()); + rm.stop(); + } }