YARN-2026. Fair scheduler: Consider only active queues for computing fairshare. (Ashwin Shankar via kasha)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1616915 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Karthik Kambatla 2014-08-09 02:10:00 +00:00
parent 74fe84393d
commit a7643f4de7
5 changed files with 381 additions and 17 deletions

View File

@ -100,6 +100,9 @@ Release 2.6.0 - UNRELEASED
YARN-2212. ApplicationMaster needs to find a way to update the AMRMToken
periodically. (xgong)
YARN-2026. Fair scheduler: Consider only active queues for computing fairshare.
(Ashwin Shankar via kasha)
OPTIMIZATIONS
BUG FIXES

View File

@ -116,6 +116,18 @@ public Resource getFairShare() {
return fairShare;
}
/**
* Returns true if queue has atleast one app running. Always returns true for
* AppSchedulables.
*/
public boolean isActive() {
if (this instanceof FSQueue) {
FSQueue queue = (FSQueue) this;
return queue.getNumRunnableApps() > 0;
}
return true;
}
/** Convenient toString implementation for debugging. */
@Override
public String toString() {

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies;
import java.util.ArrayList;
import java.util.Collection;
import org.apache.hadoop.yarn.api.records.Resource;
@ -33,7 +34,31 @@
public class ComputeFairShares {
private static final int COMPUTE_FAIR_SHARES_ITERATIONS = 25;
/**
* Compute fair share of the given schedulables.Fair share is an allocation of
* shares considering only active schedulables ie schedulables which have
* running apps.
*
* @param schedulables
* @param totalResources
* @param type
*/
public static void computeShares(
Collection<? extends Schedulable> schedulables, Resource totalResources,
ResourceType type) {
Collection<Schedulable> activeSchedulables = new ArrayList<Schedulable>();
for (Schedulable sched : schedulables) {
if (sched.isActive()) {
activeSchedulables.add(sched);
} else {
setResourceValue(0, sched.getFairShare(), type);
}
}
computeSharesInternal(activeSchedulables, totalResources, type);
}
/**
* Given a set of Schedulables and a number of slots, compute their weighted
* fair shares. The min and max shares and of the Schedulables are assumed to
@ -75,7 +100,7 @@ public class ComputeFairShares {
* because resourceUsedWithWeightToResourceRatio is linear-time and the number of
* iterations of binary search is a constant (dependent on desired precision).
*/
public static void computeShares(
private static void computeSharesInternal(
Collection<? extends Schedulable> schedulables, Resource totalResources,
ResourceType type) {
if (schedulables.isEmpty()) {

View File

@ -292,6 +292,7 @@ public void testSimpleFairShareCalculation() throws IOException {
// Have two queues which want entire cluster capacity
createSchedulingRequest(10 * 1024, "queue1", "user1");
createSchedulingRequest(10 * 1024, "queue2", "user1");
createSchedulingRequest(10 * 1024, "root.default", "user1");
scheduler.update();
@ -322,6 +323,7 @@ public void testSimpleHierarchicalFairShareCalculation() throws IOException {
// Have two queues which want entire cluster capacity
createSchedulingRequest(10 * 1024, "parent.queue2", "user1");
createSchedulingRequest(10 * 1024, "parent.queue3", "user1");
createSchedulingRequest(10 * 1024, "root.default", "user1");
scheduler.update();
@ -766,8 +768,10 @@ public void testFairShareAndWeightsInNestedUserQueueRule() throws Exception {
scheduler.handle(nodeEvent1);
// user1,user2 submit their apps to parentq and create user queues
scheduler.assignToQueue(rmApp1, "root.parentq", "user1");
scheduler.assignToQueue(rmApp2, "root.parentq", "user2");
createSchedulingRequest(10 * 1024, "root.parentq", "user1");
createSchedulingRequest(10 * 1024, "root.parentq", "user2");
// user3 submits app in default queue
createSchedulingRequest(10 * 1024, "root.default", "user3");
scheduler.update();
@ -1287,7 +1291,7 @@ public void testPreemptionIsNotDelayedToNextRound() throws Exception {
scheduler.update();
Resource toPreempt = scheduler.resToPreempt(scheduler.getQueueManager()
.getLeafQueue("queueA.queueA2", false), clock.getTime());
assertEquals(2980, toPreempt.getMemory());
assertEquals(3277, toPreempt.getMemory());
// verify if the 3 containers required by queueA2 are preempted in the same
// round
@ -2446,8 +2450,12 @@ public void testQueueMaxAMShare() throws Exception {
scheduler.update();
FSLeafQueue queue1 = scheduler.getQueueManager().getLeafQueue("queue1", true);
assertEquals("Queue queue1's fair share should be 10240",
10240, queue1.getFairShare().getMemory());
assertEquals("Queue queue1's fair share should be 0", 0, queue1
.getFairShare().getMemory());
createSchedulingRequest(1 * 1024, "root.default", "user1");
scheduler.update();
scheduler.handle(updateEvent);
Resource amResource1 = Resource.newInstance(1024, 1);
Resource amResource2 = Resource.newInstance(2048, 2);
@ -2635,24 +2643,32 @@ public void testQueueMaxAMShareDefault() throws Exception {
FSLeafQueue queue1 =
scheduler.getQueueManager().getLeafQueue("queue1", true);
assertEquals("Queue queue1's fair share should be 1366",
1366, queue1.getFairShare().getMemory());
assertEquals("Queue queue1's fair share should be 0", 0, queue1
.getFairShare().getMemory());
FSLeafQueue queue2 =
scheduler.getQueueManager().getLeafQueue("queue2", true);
assertEquals("Queue queue2's fair share should be 1366",
1366, queue2.getFairShare().getMemory());
assertEquals("Queue queue2's fair share should be 0", 0, queue2
.getFairShare().getMemory());
FSLeafQueue queue3 =
scheduler.getQueueManager().getLeafQueue("queue3", true);
assertEquals("Queue queue3's fair share should be 1366",
1366, queue3.getFairShare().getMemory());
assertEquals("Queue queue3's fair share should be 0", 0, queue3
.getFairShare().getMemory());
FSLeafQueue queue4 =
scheduler.getQueueManager().getLeafQueue("queue4", true);
assertEquals("Queue queue4's fair share should be 1366",
1366, queue4.getFairShare().getMemory());
assertEquals("Queue queue4's fair share should be 0", 0, queue4
.getFairShare().getMemory());
FSLeafQueue queue5 =
scheduler.getQueueManager().getLeafQueue("queue5", true);
assertEquals("Queue queue5's fair share should be 1366",
1366, queue5.getFairShare().getMemory());
assertEquals("Queue queue5's fair share should be 0", 0, queue5
.getFairShare().getMemory());
List<String> queues = Arrays.asList("root.default", "root.queue3",
"root.queue4", "root.queue5");
for (String queue : queues) {
createSchedulingRequest(1 * 1024, queue, "user1");
scheduler.update();
scheduler.handle(updateEvent);
}
Resource amResource1 = Resource.newInstance(2048, 1);
int amPriority = RMAppAttemptImpl.AM_CONTAINER_PRIORITY.getPriority();

View File

@ -0,0 +1,308 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Collection;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestFairSchedulerFairShare extends FairSchedulerTestBase {
private final static String ALLOC_FILE = new File(TEST_DIR,
TestFairSchedulerFairShare.class.getName() + ".xml").getAbsolutePath();
@Before
public void setup() throws IOException {
conf = createConfiguration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
}
@After
public void teardown() {
if (resourceManager != null) {
resourceManager.stop();
resourceManager = null;
}
conf = null;
}
private void createClusterWithQueuesAndOneNode(int mem, String policy)
throws IOException {
createClusterWithQueuesAndOneNode(mem, 0, policy);
}
private void createClusterWithQueuesAndOneNode(int mem, int vCores,
String policy) throws IOException {
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"root\" >");
out.println(" <queue name=\"parentA\" >");
out.println(" <weight>8</weight>");
out.println(" <queue name=\"childA1\" />");
out.println(" <queue name=\"childA2\" />");
out.println(" <queue name=\"childA3\" />");
out.println(" <queue name=\"childA4\" />");
out.println(" </queue>");
out.println(" <queue name=\"parentB\" >");
out.println(" <weight>1</weight>");
out.println(" <queue name=\"childB1\" />");
out.println(" <queue name=\"childB2\" />");
out.println(" </queue>");
out.println("</queue>");
out.println("<defaultQueueSchedulingPolicy>" + policy
+ "</defaultQueueSchedulingPolicy>");
out.println("</allocations>");
out.close();
resourceManager = new MockRM(conf);
resourceManager.start();
scheduler = (FairScheduler) resourceManager.getResourceScheduler();
RMNode node1 = MockNodes.newNodeInfo(1,
Resources.createResource(mem, vCores), 1, "127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
}
@Test
public void testFairShareNoAppsRunning() throws IOException {
int nodeCapacity = 16 * 1024;
createClusterWithQueuesAndOneNode(nodeCapacity, "fair");
scheduler.update();
// No apps are running in the cluster,verify if fair share is zero
// for all queues under parentA and parentB.
Collection<FSLeafQueue> leafQueues = scheduler.getQueueManager()
.getLeafQueues();
for (FSLeafQueue leaf : leafQueues) {
if (leaf.getName().startsWith("root.parentA")) {
assertEquals(0, (double) leaf.getFairShare().getMemory() / nodeCapacity
* 100, 0);
} else if (leaf.getName().startsWith("root.parentB")) {
assertEquals(0, (double) leaf.getFairShare().getMemory() / nodeCapacity
* 100, 0.1);
}
}
}
@Test
public void testFairShareOneAppRunning() throws IOException {
int nodeCapacity = 16 * 1024;
createClusterWithQueuesAndOneNode(nodeCapacity, "fair");
// Run a app in a childA1. Verify whether fair share is 100% in childA1,
// since it is the only active queue.
// Also verify if fair share is 0 for childA2. since no app is
// running in it.
createSchedulingRequest(2 * 1024, "root.parentA.childA1", "user1");
scheduler.update();
assertEquals(
100,
(double) scheduler.getQueueManager()
.getLeafQueue("root.parentA.childA1", false).getFairShare()
.getMemory()
/ nodeCapacity * 100, 0.1);
assertEquals(
0,
(double) scheduler.getQueueManager()
.getLeafQueue("root.parentA.childA2", false).getFairShare()
.getMemory()
/ nodeCapacity * 100, 0.1);
}
@Test
public void testFairShareMultipleActiveQueuesUnderSameParent()
throws IOException {
int nodeCapacity = 16 * 1024;
createClusterWithQueuesAndOneNode(nodeCapacity, "fair");
// Run apps in childA1,childA2,childA3
createSchedulingRequest(2 * 1024, "root.parentA.childA1", "user1");
createSchedulingRequest(2 * 1024, "root.parentA.childA2", "user2");
createSchedulingRequest(2 * 1024, "root.parentA.childA3", "user3");
scheduler.update();
// Verify if fair share is 100 / 3 = 33%
for (int i = 1; i <= 3; i++) {
assertEquals(
33,
(double) scheduler.getQueueManager()
.getLeafQueue("root.parentA.childA" + i, false).getFairShare()
.getMemory()
/ nodeCapacity * 100, .9);
}
}
@Test
public void testFairShareMultipleActiveQueuesUnderDifferentParent()
throws IOException {
int nodeCapacity = 16 * 1024;
createClusterWithQueuesAndOneNode(nodeCapacity, "fair");
// Run apps in childA1,childA2 which are under parentA
createSchedulingRequest(2 * 1024, "root.parentA.childA1", "user1");
createSchedulingRequest(3 * 1024, "root.parentA.childA2", "user2");
// Run app in childB1 which is under parentB
createSchedulingRequest(1 * 1024, "root.parentB.childB1", "user3");
// Run app in root.default queue
createSchedulingRequest(1 * 1024, "root.default", "user4");
scheduler.update();
// The two active child queues under parentA would
// get fair share of 80/2=40%
for (int i = 1; i <= 2; i++) {
assertEquals(
40,
(double) scheduler.getQueueManager()
.getLeafQueue("root.parentA.childA" + i, false).getFairShare()
.getMemory()
/ nodeCapacity * 100, .9);
}
// The child queue under parentB would get a fair share of 10%,
// basically all of parentB's fair share
assertEquals(
10,
(double) scheduler.getQueueManager()
.getLeafQueue("root.parentB.childB1", false).getFairShare()
.getMemory()
/ nodeCapacity * 100, .9);
}
@Test
public void testFairShareResetsToZeroWhenAppsComplete() throws IOException {
int nodeCapacity = 16 * 1024;
createClusterWithQueuesAndOneNode(nodeCapacity, "fair");
// Run apps in childA1,childA2 which are under parentA
ApplicationAttemptId app1 = createSchedulingRequest(2 * 1024,
"root.parentA.childA1", "user1");
ApplicationAttemptId app2 = createSchedulingRequest(3 * 1024,
"root.parentA.childA2", "user2");
scheduler.update();
// Verify if both the active queues under parentA get 50% fair
// share
for (int i = 1; i <= 2; i++) {
assertEquals(
50,
(double) scheduler.getQueueManager()
.getLeafQueue("root.parentA.childA" + i, false).getFairShare()
.getMemory()
/ nodeCapacity * 100, .9);
}
// Let app under childA1 complete. This should cause the fair share
// of queue childA1 to be reset to zero,since the queue has no apps running.
// Queue childA2's fair share would increase to 100% since its the only
// active queue.
AppAttemptRemovedSchedulerEvent appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent(
app1, RMAppAttemptState.FINISHED, false);
scheduler.handle(appRemovedEvent1);
scheduler.update();
assertEquals(
0,
(double) scheduler.getQueueManager()
.getLeafQueue("root.parentA.childA1", false).getFairShare()
.getMemory()
/ nodeCapacity * 100, 0);
assertEquals(
100,
(double) scheduler.getQueueManager()
.getLeafQueue("root.parentA.childA2", false).getFairShare()
.getMemory()
/ nodeCapacity * 100, 0.1);
}
@Test
public void testFairShareWithDRFMultipleActiveQueuesUnderDifferentParent()
throws IOException {
int nodeMem = 16 * 1024;
int nodeVCores = 10;
createClusterWithQueuesAndOneNode(nodeMem, nodeVCores, "drf");
// Run apps in childA1,childA2 which are under parentA
createSchedulingRequest(2 * 1024, "root.parentA.childA1", "user1");
createSchedulingRequest(3 * 1024, "root.parentA.childA2", "user2");
// Run app in childB1 which is under parentB
createSchedulingRequest(1 * 1024, "root.parentB.childB1", "user3");
// Run app in root.default queue
createSchedulingRequest(1 * 1024, "root.default", "user4");
scheduler.update();
// The two active child queues under parentA would
// get 80/2=40% memory and vcores
for (int i = 1; i <= 2; i++) {
assertEquals(
40,
(double) scheduler.getQueueManager()
.getLeafQueue("root.parentA.childA" + i, false).getFairShare()
.getMemory()
/ nodeMem * 100, .9);
assertEquals(
40,
(double) scheduler.getQueueManager()
.getLeafQueue("root.parentA.childA" + i, false).getFairShare()
.getVirtualCores()
/ nodeVCores * 100, .9);
}
// The only active child queue under parentB would get 10% memory and vcores
assertEquals(
10,
(double) scheduler.getQueueManager()
.getLeafQueue("root.parentB.childB1", false).getFairShare()
.getMemory()
/ nodeMem * 100, .9);
assertEquals(
10,
(double) scheduler.getQueueManager()
.getLeafQueue("root.parentB.childB1", false).getFairShare()
.getVirtualCores()
/ nodeVCores * 100, .9);
}
}