YARN-2393. FairScheduler: Add the notion of steady fair share. (Wei Yan via kasha)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1619852 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Karthik Kambatla 2014-08-22 16:00:57 +00:00
parent 7ca74a025e
commit 21843592db
16 changed files with 328 additions and 47 deletions

View File

@ -35,6 +35,9 @@ Release 2.6.0 - UNRELEASED
YARN-2174. Enable HTTPs for the writer REST API of TimelineServer. YARN-2174. Enable HTTPs for the writer REST API of TimelineServer.
(Zhijie Shen via jianhe) (Zhijie Shen via jianhe)
YARN-2393. FairScheduler: Add the notion of steady fair share.
(Wei Yan via kasha)
IMPROVEMENTS IMPROVEMENTS
YARN-2242. Improve exception information on AM launch crashes. (Li Lu YARN-2242. Improve exception information on AM launch crashes. (Li Lu

View File

@ -717,12 +717,6 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
this.fairShare = fairShare; this.fairShare = fairShare;
} }
@Override
public boolean isActive() {
return true;
}
@Override @Override
public void updateDemand() { public void updateDemand() {
demand = Resources.createResource(0); demand = Resources.createResource(0);

View File

@ -35,7 +35,6 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
@ -68,6 +67,16 @@ public class FSParentQueue extends FSQueue {
} }
} }
public void recomputeSteadyShares() {
policy.computeSteadyShares(childQueues, getSteadyFairShare());
for (FSQueue childQueue : childQueues) {
childQueue.getMetrics().setSteadyFairShare(childQueue.getSteadyFairShare());
if (childQueue instanceof FSParentQueue) {
((FSParentQueue) childQueue).recomputeSteadyShares();
}
}
}
@Override @Override
public Resource getDemand() { public Resource getDemand() {
return demand; return demand;

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
@Unstable @Unstable
public abstract class FSQueue implements Queue, Schedulable { public abstract class FSQueue implements Queue, Schedulable {
private Resource fairShare = Resources.createResource(0, 0); private Resource fairShare = Resources.createResource(0, 0);
private Resource steadyFairShare = Resources.createResource(0, 0);
private final String name; private final String name;
protected final FairScheduler scheduler; protected final FairScheduler scheduler;
private final FSQueueMetrics metrics; private final FSQueueMetrics metrics;
@ -152,6 +153,16 @@ public abstract class FSQueue implements Queue, Schedulable {
metrics.setFairShare(fairShare); metrics.setFairShare(fairShare);
} }
/** Get the steady fair share assigned to this Schedulable. */
public Resource getSteadyFairShare() {
return steadyFairShare;
}
public void setSteadyFairShare(Resource steadyFairShare) {
this.steadyFairShare = steadyFairShare;
metrics.setSteadyFairShare(steadyFairShare);
}
public boolean hasAccess(QueueACL acl, UserGroupInformation user) { public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
return scheduler.getAllocationConfiguration().hasAccess(name, acl, user); return scheduler.getAllocationConfiguration().hasAccess(name, acl, user);
} }
@ -194,7 +205,9 @@ public abstract class FSQueue implements Queue, Schedulable {
return true; return true;
} }
@Override /**
* Returns true if queue has at least one app running.
*/
public boolean isActive() { public boolean isActive() {
return getNumRunnableApps() > 0; return getNumRunnableApps() > 0;
} }

View File

@ -33,6 +33,8 @@ public class FSQueueMetrics extends QueueMetrics {
@Metric("Fair share of memory in MB") MutableGaugeInt fairShareMB; @Metric("Fair share of memory in MB") MutableGaugeInt fairShareMB;
@Metric("Fair share of CPU in vcores") MutableGaugeInt fairShareVCores; @Metric("Fair share of CPU in vcores") MutableGaugeInt fairShareVCores;
@Metric("Steady fair share of memory in MB") MutableGaugeInt steadyFairShareMB;
@Metric("Steady fair share of CPU in vcores") MutableGaugeInt steadyFairShareVCores;
@Metric("Minimum share of memory in MB") MutableGaugeInt minShareMB; @Metric("Minimum share of memory in MB") MutableGaugeInt minShareMB;
@Metric("Minimum share of CPU in vcores") MutableGaugeInt minShareVCores; @Metric("Minimum share of CPU in vcores") MutableGaugeInt minShareVCores;
@Metric("Maximum share of memory in MB") MutableGaugeInt maxShareMB; @Metric("Maximum share of memory in MB") MutableGaugeInt maxShareMB;
@ -56,6 +58,19 @@ public class FSQueueMetrics extends QueueMetrics {
return fairShareVCores.value(); return fairShareVCores.value();
} }
public void setSteadyFairShare(Resource resource) {
steadyFairShareMB.set(resource.getMemory());
steadyFairShareVCores.set(resource.getVirtualCores());
}
public int getSteadyFairShareMB() {
return steadyFairShareMB.value();
}
public int getSteadyFairShareVCores() {
return steadyFairShareVCores.value();
}
public void setMinShare(Resource resource) { public void setMinShare(Resource resource) {
minShareMB.set(resource.getMemory()); minShareMB.set(resource.getMemory());
minShareVCores.set(resource.getVirtualCores()); minShareVCores.set(resource.getVirtualCores());

View File

@ -851,6 +851,8 @@ public class FairScheduler extends
Resources.addTo(clusterResource, node.getTotalCapability()); Resources.addTo(clusterResource, node.getTotalCapability());
updateRootQueueMetrics(); updateRootQueueMetrics();
queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
queueMgr.getRootQueue().recomputeSteadyShares();
LOG.info("Added node " + node.getNodeAddress() + LOG.info("Added node " + node.getNodeAddress() +
" cluster capacity: " + clusterResource); " cluster capacity: " + clusterResource);
} }
@ -885,6 +887,8 @@ public class FairScheduler extends
} }
nodes.remove(rmNode.getNodeID()); nodes.remove(rmNode.getNodeID());
queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
queueMgr.getRootQueue().recomputeSteadyShares();
LOG.info("Removed node " + rmNode.getNodeAddress() + LOG.info("Removed node " + rmNode.getNodeAddress() +
" cluster capacity: " + clusterResource); " cluster capacity: " + clusterResource);
} }

