YARN-2026. Fair scheduler: Consider only active queues for computing fairshare. (Ashwin Shankar via kasha)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1616916 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
4365c4530b
commit
a6611f2bd1
|
@ -82,6 +82,9 @@ Release 2.6.0 - UNRELEASED
|
|||
YARN-2212. ApplicationMaster needs to find a way to update the AMRMToken
|
||||
periodically. (xgong)
|
||||
|
||||
YARN-2026. Fair scheduler: Consider only active queues for computing fairshare.
|
||||
(Ashwin Shankar via kasha)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -116,6 +116,18 @@ public abstract class Schedulable {
|
|||
return fairShare;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if queue has atleast one app running. Always returns true for
|
||||
* AppSchedulables.
|
||||
*/
|
||||
public boolean isActive() {
|
||||
if (this instanceof FSQueue) {
|
||||
FSQueue queue = (FSQueue) this;
|
||||
return queue.getNumRunnableApps() > 0;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/** Convenient toString implementation for debugging. */
|
||||
@Override
|
||||
public String toString() {
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
|
@ -33,7 +34,31 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
|
|||
public class ComputeFairShares {
|
||||
|
||||
private static final int COMPUTE_FAIR_SHARES_ITERATIONS = 25;
|
||||
|
||||
|
||||
/**
|
||||
* Compute fair share of the given schedulables.Fair share is an allocation of
|
||||
* shares considering only active schedulables ie schedulables which have
|
||||
* running apps.
|
||||
*
|
||||
* @param schedulables
|
||||
* @param totalResources
|
||||
* @param type
|
||||
*/
|
||||
public static void computeShares(
|
||||
Collection<? extends Schedulable> schedulables, Resource totalResources,
|
||||
ResourceType type) {
|
||||
Collection<Schedulable> activeSchedulables = new ArrayList<Schedulable>();
|
||||
for (Schedulable sched : schedulables) {
|
||||
if (sched.isActive()) {
|
||||
activeSchedulables.add(sched);
|
||||
} else {
|
||||
setResourceValue(0, sched.getFairShare(), type);
|
||||
}
|
||||
}
|
||||
|
||||
computeSharesInternal(activeSchedulables, totalResources, type);
|
||||
}
|
||||
|
||||
/**
|
||||
* Given a set of Schedulables and a number of slots, compute their weighted
|
||||
* fair shares. The min and max shares and of the Schedulables are assumed to
|
||||
|
@ -75,7 +100,7 @@ public class ComputeFairShares {
|
|||
* because resourceUsedWithWeightToResourceRatio is linear-time and the number of
|
||||
* iterations of binary search is a constant (dependent on desired precision).
|
||||
*/
|
||||
public static void computeShares(
|
||||
private static void computeSharesInternal(
|
||||
Collection<? extends Schedulable> schedulables, Resource totalResources,
|
||||
ResourceType type) {
|
||||
if (schedulables.isEmpty()) {
|
||||
|
|
|
@ -292,6 +292,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|||
// Have two queues which want entire cluster capacity
|
||||
createSchedulingRequest(10 * 1024, "queue1", "user1");
|
||||
createSchedulingRequest(10 * 1024, "queue2", "user1");
|
||||
createSchedulingRequest(10 * 1024, "root.default", "user1");
|
||||
|
||||
scheduler.update();
|
||||
|
||||
|
@ -322,6 +323,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|||
// Have two queues which want entire cluster capacity
|
||||
createSchedulingRequest(10 * 1024, "parent.queue2", "user1");
|
||||
createSchedulingRequest(10 * 1024, "parent.queue3", "user1");
|
||||
createSchedulingRequest(10 * 1024, "root.default", "user1");
|
||||
|
||||
scheduler.update();
|
||||
|
||||
|
@ -766,8 +768,10 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|||
scheduler.handle(nodeEvent1);
|
||||
|
||||
// user1,user2 submit their apps to parentq and create user queues
|
||||
scheduler.assignToQueue(rmApp1, "root.parentq", "user1");
|
||||
scheduler.assignToQueue(rmApp2, "root.parentq", "user2");
|
||||
createSchedulingRequest(10 * 1024, "root.parentq", "user1");
|
||||
createSchedulingRequest(10 * 1024, "root.parentq", "user2");
|
||||
// user3 submits app in default queue
|
||||
createSchedulingRequest(10 * 1024, "root.default", "user3");
|
||||
|
||||
scheduler.update();
|
||||
|
||||
|
@ -1287,7 +1291,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|||
scheduler.update();
|
||||
Resource toPreempt = scheduler.resToPreempt(scheduler.getQueueManager()
|
||||
.getLeafQueue("queueA.queueA2", false), clock.getTime());
|
||||
assertEquals(2980, toPreempt.getMemory());
|
||||
assertEquals(3277, toPreempt.getMemory());
|
||||
|
||||
// verify if the 3 containers required by queueA2 are preempted in the same
|
||||
// round
|
||||
|
@ -2446,8 +2450,12 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|||
scheduler.update();
|
||||
|
||||
FSLeafQueue queue1 = scheduler.getQueueManager().getLeafQueue("queue1", true);
|
||||
assertEquals("Queue queue1's fair share should be 10240",
|
||||
10240, queue1.getFairShare().getMemory());
|
||||
assertEquals("Queue queue1's fair share should be 0", 0, queue1
|
||||
.getFairShare().getMemory());
|
||||
|
||||
createSchedulingRequest(1 * 1024, "root.default", "user1");
|
||||
scheduler.update();
|
||||
scheduler.handle(updateEvent);
|
||||
|
||||
Resource amResource1 = Resource.newInstance(1024, 1);
|
||||
Resource amResource2 = Resource.newInstance(2048, 2);
|
||||
|
@ -2635,24 +2643,32 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|||
|
||||
FSLeafQueue queue1 =
|
||||
scheduler.getQueueManager().getLeafQueue("queue1", true);
|
||||
assertEquals("Queue queue1's fair share should be 1366",
|
||||
1366, queue1.getFairShare().getMemory());
|
||||
assertEquals("Queue queue1's fair share should be 0", 0, queue1
|
||||
.getFairShare().getMemory());
|
||||
FSLeafQueue queue2 =
|
||||
scheduler.getQueueManager().getLeafQueue("queue2", true);
|
||||
assertEquals("Queue queue2's fair share should be 1366",
|
||||
1366, queue2.getFairShare().getMemory());
|
||||
assertEquals("Queue queue2's fair share should be 0", 0, queue2
|
||||
.getFairShare().getMemory());
|
||||
FSLeafQueue queue3 =
|
||||
scheduler.getQueueManager().getLeafQueue("queue3", true);
|
||||
assertEquals("Queue queue3's fair share should be 1366",
|
||||
1366, queue3.getFairShare().getMemory());
|
||||
assertEquals("Queue queue3's fair share should be 0", 0, queue3
|
||||
.getFairShare().getMemory());
|
||||
FSLeafQueue queue4 =
|
||||
scheduler.getQueueManager().getLeafQueue("queue4", true);
|
||||
assertEquals("Queue queue4's fair share should be 1366",
|
||||
1366, queue4.getFairShare().getMemory());
|
||||
assertEquals("Queue queue4's fair share should be 0", 0, queue4
|
||||
.getFairShare().getMemory());
|
||||
FSLeafQueue queue5 =
|
||||
scheduler.getQueueManager().getLeafQueue("queue5", true);
|
||||
assertEquals("Queue queue5's fair share should be 1366",
|
||||
1366, queue5.getFairShare().getMemory());
|
||||
assertEquals("Queue queue5's fair share should be 0", 0, queue5
|
||||
.getFairShare().getMemory());
|
||||
|
||||
List<String> queues = Arrays.asList("root.default", "root.queue3",
|
||||
"root.queue4", "root.queue5");
|
||||
for (String queue : queues) {
|
||||
createSchedulingRequest(1 * 1024, queue, "user1");
|
||||
scheduler.update();
|
||||
scheduler.handle(updateEvent);
|
||||
}
|
||||
|
||||
Resource amResource1 = Resource.newInstance(2048, 1);
|
||||
int amPriority = RMAppAttemptImpl.AM_CONTAINER_PRIORITY.getPriority();
|
||||
|
|
|
@ -0,0 +1,308 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileWriter;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
import java.util.Collection;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestFairSchedulerFairShare extends FairSchedulerTestBase {
|
||||
private final static String ALLOC_FILE = new File(TEST_DIR,
|
||||
TestFairSchedulerFairShare.class.getName() + ".xml").getAbsolutePath();
|
||||
|
||||
@Before
|
||||
public void setup() throws IOException {
|
||||
conf = createConfiguration();
|
||||
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||
}
|
||||
|
||||
@After
|
||||
public void teardown() {
|
||||
if (resourceManager != null) {
|
||||
resourceManager.stop();
|
||||
resourceManager = null;
|
||||
}
|
||||
conf = null;
|
||||
}
|
||||
|
||||
private void createClusterWithQueuesAndOneNode(int mem, String policy)
|
||||
throws IOException {
|
||||
createClusterWithQueuesAndOneNode(mem, 0, policy);
|
||||
}
|
||||
|
||||
private void createClusterWithQueuesAndOneNode(int mem, int vCores,
|
||||
String policy) throws IOException {
|
||||
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
||||
out.println("<?xml version=\"1.0\"?>");
|
||||
out.println("<allocations>");
|
||||
out.println("<queue name=\"root\" >");
|
||||
out.println(" <queue name=\"parentA\" >");
|
||||
out.println(" <weight>8</weight>");
|
||||
out.println(" <queue name=\"childA1\" />");
|
||||
out.println(" <queue name=\"childA2\" />");
|
||||
out.println(" <queue name=\"childA3\" />");
|
||||
out.println(" <queue name=\"childA4\" />");
|
||||
out.println(" </queue>");
|
||||
out.println(" <queue name=\"parentB\" >");
|
||||
out.println(" <weight>1</weight>");
|
||||
out.println(" <queue name=\"childB1\" />");
|
||||
out.println(" <queue name=\"childB2\" />");
|
||||
out.println(" </queue>");
|
||||
out.println("</queue>");
|
||||
out.println("<defaultQueueSchedulingPolicy>" + policy
|
||||
+ "</defaultQueueSchedulingPolicy>");
|
||||
out.println("</allocations>");
|
||||
out.close();
|
||||
|
||||
resourceManager = new MockRM(conf);
|
||||
resourceManager.start();
|
||||
scheduler = (FairScheduler) resourceManager.getResourceScheduler();
|
||||
|
||||
RMNode node1 = MockNodes.newNodeInfo(1,
|
||||
Resources.createResource(mem, vCores), 1, "127.0.0.1");
|
||||
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
||||
scheduler.handle(nodeEvent1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFairShareNoAppsRunning() throws IOException {
|
||||
int nodeCapacity = 16 * 1024;
|
||||
createClusterWithQueuesAndOneNode(nodeCapacity, "fair");
|
||||
|
||||
scheduler.update();
|
||||
// No apps are running in the cluster,verify if fair share is zero
|
||||
// for all queues under parentA and parentB.
|
||||
Collection<FSLeafQueue> leafQueues = scheduler.getQueueManager()
|
||||
.getLeafQueues();
|
||||
|
||||
for (FSLeafQueue leaf : leafQueues) {
|
||||
if (leaf.getName().startsWith("root.parentA")) {
|
||||
assertEquals(0, (double) leaf.getFairShare().getMemory() / nodeCapacity
|
||||
* 100, 0);
|
||||
} else if (leaf.getName().startsWith("root.parentB")) {
|
||||
assertEquals(0, (double) leaf.getFairShare().getMemory() / nodeCapacity
|
||||
* 100, 0.1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFairShareOneAppRunning() throws IOException {
|
||||
int nodeCapacity = 16 * 1024;
|
||||
createClusterWithQueuesAndOneNode(nodeCapacity, "fair");
|
||||
|
||||
// Run a app in a childA1. Verify whether fair share is 100% in childA1,
|
||||
// since it is the only active queue.
|
||||
// Also verify if fair share is 0 for childA2. since no app is
|
||||
// running in it.
|
||||
createSchedulingRequest(2 * 1024, "root.parentA.childA1", "user1");
|
||||
|
||||
scheduler.update();
|
||||
|
||||
assertEquals(
|
||||
100,
|
||||
(double) scheduler.getQueueManager()
|
||||
.getLeafQueue("root.parentA.childA1", false).getFairShare()
|
||||
.getMemory()
|
||||
/ nodeCapacity * 100, 0.1);
|
||||
assertEquals(
|
||||
0,
|
||||
(double) scheduler.getQueueManager()
|
||||
.getLeafQueue("root.parentA.childA2", false).getFairShare()
|
||||
.getMemory()
|
||||
/ nodeCapacity * 100, 0.1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFairShareMultipleActiveQueuesUnderSameParent()
|
||||
throws IOException {
|
||||
int nodeCapacity = 16 * 1024;
|
||||
createClusterWithQueuesAndOneNode(nodeCapacity, "fair");
|
||||
|
||||
// Run apps in childA1,childA2,childA3
|
||||
createSchedulingRequest(2 * 1024, "root.parentA.childA1", "user1");
|
||||
createSchedulingRequest(2 * 1024, "root.parentA.childA2", "user2");
|
||||
createSchedulingRequest(2 * 1024, "root.parentA.childA3", "user3");
|
||||
|
||||
scheduler.update();
|
||||
|
||||
// Verify if fair share is 100 / 3 = 33%
|
||||
for (int i = 1; i <= 3; i++) {
|
||||
assertEquals(
|
||||
33,
|
||||
(double) scheduler.getQueueManager()
|
||||
.getLeafQueue("root.parentA.childA" + i, false).getFairShare()
|
||||
.getMemory()
|
||||
/ nodeCapacity * 100, .9);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFairShareMultipleActiveQueuesUnderDifferentParent()
|
||||
throws IOException {
|
||||
int nodeCapacity = 16 * 1024;
|
||||
createClusterWithQueuesAndOneNode(nodeCapacity, "fair");
|
||||
|
||||
// Run apps in childA1,childA2 which are under parentA
|
||||
createSchedulingRequest(2 * 1024, "root.parentA.childA1", "user1");
|
||||
createSchedulingRequest(3 * 1024, "root.parentA.childA2", "user2");
|
||||
|
||||
// Run app in childB1 which is under parentB
|
||||
createSchedulingRequest(1 * 1024, "root.parentB.childB1", "user3");
|
||||
|
||||
// Run app in root.default queue
|
||||
createSchedulingRequest(1 * 1024, "root.default", "user4");
|
||||
|
||||
scheduler.update();
|
||||
|
||||
// The two active child queues under parentA would
|
||||
// get fair share of 80/2=40%
|
||||
for (int i = 1; i <= 2; i++) {
|
||||
assertEquals(
|
||||
40,
|
||||
(double) scheduler.getQueueManager()
|
||||
.getLeafQueue("root.parentA.childA" + i, false).getFairShare()
|
||||
.getMemory()
|
||||
/ nodeCapacity * 100, .9);
|
||||
}
|
||||
|
||||
// The child queue under parentB would get a fair share of 10%,
|
||||
// basically all of parentB's fair share
|
||||
assertEquals(
|
||||
10,
|
||||
(double) scheduler.getQueueManager()
|
||||
.getLeafQueue("root.parentB.childB1", false).getFairShare()
|
||||
.getMemory()
|
||||
/ nodeCapacity * 100, .9);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFairShareResetsToZeroWhenAppsComplete() throws IOException {
|
||||
int nodeCapacity = 16 * 1024;
|
||||
createClusterWithQueuesAndOneNode(nodeCapacity, "fair");
|
||||
|
||||
// Run apps in childA1,childA2 which are under parentA
|
||||
ApplicationAttemptId app1 = createSchedulingRequest(2 * 1024,
|
||||
"root.parentA.childA1", "user1");
|
||||
ApplicationAttemptId app2 = createSchedulingRequest(3 * 1024,
|
||||
"root.parentA.childA2", "user2");
|
||||
|
||||
scheduler.update();
|
||||
|
||||
// Verify if both the active queues under parentA get 50% fair
|
||||
// share
|
||||
for (int i = 1; i <= 2; i++) {
|
||||
assertEquals(
|
||||
50,
|
||||
(double) scheduler.getQueueManager()
|
||||
.getLeafQueue("root.parentA.childA" + i, false).getFairShare()
|
||||
.getMemory()
|
||||
/ nodeCapacity * 100, .9);
|
||||
}
|
||||
// Let app under childA1 complete. This should cause the fair share
|
||||
// of queue childA1 to be reset to zero,since the queue has no apps running.
|
||||
// Queue childA2's fair share would increase to 100% since its the only
|
||||
// active queue.
|
||||
AppAttemptRemovedSchedulerEvent appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent(
|
||||
app1, RMAppAttemptState.FINISHED, false);
|
||||
|
||||
scheduler.handle(appRemovedEvent1);
|
||||
scheduler.update();
|
||||
|
||||
assertEquals(
|
||||
0,
|
||||
(double) scheduler.getQueueManager()
|
||||
.getLeafQueue("root.parentA.childA1", false).getFairShare()
|
||||
.getMemory()
|
||||
/ nodeCapacity * 100, 0);
|
||||
assertEquals(
|
||||
100,
|
||||
(double) scheduler.getQueueManager()
|
||||
.getLeafQueue("root.parentA.childA2", false).getFairShare()
|
||||
.getMemory()
|
||||
/ nodeCapacity * 100, 0.1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFairShareWithDRFMultipleActiveQueuesUnderDifferentParent()
|
||||
throws IOException {
|
||||
int nodeMem = 16 * 1024;
|
||||
int nodeVCores = 10;
|
||||
createClusterWithQueuesAndOneNode(nodeMem, nodeVCores, "drf");
|
||||
|
||||
// Run apps in childA1,childA2 which are under parentA
|
||||
createSchedulingRequest(2 * 1024, "root.parentA.childA1", "user1");
|
||||
createSchedulingRequest(3 * 1024, "root.parentA.childA2", "user2");
|
||||
|
||||
// Run app in childB1 which is under parentB
|
||||
createSchedulingRequest(1 * 1024, "root.parentB.childB1", "user3");
|
||||
|
||||
// Run app in root.default queue
|
||||
createSchedulingRequest(1 * 1024, "root.default", "user4");
|
||||
|
||||
scheduler.update();
|
||||
|
||||
// The two active child queues under parentA would
|
||||
// get 80/2=40% memory and vcores
|
||||
for (int i = 1; i <= 2; i++) {
|
||||
assertEquals(
|
||||
40,
|
||||
(double) scheduler.getQueueManager()
|
||||
.getLeafQueue("root.parentA.childA" + i, false).getFairShare()
|
||||
.getMemory()
|
||||
/ nodeMem * 100, .9);
|
||||
assertEquals(
|
||||
40,
|
||||
(double) scheduler.getQueueManager()
|
||||
.getLeafQueue("root.parentA.childA" + i, false).getFairShare()
|
||||
.getVirtualCores()
|
||||
/ nodeVCores * 100, .9);
|
||||
}
|
||||
|
||||
// The only active child queue under parentB would get 10% memory and vcores
|
||||
assertEquals(
|
||||
10,
|
||||
(double) scheduler.getQueueManager()
|
||||
.getLeafQueue("root.parentB.childB1", false).getFairShare()
|
||||
.getMemory()
|
||||
/ nodeMem * 100, .9);
|
||||
assertEquals(
|
||||
10,
|
||||
(double) scheduler.getQueueManager()
|
||||
.getLeafQueue("root.parentB.childB1", false).getFairShare()
|
||||
.getVirtualCores()
|
||||
/ nodeVCores * 100, .9);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue