diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 700d9700ca2..a62e05ca752 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -100,6 +100,9 @@ Release 2.6.0 - UNRELEASED YARN-2212. ApplicationMaster needs to find a way to update the AMRMToken periodically. (xgong) + YARN-2026. Fair scheduler: Consider only active queues for computing fairshare. + (Ashwin Shankar via kasha) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java index 4f8ac1e6374..5134be4dce9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java @@ -116,6 +116,18 @@ public abstract class Schedulable { return fairShare; } + /** + * Returns true if queue has atleast one app running. Always returns true for + * AppSchedulables. + */ + public boolean isActive() { + if (this instanceof FSQueue) { + FSQueue queue = (FSQueue) this; + return queue.getNumRunnableApps() > 0; + } + return true; + } + /** Convenient toString implementation for debugging. */ @Override public String toString() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java index 77dad493265..6363ec0218c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies; +import java.util.ArrayList; import java.util.Collection; import org.apache.hadoop.yarn.api.records.Resource; @@ -33,7 +34,31 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable; public class ComputeFairShares { private static final int COMPUTE_FAIR_SHARES_ITERATIONS = 25; - + + /** + * Compute fair share of the given schedulables.Fair share is an allocation of + * shares considering only active schedulables ie schedulables which have + * running apps. + * + * @param schedulables + * @param totalResources + * @param type + */ + public static void computeShares( + Collection schedulables, Resource totalResources, + ResourceType type) { + Collection activeSchedulables = new ArrayList(); + for (Schedulable sched : schedulables) { + if (sched.isActive()) { + activeSchedulables.add(sched); + } else { + setResourceValue(0, sched.getFairShare(), type); + } + } + + computeSharesInternal(activeSchedulables, totalResources, type); + } + /** * Given a set of Schedulables and a number of slots, compute their weighted * fair shares. The min and max shares and of the Schedulables are assumed to @@ -75,7 +100,7 @@ public class ComputeFairShares { * because resourceUsedWithWeightToResourceRatio is linear-time and the number of * iterations of binary search is a constant (dependent on desired precision). */ - public static void computeShares( + private static void computeSharesInternal( Collection schedulables, Resource totalResources, ResourceType type) { if (schedulables.isEmpty()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index f3289ccea1c..0ada021aa20 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -292,6 +292,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { // Have two queues which want entire cluster capacity createSchedulingRequest(10 * 1024, "queue1", "user1"); createSchedulingRequest(10 * 1024, "queue2", "user1"); + createSchedulingRequest(10 * 1024, "root.default", "user1"); scheduler.update(); @@ -322,6 +323,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { // Have two queues which want entire cluster capacity createSchedulingRequest(10 * 1024, "parent.queue2", "user1"); createSchedulingRequest(10 * 1024, "parent.queue3", "user1"); + createSchedulingRequest(10 * 1024, "root.default", "user1"); scheduler.update(); @@ -766,8 +768,10 @@ public class TestFairScheduler extends FairSchedulerTestBase { scheduler.handle(nodeEvent1); // user1,user2 submit their apps to parentq and create user queues - scheduler.assignToQueue(rmApp1, "root.parentq", "user1"); - scheduler.assignToQueue(rmApp2, "root.parentq", "user2"); + createSchedulingRequest(10 * 1024, "root.parentq", "user1"); + createSchedulingRequest(10 * 1024, "root.parentq", "user2"); + // user3 submits app in default queue + createSchedulingRequest(10 * 1024, "root.default", "user3"); scheduler.update(); @@ -1287,7 +1291,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { scheduler.update(); Resource toPreempt = scheduler.resToPreempt(scheduler.getQueueManager() .getLeafQueue("queueA.queueA2", false), clock.getTime()); - assertEquals(2980, toPreempt.getMemory()); + assertEquals(3277, toPreempt.getMemory()); // verify if the 3 containers required by queueA2 are preempted in the same // round @@ -2446,8 +2450,12 @@ public class TestFairScheduler extends FairSchedulerTestBase { scheduler.update(); FSLeafQueue queue1 = scheduler.getQueueManager().getLeafQueue("queue1", true); - assertEquals("Queue queue1's fair share should be 10240", - 10240, queue1.getFairShare().getMemory()); + assertEquals("Queue queue1's fair share should be 0", 0, queue1 + .getFairShare().getMemory()); + + createSchedulingRequest(1 * 1024, "root.default", "user1"); + scheduler.update(); + scheduler.handle(updateEvent); Resource amResource1 = Resource.newInstance(1024, 1); Resource amResource2 = Resource.newInstance(2048, 2); @@ -2635,24 +2643,32 @@ public class TestFairScheduler extends FairSchedulerTestBase { FSLeafQueue queue1 = scheduler.getQueueManager().getLeafQueue("queue1", true); - assertEquals("Queue queue1's fair share should be 1366", - 1366, queue1.getFairShare().getMemory()); + assertEquals("Queue queue1's fair share should be 0", 0, queue1 + .getFairShare().getMemory()); FSLeafQueue queue2 = scheduler.getQueueManager().getLeafQueue("queue2", true); - assertEquals("Queue queue2's fair share should be 1366", - 1366, queue2.getFairShare().getMemory()); + assertEquals("Queue queue2's fair share should be 0", 0, queue2 + .getFairShare().getMemory()); FSLeafQueue queue3 = scheduler.getQueueManager().getLeafQueue("queue3", true); - assertEquals("Queue queue3's fair share should be 1366", - 1366, queue3.getFairShare().getMemory()); + assertEquals("Queue queue3's fair share should be 0", 0, queue3 + .getFairShare().getMemory()); FSLeafQueue queue4 = scheduler.getQueueManager().getLeafQueue("queue4", true); - assertEquals("Queue queue4's fair share should be 1366", - 1366, queue4.getFairShare().getMemory()); + assertEquals("Queue queue4's fair share should be 0", 0, queue4 + .getFairShare().getMemory()); FSLeafQueue queue5 = scheduler.getQueueManager().getLeafQueue("queue5", true); - assertEquals("Queue queue5's fair share should be 1366", - 1366, queue5.getFairShare().getMemory()); + assertEquals("Queue queue5's fair share should be 0", 0, queue5 + .getFairShare().getMemory()); + + List queues = Arrays.asList("root.default", "root.queue3", + "root.queue4", "root.queue5"); + for (String queue : queues) { + createSchedulingRequest(1 * 1024, queue, "user1"); + scheduler.update(); + scheduler.handle(updateEvent); + } Resource amResource1 = Resource.newInstance(2048, 1); int amPriority = RMAppAttemptImpl.AM_CONTAINER_PRIORITY.getPriority(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerFairShare.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerFairShare.java new file mode 100644 index 00000000000..8b8ce93b506 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerFairShare.java @@ -0,0 +1,308 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.Collection; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestFairSchedulerFairShare extends FairSchedulerTestBase { + private final static String ALLOC_FILE = new File(TEST_DIR, + TestFairSchedulerFairShare.class.getName() + ".xml").getAbsolutePath(); + + @Before + public void setup() throws IOException { + conf = createConfiguration(); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + } + + @After + public void teardown() { + if (resourceManager != null) { + resourceManager.stop(); + resourceManager = null; + } + conf = null; + } + + private void createClusterWithQueuesAndOneNode(int mem, String policy) + throws IOException { + createClusterWithQueuesAndOneNode(mem, 0, policy); + } + + private void createClusterWithQueuesAndOneNode(int mem, int vCores, + String policy) throws IOException { + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println(" "); + out.println(" 8"); + out.println(" "); + out.println(" "); + out.println(" "); + out.println(" "); + out.println(" "); + out.println(" "); + out.println(" 1"); + out.println(" "); + out.println(" "); + out.println(" "); + out.println(""); + out.println("" + policy + + ""); + out.println(""); + out.close(); + + resourceManager = new MockRM(conf); + resourceManager.start(); + scheduler = (FairScheduler) resourceManager.getResourceScheduler(); + + RMNode node1 = MockNodes.newNodeInfo(1, + Resources.createResource(mem, vCores), 1, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + } + + @Test + public void testFairShareNoAppsRunning() throws IOException { + int nodeCapacity = 16 * 1024; + createClusterWithQueuesAndOneNode(nodeCapacity, "fair"); + + scheduler.update(); + // No apps are running in the cluster,verify if fair share is zero + // for all queues under parentA and parentB. + Collection leafQueues = scheduler.getQueueManager() + .getLeafQueues(); + + for (FSLeafQueue leaf : leafQueues) { + if (leaf.getName().startsWith("root.parentA")) { + assertEquals(0, (double) leaf.getFairShare().getMemory() / nodeCapacity + * 100, 0); + } else if (leaf.getName().startsWith("root.parentB")) { + assertEquals(0, (double) leaf.getFairShare().getMemory() / nodeCapacity + * 100, 0.1); + } + } + } + + @Test + public void testFairShareOneAppRunning() throws IOException { + int nodeCapacity = 16 * 1024; + createClusterWithQueuesAndOneNode(nodeCapacity, "fair"); + + // Run a app in a childA1. Verify whether fair share is 100% in childA1, + // since it is the only active queue. + // Also verify if fair share is 0 for childA2. since no app is + // running in it. + createSchedulingRequest(2 * 1024, "root.parentA.childA1", "user1"); + + scheduler.update(); + + assertEquals( + 100, + (double) scheduler.getQueueManager() + .getLeafQueue("root.parentA.childA1", false).getFairShare() + .getMemory() + / nodeCapacity * 100, 0.1); + assertEquals( + 0, + (double) scheduler.getQueueManager() + .getLeafQueue("root.parentA.childA2", false).getFairShare() + .getMemory() + / nodeCapacity * 100, 0.1); + } + + @Test + public void testFairShareMultipleActiveQueuesUnderSameParent() + throws IOException { + int nodeCapacity = 16 * 1024; + createClusterWithQueuesAndOneNode(nodeCapacity, "fair"); + + // Run apps in childA1,childA2,childA3 + createSchedulingRequest(2 * 1024, "root.parentA.childA1", "user1"); + createSchedulingRequest(2 * 1024, "root.parentA.childA2", "user2"); + createSchedulingRequest(2 * 1024, "root.parentA.childA3", "user3"); + + scheduler.update(); + + // Verify if fair share is 100 / 3 = 33% + for (int i = 1; i <= 3; i++) { + assertEquals( + 33, + (double) scheduler.getQueueManager() + .getLeafQueue("root.parentA.childA" + i, false).getFairShare() + .getMemory() + / nodeCapacity * 100, .9); + } + } + + @Test + public void testFairShareMultipleActiveQueuesUnderDifferentParent() + throws IOException { + int nodeCapacity = 16 * 1024; + createClusterWithQueuesAndOneNode(nodeCapacity, "fair"); + + // Run apps in childA1,childA2 which are under parentA + createSchedulingRequest(2 * 1024, "root.parentA.childA1", "user1"); + createSchedulingRequest(3 * 1024, "root.parentA.childA2", "user2"); + + // Run app in childB1 which is under parentB + createSchedulingRequest(1 * 1024, "root.parentB.childB1", "user3"); + + // Run app in root.default queue + createSchedulingRequest(1 * 1024, "root.default", "user4"); + + scheduler.update(); + + // The two active child queues under parentA would + // get fair share of 80/2=40% + for (int i = 1; i <= 2; i++) { + assertEquals( + 40, + (double) scheduler.getQueueManager() + .getLeafQueue("root.parentA.childA" + i, false).getFairShare() + .getMemory() + / nodeCapacity * 100, .9); + } + + // The child queue under parentB would get a fair share of 10%, + // basically all of parentB's fair share + assertEquals( + 10, + (double) scheduler.getQueueManager() + .getLeafQueue("root.parentB.childB1", false).getFairShare() + .getMemory() + / nodeCapacity * 100, .9); + } + + @Test + public void testFairShareResetsToZeroWhenAppsComplete() throws IOException { + int nodeCapacity = 16 * 1024; + createClusterWithQueuesAndOneNode(nodeCapacity, "fair"); + + // Run apps in childA1,childA2 which are under parentA + ApplicationAttemptId app1 = createSchedulingRequest(2 * 1024, + "root.parentA.childA1", "user1"); + ApplicationAttemptId app2 = createSchedulingRequest(3 * 1024, + "root.parentA.childA2", "user2"); + + scheduler.update(); + + // Verify if both the active queues under parentA get 50% fair + // share + for (int i = 1; i <= 2; i++) { + assertEquals( + 50, + (double) scheduler.getQueueManager() + .getLeafQueue("root.parentA.childA" + i, false).getFairShare() + .getMemory() + / nodeCapacity * 100, .9); + } + // Let app under childA1 complete. This should cause the fair share + // of queue childA1 to be reset to zero,since the queue has no apps running. + // Queue childA2's fair share would increase to 100% since its the only + // active queue. + AppAttemptRemovedSchedulerEvent appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent( + app1, RMAppAttemptState.FINISHED, false); + + scheduler.handle(appRemovedEvent1); + scheduler.update(); + + assertEquals( + 0, + (double) scheduler.getQueueManager() + .getLeafQueue("root.parentA.childA1", false).getFairShare() + .getMemory() + / nodeCapacity * 100, 0); + assertEquals( + 100, + (double) scheduler.getQueueManager() + .getLeafQueue("root.parentA.childA2", false).getFairShare() + .getMemory() + / nodeCapacity * 100, 0.1); + } + + @Test + public void testFairShareWithDRFMultipleActiveQueuesUnderDifferentParent() + throws IOException { + int nodeMem = 16 * 1024; + int nodeVCores = 10; + createClusterWithQueuesAndOneNode(nodeMem, nodeVCores, "drf"); + + // Run apps in childA1,childA2 which are under parentA + createSchedulingRequest(2 * 1024, "root.parentA.childA1", "user1"); + createSchedulingRequest(3 * 1024, "root.parentA.childA2", "user2"); + + // Run app in childB1 which is under parentB + createSchedulingRequest(1 * 1024, "root.parentB.childB1", "user3"); + + // Run app in root.default queue + createSchedulingRequest(1 * 1024, "root.default", "user4"); + + scheduler.update(); + + // The two active child queues under parentA would + // get 80/2=40% memory and vcores + for (int i = 1; i <= 2; i++) { + assertEquals( + 40, + (double) scheduler.getQueueManager() + .getLeafQueue("root.parentA.childA" + i, false).getFairShare() + .getMemory() + / nodeMem * 100, .9); + assertEquals( + 40, + (double) scheduler.getQueueManager() + .getLeafQueue("root.parentA.childA" + i, false).getFairShare() + .getVirtualCores() + / nodeVCores * 100, .9); + } + + // The only active child queue under parentB would get 10% memory and vcores + assertEquals( + 10, + (double) scheduler.getQueueManager() + .getLeafQueue("root.parentB.childB1", false).getFairShare() + .getMemory() + / nodeMem * 100, .9); + assertEquals( + 10, + (double) scheduler.getQueueManager() + .getLeafQueue("root.parentB.childB1", false).getFairShare() + .getVirtualCores() + / nodeVCores * 100, .9); + } +}