View File

@ -118,6 +118,11 @@ public class QueueManager {
if (queue == null && create) { if (queue == null && create) {
// if the queue doesn't exist,create it and return // if the queue doesn't exist,create it and return
queue = createQueue(name, queueType); queue = createQueue(name, queueType);
// Update steady fair share for all queues
if (queue != null) {
rootQueue.recomputeSteadyShares();
}
} }
return queue; return queue;
} }
@ -376,5 +381,8 @@ public class QueueManager {
+ queue.getName(), ex); + queue.getName(), ex);
} }
} }
// Update steady fair shares for all queues
rootQueue.recomputeSteadyShares();
} }
} }

View File

@ -24,7 +24,6 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.util.resource.Resources;
/** /**
* A Schedulable represents an entity that can be scheduled such as an * A Schedulable represents an entity that can be scheduled such as an
@ -102,10 +101,4 @@ public interface Schedulable {
/** Assign a fair share to this Schedulable. */ /** Assign a fair share to this Schedulable. */
public void setFairShare(Resource fairShare); public void setFairShare(Resource fairShare);
/**
* Returns true if queue has atleast one app running. Always returns true for
* AppSchedulables.
*/
public boolean isActive();
} }

View File

@ -17,10 +17,6 @@
*/ */
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.util.Collection;
import java.util.Comparator;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
@ -29,6 +25,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.Dom
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
import java.util.Collection;
import java.util.Comparator;
import java.util.concurrent.ConcurrentHashMap;
@Public @Public
@Evolving @Evolving
public abstract class SchedulingPolicy { public abstract class SchedulingPolicy {
@ -131,8 +131,10 @@ public abstract class SchedulingPolicy {
public abstract Comparator<Schedulable> getComparator(); public abstract Comparator<Schedulable> getComparator();
/** /**
* Computes and updates the shares of {@link Schedulable}s as per the * Computes and updates the shares of {@link Schedulable}s as per
* {@link SchedulingPolicy}, to be used later at schedule time. * the {@link SchedulingPolicy}, to be used later for scheduling decisions.
* The shares computed are instantaneous and only consider queues with
* running applications.
* *
* @param schedulables {@link Schedulable}s whose shares are to be updated * @param schedulables {@link Schedulable}s whose shares are to be updated
* @param totalResources Total {@link Resource}s in the cluster * @param totalResources Total {@link Resource}s in the cluster
@ -140,6 +142,19 @@ public abstract class SchedulingPolicy {
public abstract void computeShares( public abstract void computeShares(
Collection<? extends Schedulable> schedulables, Resource totalResources); Collection<? extends Schedulable> schedulables, Resource totalResources);
/**
* Computes and updates the steady shares of {@link FSQueue}s as per the
* {@link SchedulingPolicy}. The steady share does not differentiate
* between queues with and without running applications under them. The
* steady share is not used for scheduling, it is displayed on the Web UI
* for better visibility.
*
* @param queues {@link FSQueue}s whose shares are to be updated
* @param totalResources Total {@link Resource}s in the cluster
*/
public abstract void computeSteadyShares(
Collection<? extends FSQueue> queues, Resource totalResources);
/** /**
* Check if the resource usage is over the fair share under this policy * Check if the resource usage is over the fair share under this policy
* *

View File

@ -22,6 +22,7 @@ import java.util.Collection;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
/** /**
@ -49,14 +50,29 @@ public class ComputeFairShares {
ResourceType type) { ResourceType type) {
Collection<Schedulable> activeSchedulables = new ArrayList<Schedulable>(); Collection<Schedulable> activeSchedulables = new ArrayList<Schedulable>();
for (Schedulable sched : schedulables) { for (Schedulable sched : schedulables) {
if (sched.isActive()) { if ((sched instanceof FSQueue) && !((FSQueue) sched).isActive()) {
activeSchedulables.add(sched);
} else {
setResourceValue(0, sched.getFairShare(), type); setResourceValue(0, sched.getFairShare(), type);
} else {
activeSchedulables.add(sched);
} }
} }
computeSharesInternal(activeSchedulables, totalResources, type); computeSharesInternal(activeSchedulables, totalResources, type, false);
}
/**
* Compute the steady fair share of the given queues. The steady fair
* share is an allocation of shares considering all queues, i.e.,
* active and inactive.
*
* @param queues
* @param totalResources
* @param type
*/
public static void computeSteadyShares(
Collection<? extends FSQueue> queues, Resource totalResources,
ResourceType type) {
computeSharesInternal(queues, totalResources, type, true);
} }
/** /**
@ -102,7 +118,7 @@ public class ComputeFairShares {
*/ */
private static void computeSharesInternal( private static void computeSharesInternal(
Collection<? extends Schedulable> schedulables, Resource totalResources, Collection<? extends Schedulable> schedulables, Resource totalResources,
ResourceType type) { ResourceType type, boolean isSteadyShare) {
if (schedulables.isEmpty()) { if (schedulables.isEmpty()) {
return; return;
} }
@ -145,7 +161,13 @@ public class ComputeFairShares {
} }
// Set the fair shares based on the value of R we've converged to // Set the fair shares based on the value of R we've converged to
for (Schedulable sched : schedulables) { for (Schedulable sched : schedulables) {
setResourceValue(computeShare(sched, right, type), sched.getFairShare(), type); if (isSteadyShare) {
setResourceValue(computeShare(sched, right, type),
((FSQueue) sched).getSteadyFairShare(), type);
} else {
setResourceValue(
computeShare(sched, right, type), sched.getFairShare(), type);
}
} }
} }

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
@ -69,6 +70,14 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy {
} }
} }
@Override
public void computeSteadyShares(Collection<? extends FSQueue> queues,
Resource totalResources) {
for (ResourceType type : ResourceType.values()) {
ComputeFairShares.computeSteadyShares(queues, totalResources, type);
}
}
@Override @Override
public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) { public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) {
return !Resources.fitsIn(usage, fairShare); return !Resources.fitsIn(usage, fairShare);

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@ -119,6 +120,13 @@ public class FairSharePolicy extends SchedulingPolicy {
ComputeFairShares.computeShares(schedulables, totalResources, ResourceType.MEMORY); ComputeFairShares.computeShares(schedulables, totalResources, ResourceType.MEMORY);
} }
@Override
public void computeSteadyShares(Collection<? extends FSQueue> queues,
Resource totalResources) {
ComputeFairShares.computeSteadyShares(queues, totalResources,
ResourceType.MEMORY);
}
@Override @Override
public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) { public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) {
return Resources.greaterThan(RESOURCE_CALCULATOR, null, usage, fairShare); return Resources.greaterThan(RESOURCE_CALCULATOR, null, usage, fairShare);

View File

@ -24,6 +24,7 @@ import java.util.Comparator;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
@ -87,6 +88,13 @@ public class FifoPolicy extends SchedulingPolicy {
earliest.setFairShare(Resources.clone(totalResources)); earliest.setFairShare(Resources.clone(totalResources));
} }
@Override
public void computeSteadyShares(Collection<? extends FSQueue> queues,
Resource totalResources) {
// Nothing needs to do, as leaf queue doesn't have to calculate steady
// fair shares for applications.
}
@Override @Override
public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) { public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) {
throw new UnsupportedOperationException( throw new UnsupportedOperationException(

View File

@ -100,11 +100,6 @@ public class FakeSchedulable implements Schedulable {
this.fairShare = fairShare; this.fairShare = fairShare;
} }
@Override
public boolean isActive() {
return true;
}
@Override @Override
public Resource getDemand() { public Resource getDemand() {
return null; return null;

View File

@ -292,14 +292,19 @@ public class TestFairScheduler extends FairSchedulerTestBase {
createSchedulingRequest(10 * 1024, "root.default", "user1"); createSchedulingRequest(10 * 1024, "root.default", "user1");
scheduler.update(); scheduler.update();
scheduler.getQueueManager().getRootQueue()
.setSteadyFairShare(scheduler.getClusterResource());
scheduler.getQueueManager().getRootQueue().recomputeSteadyShares();
Collection<FSLeafQueue> queues = scheduler.getQueueManager().getLeafQueues(); Collection<FSLeafQueue> queues = scheduler.getQueueManager().getLeafQueues();
assertEquals(3, queues.size()); assertEquals(3, queues.size());
// Divided three ways - betwen the two queues and the default queue // Divided three ways - between the two queues and the default queue
for (FSLeafQueue p : queues) { for (FSLeafQueue p : queues) {
assertEquals(3414, p.getFairShare().getMemory()); assertEquals(3414, p.getFairShare().getMemory());
assertEquals(3414, p.getMetrics().getFairShareMB()); assertEquals(3414, p.getMetrics().getFairShareMB());
assertEquals(3414, p.getSteadyFairShare().getMemory());
assertEquals(3414, p.getMetrics().getSteadyFairShareMB());
} }
} }
@ -323,6 +328,9 @@ public class TestFairScheduler extends FairSchedulerTestBase {
createSchedulingRequest(10 * 1024, "root.default", "user1"); createSchedulingRequest(10 * 1024, "root.default", "user1");
scheduler.update(); scheduler.update();
scheduler.getQueueManager().getRootQueue()
.setSteadyFairShare(scheduler.getClusterResource());
scheduler.getQueueManager().getRootQueue().recomputeSteadyShares();
QueueManager queueManager = scheduler.getQueueManager(); QueueManager queueManager = scheduler.getQueueManager();
Collection<FSLeafQueue> queues = queueManager.getLeafQueues(); Collection<FSLeafQueue> queues = queueManager.getLeafQueues();
@ -333,10 +341,16 @@ public class TestFairScheduler extends FairSchedulerTestBase {
FSLeafQueue queue3 = queueManager.getLeafQueue("parent.queue3", true); FSLeafQueue queue3 = queueManager.getLeafQueue("parent.queue3", true);
assertEquals(capacity / 2, queue1.getFairShare().getMemory()); assertEquals(capacity / 2, queue1.getFairShare().getMemory());
assertEquals(capacity / 2, queue1.getMetrics().getFairShareMB()); assertEquals(capacity / 2, queue1.getMetrics().getFairShareMB());
assertEquals(capacity / 2, queue1.getSteadyFairShare().getMemory());
assertEquals(capacity / 2, queue1.getMetrics().getSteadyFairShareMB());
assertEquals(capacity / 4, queue2.getFairShare().getMemory()); assertEquals(capacity / 4, queue2.getFairShare().getMemory());
assertEquals(capacity / 4, queue2.getMetrics().getFairShareMB()); assertEquals(capacity / 4, queue2.getMetrics().getFairShareMB());
assertEquals(capacity / 4, queue2.getSteadyFairShare().getMemory());
assertEquals(capacity / 4, queue2.getMetrics().getSteadyFairShareMB());
assertEquals(capacity / 4, queue3.getFairShare().getMemory()); assertEquals(capacity / 4, queue3.getFairShare().getMemory());
assertEquals(capacity / 4, queue3.getMetrics().getFairShareMB()); assertEquals(capacity / 4, queue3.getMetrics().getFairShareMB());
assertEquals(capacity / 4, queue3.getSteadyFairShare().getMemory());
assertEquals(capacity / 4, queue3.getMetrics().getSteadyFairShareMB());
} }
@Test @Test
@ -771,6 +785,9 @@ public class TestFairScheduler extends FairSchedulerTestBase {
createSchedulingRequest(10 * 1024, "root.default", "user3"); createSchedulingRequest(10 * 1024, "root.default", "user3");
scheduler.update(); scheduler.update();
scheduler.getQueueManager().getRootQueue()
.setSteadyFairShare(scheduler.getClusterResource());
scheduler.getQueueManager().getRootQueue().recomputeSteadyShares();
Collection<FSLeafQueue> leafQueues = scheduler.getQueueManager() Collection<FSLeafQueue> leafQueues = scheduler.getQueueManager()
.getLeafQueues(); .getLeafQueues();
@ -780,12 +797,128 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|| leaf.getName().equals("root.parentq.user2")) { || leaf.getName().equals("root.parentq.user2")) {
// assert that the fair share is 1/4th node1's capacity // assert that the fair share is 1/4th node1's capacity
assertEquals(capacity / 4, leaf.getFairShare().getMemory()); assertEquals(capacity / 4, leaf.getFairShare().getMemory());
// assert that the steady fair share is 1/4th node1's capacity
assertEquals(capacity / 4, leaf.getSteadyFairShare().getMemory());
// assert weights are equal for both the user queues // assert weights are equal for both the user queues
assertEquals(1.0, leaf.getWeights().getWeight(ResourceType.MEMORY), 0); assertEquals(1.0, leaf.getWeights().getWeight(ResourceType.MEMORY), 0);
} }
} }
} }
@Test
public void testSteadyFairShareWithReloadAndNodeAddRemove() throws Exception {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<defaultQueueSchedulingPolicy>fair</defaultQueueSchedulingPolicy>");
out.println("<queue name=\"root\">");
out.println(" <schedulingPolicy>drf</schedulingPolicy>");
out.println(" <queue name=\"child1\">");
out.println(" <weight>1</weight>");
out.println(" </queue>");
out.println(" <queue name=\"child2\">");
out.println(" <weight>1</weight>");
out.println(" </queue>");
out.println("</queue>");
out.println("</allocations>");
out.close();
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// The steady fair share for all queues should be 0
QueueManager queueManager = scheduler.getQueueManager();
assertEquals(0, queueManager.getLeafQueue("child1", false)
.getSteadyFairShare().getMemory());
assertEquals(0, queueManager.getLeafQueue("child2", false)
.getSteadyFairShare().getMemory());
// Add one node
RMNode node1 =
MockNodes
.newNodeInfo(1, Resources.createResource(6144), 1, "127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
assertEquals(6144, scheduler.getClusterResource().getMemory());
// The steady fair shares for all queues should be updated
assertEquals(2048, queueManager.getLeafQueue("child1", false)
.getSteadyFairShare().getMemory());
assertEquals(2048, queueManager.getLeafQueue("child2", false)
.getSteadyFairShare().getMemory());
// Reload the allocation configuration file
out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<defaultQueueSchedulingPolicy>fair</defaultQueueSchedulingPolicy>");
out.println("<queue name=\"root\">");
out.println(" <schedulingPolicy>drf</schedulingPolicy>");
out.println(" <queue name=\"child1\">");
out.println(" <weight>1</weight>");
out.println(" </queue>");
out.println(" <queue name=\"child2\">");
out.println(" <weight>2</weight>");
out.println(" </queue>");
out.println(" <queue name=\"child3\">");
out.println(" <weight>2</weight>");
out.println(" </queue>");
out.println("</queue>");
out.println("</allocations>");
out.close();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// The steady fair shares for all queues should be updated
assertEquals(1024, queueManager.getLeafQueue("child1", false)
.getSteadyFairShare().getMemory());
assertEquals(2048, queueManager.getLeafQueue("child2", false)
.getSteadyFairShare().getMemory());
assertEquals(2048, queueManager.getLeafQueue("child3", false)
.getSteadyFairShare().getMemory());
// Remove the node, steady fair shares should back to 0
NodeRemovedSchedulerEvent nodeEvent2 = new NodeRemovedSchedulerEvent(node1);
scheduler.handle(nodeEvent2);
assertEquals(0, scheduler.getClusterResource().getMemory());
assertEquals(0, queueManager.getLeafQueue("child1", false)
.getSteadyFairShare().getMemory());
assertEquals(0, queueManager.getLeafQueue("child2", false)
.getSteadyFairShare().getMemory());
}
@Test
public void testSteadyFairShareWithQueueCreatedRuntime() throws Exception {
conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true");
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add one node
RMNode node1 =
MockNodes
.newNodeInfo(1, Resources.createResource(6144), 1, "127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
assertEquals(6144, scheduler.getClusterResource().getMemory());
assertEquals(6144, scheduler.getQueueManager().getRootQueue()
.getSteadyFairShare().getMemory());
assertEquals(6144, scheduler.getQueueManager()
.getLeafQueue("default", false).getSteadyFairShare().getMemory());
// Submit one application
ApplicationAttemptId appAttemptId1 = createAppAttemptId(1, 1);
createApplicationWithAMResource(appAttemptId1, "default", "user1", null);
assertEquals(3072, scheduler.getQueueManager()
.getLeafQueue("default", false).getSteadyFairShare().getMemory());
assertEquals(3072, scheduler.getQueueManager()
.getLeafQueue("user1", false).getSteadyFairShare().getMemory());
}
/** /**
* Make allocation requests and ensure they are reflected in queue demand. * Make allocation requests and ensure they are reflected in queue demand.
*/ */

View File

@ -109,13 +109,15 @@ public class TestFairSchedulerFairShare extends FairSchedulerTestBase {
for (FSLeafQueue leaf : leafQueues) { for (FSLeafQueue leaf : leafQueues) {
if (leaf.getName().startsWith("root.parentA")) { if (leaf.getName().startsWith("root.parentA")) {
assertEquals(0, (double) leaf.getFairShare().getMemory() / nodeCapacity assertEquals(0, (double) leaf.getFairShare().getMemory() / nodeCapacity,
* 100, 0); 0);
} else if (leaf.getName().startsWith("root.parentB")) { } else if (leaf.getName().startsWith("root.parentB")) {
assertEquals(0, (double) leaf.getFairShare().getMemory() / nodeCapacity assertEquals(0, (double) leaf.getFairShare().getMemory() / nodeCapacity,
* 100, 0.1); 0);
} }
} }
verifySteadyFairShareMemory(leafQueues, nodeCapacity);
} }
@Test @Test
@ -135,14 +137,15 @@ public class TestFairSchedulerFairShare extends FairSchedulerTestBase {
100, 100,
(double) scheduler.getQueueManager() (double) scheduler.getQueueManager()
.getLeafQueue("root.parentA.childA1", false).getFairShare() .getLeafQueue("root.parentA.childA1", false).getFairShare()
.getMemory() .getMemory() / nodeCapacity * 100, 0.1);
/ nodeCapacity * 100, 0.1);
assertEquals( assertEquals(
0, 0,
(double) scheduler.getQueueManager() (double) scheduler.getQueueManager()
.getLeafQueue("root.parentA.childA2", false).getFairShare() .getLeafQueue("root.parentA.childA2", false).getFairShare()
.getMemory() .getMemory() / nodeCapacity, 0.1);
/ nodeCapacity * 100, 0.1);
verifySteadyFairShareMemory(scheduler.getQueueManager().getLeafQueues(),
nodeCapacity);
} }
@Test @Test
@ -167,6 +170,9 @@ public class TestFairSchedulerFairShare extends FairSchedulerTestBase {
.getMemory() .getMemory()
/ nodeCapacity * 100, .9); / nodeCapacity * 100, .9);
} }
verifySteadyFairShareMemory(scheduler.getQueueManager().getLeafQueues(),
nodeCapacity);
} }
@Test @Test
@ -206,6 +212,9 @@ public class TestFairSchedulerFairShare extends FairSchedulerTestBase {
.getLeafQueue("root.parentB.childB1", false).getFairShare() .getLeafQueue("root.parentB.childB1", false).getFairShare()
.getMemory() .getMemory()
/ nodeCapacity * 100, .9); / nodeCapacity * 100, .9);
verifySteadyFairShareMemory(scheduler.getQueueManager().getLeafQueues(),
nodeCapacity);
} }
@Test @Test
@ -253,6 +262,9 @@ public class TestFairSchedulerFairShare extends FairSchedulerTestBase {
.getLeafQueue("root.parentA.childA2", false).getFairShare() .getLeafQueue("root.parentA.childA2", false).getFairShare()
.getMemory() .getMemory()
/ nodeCapacity * 100, 0.1); / nodeCapacity * 100, 0.1);
verifySteadyFairShareMemory(scheduler.getQueueManager().getLeafQueues(),
nodeCapacity);
} }
@Test @Test
@ -304,5 +316,45 @@ public class TestFairSchedulerFairShare extends FairSchedulerTestBase {
.getLeafQueue("root.parentB.childB1", false).getFairShare() .getLeafQueue("root.parentB.childB1", false).getFairShare()
.getVirtualCores() .getVirtualCores()
/ nodeVCores * 100, .9); / nodeVCores * 100, .9);
Collection<FSLeafQueue> leafQueues = scheduler.getQueueManager()
.getLeafQueues();
for (FSLeafQueue leaf : leafQueues) {
if (leaf.getName().startsWith("root.parentA")) {
assertEquals(0.2,
(double) leaf.getSteadyFairShare().getMemory() / nodeMem, 0.001);
assertEquals(0.2,
(double) leaf.getSteadyFairShare().getVirtualCores() / nodeVCores,
0.001);
} else if (leaf.getName().startsWith("root.parentB")) {
assertEquals(0.05,
(double) leaf.getSteadyFairShare().getMemory() / nodeMem, 0.001);
assertEquals(0.1,
(double) leaf.getSteadyFairShare().getVirtualCores() / nodeVCores,
0.001);
}
}
}
/**
* Verify whether steady fair shares for all leaf queues still follow
* their weight, not related to active/inactive status.
*
* @param leafQueues
* @param nodeCapacity
*/
private void verifySteadyFairShareMemory(Collection<FSLeafQueue> leafQueues,
int nodeCapacity) {
for (FSLeafQueue leaf : leafQueues) {
if (leaf.getName().startsWith("root.parentA")) {
assertEquals(0.2,
(double) leaf.getSteadyFairShare().getMemory() / nodeCapacity,
0.001);
} else if (leaf.getName().startsWith("root.parentB")) {
assertEquals(0.05,
(double) leaf.getSteadyFairShare().getMemory() / nodeCapacity,
0.001);
}
}
} }
} }