YARN-326. Add multi-resource scheduling to the fair scheduler. (sandyr via tucu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1489070 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Alejandro Abdelnur 2013-06-03 17:33:55 +00:00
parent 4c13c986e1
commit c1b635ed48
20 changed files with 892 additions and 169 deletions

View File

@ -108,6 +108,9 @@ Release 2.1.0-beta - UNRELEASED
YARN-392. Make it possible to specify hard locality constraints in resource
requests. (sandyr via tucu)
YARN-326. Add multi-resource scheduling to the fair scheduler.
(sandyr via tucu)
IMPROVEMENTS
YARN-365. Change NM heartbeat handling to not generate a scheduler event

View File

@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.resource;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
@Private
@Evolving
public enum ResourceType {
MEMORY, CPU
}

View File

@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.resource;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
@Private
@Evolving
public class ResourceWeights {
public static final ResourceWeights NEUTRAL = new ResourceWeights(1.0f);
private float[] weights = new float[ResourceType.values().length];
public ResourceWeights(float memoryWeight, float cpuWeight) {
weights[ResourceType.MEMORY.ordinal()] = memoryWeight;
weights[ResourceType.CPU.ordinal()] = cpuWeight;
}
public ResourceWeights(float weight) {
for (int i = 0; i < weights.length; i++) {
weights[i] = weight;
}
}
public ResourceWeights() { }
public void setWeight(ResourceType resourceType, float weight) {
weights[resourceType.ordinal()] = weight;
}
public float getWeight(ResourceType resourceType) {
return weights[resourceType.ordinal()];
}
public String toString() {
StringBuffer sb = new StringBuffer();
sb.append("<");
for (int i = 0; i < ResourceType.values().length; i++) {
if (i != 0) {
sb.append(", ");
}
ResourceType resourceType = ResourceType.values()[i];
sb.append(resourceType.name().toLowerCase());
sb.append(String.format(" weight=%.1f", getWeight(resourceType)));
}
sb.append(">");
return sb.toString();
}
}

View File

@ -32,4 +32,8 @@ public class AllocationConfigurationException extends Exception {
public AllocationConfigurationException(String message) {
super(message);
}
public AllocationConfigurationException(String message, Throwable t) {
super(message, t);
}
}

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
@ -119,7 +120,7 @@ public class AppSchedulable extends Schedulable {
}
@Override
public double getWeight() {
public ResourceWeights getWeights() {
return scheduler.getAppWeight(this);
}
@ -237,10 +238,7 @@ public class AppSchedulable extends Schedulable {
}
// Can we allocate a container on this node?
int availableContainers =
available.getMemory() / capability.getMemory();
if (availableContainers > 0) {
if (Resources.fitsIn(capability, available)) {
// Inform the application of the new container for this request
RMContainer allocatedContainer =
app.allocate(type, node, priority, request, container);

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@ -84,7 +85,7 @@ public abstract class FSQueue extends Schedulable implements Queue {
throws AllocationConfigurationException;
@Override
public double getWeight() {
public ResourceWeights getWeights() {
return queueMgr.getQueueWeight(getName());
}

View File

@ -180,8 +180,8 @@ public class FSSchedulerNode extends SchedulerNode {
@Override
public String toString() {
return "host: " + rmNode.getNodeAddress() + " #containers=" + getNumContainers() +
" available=" + getAvailableResource().getMemory() +
" used=" + getUsedResource().getMemory();
" available=" + getAvailableResource() +
" used=" + getUsedResource();
}
@Override

View File

@ -57,8 +57,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstant
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@ -495,14 +497,14 @@ public class FairScheduler implements ResourceScheduler {
}
// synchronized for sizeBasedWeight
public synchronized double getAppWeight(AppSchedulable app) {
public synchronized ResourceWeights getAppWeight(AppSchedulable app) {
if (!app.getRunnable()) {
// Job won't launch tasks, but don't return 0 to avoid division errors
return 1.0;
return ResourceWeights.NEUTRAL;
} else {
double weight = 1.0;
if (sizeBasedWeight) {
// Set weight based on current demand
// Set weight based on current memory demand
weight = Math.log1p(app.getDemand().getMemory()) / Math.log(2);
}
weight *= app.getPriority().getPriority();
@ -510,7 +512,7 @@ public class FairScheduler implements ResourceScheduler {
// Run weight through the user-supplied weightAdjuster
weight = weightAdjuster.adjustWeight(app, weight);
}
return weight;
return new ResourceWeights((float)weight);
}
}
@ -714,37 +716,6 @@ public class FairScheduler implements ResourceScheduler {
" cluster capacity: " + clusterCapacity);
}
/**
* Utility method to normalize a list of resource requests, by ensuring that
* the memory for each request is a multiple of minMemory and is not zero.
*
* @param asks a list of resource requests
* @param minMemory the configured minimum memory allocation
* @param maxMemory the configured maximum memory allocation
*/
static void normalizeRequests(List<ResourceRequest> asks,
int minMemory, int maxMemory) {
for (ResourceRequest ask : asks) {
normalizeRequest(ask, minMemory, maxMemory);
}
}
/**
* Utility method to normalize a resource request, by ensuring that the
* requested memory is a multiple of minMemory and is not zero.
*
* @param ask the resource request
* @param minMemory the configured minimum memory allocation
* @param maxMemory the configured maximum memory allocation
*/
static void normalizeRequest(ResourceRequest ask, int minMemory,
int maxMemory) {
int memory = Math.max(ask.getCapability().getMemory(), minMemory);
int normalizedMemory =
minMemory * ((memory / minMemory) + (memory % minMemory > 0 ? 1 : 0));
ask.getCapability().setMemory(Math.min(normalizedMemory, maxMemory));
}
@Override
public Allocation allocate(ApplicationAttemptId appAttemptId,
List<ResourceRequest> ask, List<ContainerId> release) {
@ -758,8 +729,8 @@ public class FairScheduler implements ResourceScheduler {
}
// Sanity check
normalizeRequests(ask, minimumAllocation.getMemory(),
maximumAllocation.getMemory());
SchedulerUtils.normalizeRequests(ask, new DominantResourceCalculator(),
clusterCapacity, minimumAllocation, maximumAllocation);
// Release containers
for (ContainerId releasedContainerId : release) {
@ -1015,8 +986,8 @@ public class FairScheduler implements ResourceScheduler {
public synchronized void reinitialize(Configuration conf, RMContext rmContext)
throws IOException {
this.conf = new FairSchedulerConfiguration(conf);
minimumAllocation = this.conf.getMinimumMemoryAllocation();
maximumAllocation = this.conf.getMaximumMemoryAllocation();
minimumAllocation = this.conf.getMinimumAllocation();
maximumAllocation = this.conf.getMaximumAllocation();
userAsDefaultQueue = this.conf.getUserAsDefaultQueue();
nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
rackLocalityThreshold = this.conf.getLocalityThresholdRack();

View File

@ -18,12 +18,16 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.io.File;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.util.BuilderUtils;
@Private
@Evolving
@ -78,18 +82,24 @@ public class FairSchedulerConfiguration extends Configuration {
addResource(FS_CONFIGURATION_FILE);
}
public Resource getMinimumMemoryAllocation() {
public Resource getMinimumAllocation() {
int mem = getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
return Resources.createResource(mem);
int cpu = getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
return Resources.createResource(mem, cpu);
}
public Resource getMaximumMemoryAllocation() {
public Resource getMaximumAllocation() {
int mem = getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
return Resources.createResource(mem);
int cpu = getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
return Resources.createResource(mem, cpu);
}
public boolean getUserAsDefaultQueue() {
@ -136,4 +146,34 @@ public class FairSchedulerConfiguration extends Configuration {
public int getWaitTimeBeforeKill() {
return getInt(WAIT_TIME_BEFORE_KILL, DEFAULT_WAIT_TIME_BEFORE_KILL);
}
/**
* Parses a resource config value of a form like "1024", "1024 mb",
* or "1024 mb, 3 vcores". If no units are given, megabytes are assumed.
*
* @throws AllocationConfigurationException
*/
public static Resource parseResourceConfigValue(String val)
throws AllocationConfigurationException {
try {
int memory = findResource(val, "mb");
int vcores = findResource(val, "vcores");
return BuilderUtils.newResource(memory, vcores);
} catch (AllocationConfigurationException ex) {
throw ex;
} catch (Exception ex) {
throw new AllocationConfigurationException(
"Error reading resource config", ex);
}
}
private static int findResource(String val, String units)
throws AllocationConfigurationException {
Pattern pattern = Pattern.compile("(\\d+) ?" + units);
Matcher matcher = pattern.matcher(val);
if (!matcher.find()) {
throw new AllocationConfigurationException("Missing resource: " + units);
}
return Integer.parseInt(matcher.group(1));
}
}

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
@ -301,7 +302,7 @@ public class QueueManager {
Map<String, Resource> maxQueueResources = new HashMap<String, Resource>();
Map<String, Integer> queueMaxApps = new HashMap<String, Integer>();
Map<String, Integer> userMaxApps = new HashMap<String, Integer>();
Map<String, Double> queueWeights = new HashMap<String, Double>();
Map<String, ResourceWeights> queueWeights = new HashMap<String, ResourceWeights>();
Map<String, SchedulingPolicy> queuePolicies = new HashMap<String, SchedulingPolicy>();
Map<String, Long> minSharePreemptionTimeouts = new HashMap<String, Long>();
Map<String, Map<QueueACL, AccessControlList>> queueAcls =
@ -415,7 +416,7 @@ public class QueueManager {
*/
private void loadQueue(String parentName, Element element, Map<String, Resource> minQueueResources,
Map<String, Resource> maxQueueResources, Map<String, Integer> queueMaxApps,
Map<String, Integer> userMaxApps, Map<String, Double> queueWeights,
Map<String, Integer> userMaxApps, Map<String, ResourceWeights> queueWeights,
Map<String, SchedulingPolicy> queuePolicies,
Map<String, Long> minSharePreemptionTimeouts,
Map<String, Map<QueueACL, AccessControlList>> queueAcls, List<String> queueNamesInAllocFile)
@ -433,12 +434,12 @@ public class QueueManager {
Element field = (Element) fieldNode;
if ("minResources".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
int val = Integer.parseInt(text);
minQueueResources.put(queueName, Resources.createResource(val));
Resource val = FairSchedulerConfiguration.parseResourceConfigValue(text);
minQueueResources.put(queueName, val);
} else if ("maxResources".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
int val = Integer.parseInt(text);
maxQueueResources.put(queueName, Resources.createResource(val));
Resource val = FairSchedulerConfiguration.parseResourceConfigValue(text);
maxQueueResources.put(queueName, val);
} else if ("maxRunningApps".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
int val = Integer.parseInt(text);
@ -446,7 +447,7 @@ public class QueueManager {
} else if ("weight".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
double val = Double.parseDouble(text);
queueWeights.put(queueName, val);
queueWeights.put(queueName, new ResourceWeights((float)val));
} else if ("minSharePreemptionTimeout".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
long val = Long.parseLong(text) * 1000L;
@ -454,7 +455,9 @@ public class QueueManager {
} else if ("schedulingPolicy".equals(field.getTagName())
|| "schedulingMode".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
queuePolicies.put(queueName, SchedulingPolicy.parse(text));
SchedulingPolicy policy = SchedulingPolicy.parse(text);
policy.initialize(scheduler.getClusterCapacity());
queuePolicies.put(queueName, policy);
} else if ("aclSubmitApps".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text));
@ -510,7 +513,7 @@ public class QueueManager {
}
/**
* Get a collection of all queues
* Get a collection of all leaf queues
*/
public Collection<FSLeafQueue> getLeafQueues() {
synchronized (queues) {
@ -518,6 +521,13 @@ public class QueueManager {
}
}
/**
* Get a collection of all queues
*/
public Collection<FSQueue> getQueues() {
return queues.values();
}
public int getUserMaxApps(String user) {
// save current info in case it gets changed under us
QueueManagerInfo info = this.info;
@ -538,12 +548,12 @@ public class QueueManager {
}
}
public double getQueueWeight(String queue) {
Double weight = info.queueWeights.get(queue);
public ResourceWeights getQueueWeight(String queue) {
ResourceWeights weight = info.queueWeights.get(queue);
if (weight != null) {
return weight;
} else {
return 1.0;
return ResourceWeights.NEUTRAL;
}
}
@ -595,7 +605,7 @@ public class QueueManager {
// Maximum amount of resources per queue
public final Map<String, Resource> maxQueueResources;
// Sharing weights for each queue
public final Map<String, Double> queueWeights;
public final Map<String, ResourceWeights> queueWeights;
// Max concurrent running applications for each queue and for each user; in addition,
// for users that have no max specified, we use the userMaxJobsDefault.
@ -625,7 +635,7 @@ public class QueueManager {
public QueueManagerInfo(Map<String, Resource> minQueueResources,
Map<String, Resource> maxQueueResources,
Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps,
Map<String, Double> queueWeights, int userMaxAppsDefault,
Map<String, ResourceWeights> queueWeights, int userMaxAppsDefault,
int queueMaxAppsDefault, SchedulingPolicy defaultSchedulingPolicy,
Map<String, Long> minSharePreemptionTimeouts,
Map<String, Map<QueueACL, AccessControlList>> queueAcls,
@ -647,7 +657,7 @@ public class QueueManager {
public QueueManagerInfo() {
minQueueResources = new HashMap<String, Resource>();
maxQueueResources = new HashMap<String, Resource>();
queueWeights = new HashMap<String, Double>();
queueWeights = new HashMap<String, ResourceWeights>();
queueMaxApps = new HashMap<String, Integer>();
userMaxApps = new HashMap<String, Integer>();
userMaxAppsDefault = Integer.MAX_VALUE;

View File

@ -22,6 +22,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
/**
@ -80,7 +81,7 @@ public abstract class Schedulable {
/** Job/queue weight in fair sharing. */
public abstract double getWeight();
public abstract ResourceWeights getWeights();
/** Start time for jobs in FIFO queues; meaningless for QueueSchedulables.*/
public abstract long getStartTime();
@ -110,7 +111,7 @@ public abstract class Schedulable {
/** Convenient toString implementation for debugging. */
@Override
public String toString() {
return String.format("[%s, demand=%s, running=%s, share=%s,], w=%.1f]",
getName(), getDemand(), getResourceUsage(), fairShare, getWeight());
return String.format("[%s, demand=%s, running=%s, share=%s, w=%s]",
getName(), getDemand(), getResourceUsage(), fairShare, getWeights());
}
}

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
@ -67,11 +68,12 @@ public abstract class SchedulingPolicy {
/**
* Returns {@link SchedulingPolicy} instance corresponding to the
* {@link SchedulingPolicy} passed as a string. The policy can be "fair" for
* FairsharePolicy or "fifo" for FifoPolicy. For custom
* FairSharePolicy, "fifo" for FifoPolicy, or "drf" for
* DominantResourceFairnessPolicy. For a custom
* {@link SchedulingPolicy}s in the RM classpath, the policy should be
* canonical class name of the {@link SchedulingPolicy}.
*
* @param policy canonical class name or "fair" or "fifo"
* @param policy canonical class name or "drf" or "fair" or "fifo"
* @throws AllocationConfigurationException
*/
@SuppressWarnings("unchecked")
@ -80,10 +82,12 @@ public abstract class SchedulingPolicy {
@SuppressWarnings("rawtypes")
Class clazz;
String text = policy.toLowerCase();
if (text.equals("fair")) {
if (text.equalsIgnoreCase(FairSharePolicy.NAME)) {
clazz = FairSharePolicy.class;
} else if (text.equals("fifo")) {
} else if (text.equalsIgnoreCase(FifoPolicy.NAME)) {
clazz = FifoPolicy.class;
} else if (text.equalsIgnoreCase(DominantResourceFairnessPolicy.NAME)) {
clazz = DominantResourceFairnessPolicy.class;
} else {
try {
clazz = Class.forName(policy);
@ -99,6 +103,8 @@ public abstract class SchedulingPolicy {
return getInstance(clazz);
}
public void initialize(Resource clusterCapacity) {}
/**
* @return returns the name of {@link SchedulingPolicy}
*/

View File

@ -0,0 +1,172 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies;
import java.util.Collection;
import java.util.Comparator;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy;
import static org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType.*;
/**
* Makes scheduling decisions by trying to equalize dominant resource usage.
* A schedulable's dominant resource usage is the largest ratio of resource
* usage to capacity among the resource types it is using.
*/
@Private
@Unstable
public class DominantResourceFairnessPolicy extends SchedulingPolicy {
public static final String NAME = "DRF";
private DominantResourceFairnessComparator comparator =
new DominantResourceFairnessComparator();
@Override
public String getName() {
return NAME;
}
@Override
public byte getApplicableDepth() {
return SchedulingPolicy.DEPTH_ANY;
}
@Override
public Comparator<Schedulable> getComparator() {
return comparator;
}
@Override
public void computeShares(Collection<? extends Schedulable> schedulables,
Resource totalResources) {
// TODO: For now, set all fair shares to 0, because, in the context of DRF,
// it doesn't make sense to set a value for each resource. YARN-736 should
// add in a sensible replacement.
for (Schedulable schedulable : schedulables) {
schedulable.setFairShare(Resources.none());
}
}
@Override
public void initialize(Resource clusterCapacity) {
comparator.setClusterCapacity(clusterCapacity);
}
public static class DominantResourceFairnessComparator implements Comparator<Schedulable> {
private static final int NUM_RESOURCES = ResourceType.values().length;
private Resource clusterCapacity;
public void setClusterCapacity(Resource clusterCapacity) {
this.clusterCapacity = clusterCapacity;
}
@Override
public int compare(Schedulable s1, Schedulable s2) {
ResourceWeights sharesOfCluster1 = new ResourceWeights();
ResourceWeights sharesOfCluster2 = new ResourceWeights();
ResourceWeights sharesOfMinShare1 = new ResourceWeights();
ResourceWeights sharesOfMinShare2 = new ResourceWeights();
ResourceType[] resourceOrder1 = new ResourceType[NUM_RESOURCES];
ResourceType[] resourceOrder2 = new ResourceType[NUM_RESOURCES];
// Calculate shares of the cluster for each resource both schedulables.
calculateShares(s1.getResourceUsage(),
clusterCapacity, sharesOfCluster1, resourceOrder1, s1.getWeights());
calculateShares(s1.getResourceUsage(),
s1.getMinShare(), sharesOfMinShare1, null, ResourceWeights.NEUTRAL);
calculateShares(s2.getResourceUsage(),
clusterCapacity, sharesOfCluster2, resourceOrder2, s2.getWeights());
calculateShares(s2.getResourceUsage(),
s2.getMinShare(), sharesOfMinShare2, null, ResourceWeights.NEUTRAL);
// A queue is needy for its min share if its dominant resource
// (with respect to the cluster capacity) is below its configured min share
// for that resource
boolean s1Needy = sharesOfMinShare1.getWeight(resourceOrder1[0]) < 1.0f;
boolean s2Needy = sharesOfMinShare2.getWeight(resourceOrder2[0]) < 1.0f;
int res = 0;
if (!s2Needy && !s1Needy) {
res = compareShares(sharesOfCluster1, sharesOfCluster2,
resourceOrder1, resourceOrder2);
} else if (s1Needy && !s2Needy) {
res = -1;
} else if (s2Needy && !s1Needy) {
res = 1;
} else { // both are needy below min share
res = compareShares(sharesOfMinShare1, sharesOfMinShare2,
resourceOrder1, resourceOrder2);
}
if (res == 0) {
// Apps are tied in fairness ratio. Break the tie by submit time.
res = (int)(s1.getStartTime() - s2.getStartTime());
}
return res;
}
/**
* Calculates and orders a resource's share of a pool in terms of two vectors.
* The shares vector contains, for each resource, the fraction of the pool that
* it takes up. The resourceOrder vector contains an ordering of resources
* by largest share. So if resource=<10 MB, 5 CPU>, and pool=<100 MB, 10 CPU>,
* shares will be [.1, .5] and resourceOrder will be [CPU, MEMORY].
*/
void calculateShares(Resource resource, Resource pool,
ResourceWeights shares, ResourceType[] resourceOrder, ResourceWeights weights) {
shares.setWeight(MEMORY, (float)resource.getMemory() /
(pool.getMemory() * weights.getWeight(MEMORY)));
shares.setWeight(CPU, (float)resource.getVirtualCores() /
(pool.getVirtualCores() * weights.getWeight(CPU)));
// sort order vector by resource share
if (resourceOrder != null) {
if (shares.getWeight(MEMORY) > shares.getWeight(CPU)) {
resourceOrder[0] = MEMORY;
resourceOrder[1] = CPU;
} else {
resourceOrder[0] = CPU;
resourceOrder[1] = MEMORY;
}
}
}
private int compareShares(ResourceWeights shares1, ResourceWeights shares2,
ResourceType[] resourceOrder1, ResourceType[] resourceOrder2) {
for (int i = 0; i < resourceOrder1.length; i++) {
int ret = (int)Math.signum(shares1.getWeight(resourceOrder1[i])
- shares2.getWeight(resourceOrder2[i]));
if (ret != 0) {
return ret;
}
}
return 0;
}
}
}

View File

@ -25,17 +25,21 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy;
import com.google.common.annotations.VisibleForTesting;
/**
* Makes scheduling decisions by trying to equalize shares of memory.
*/
@Private
@Unstable
public class FairSharePolicy extends SchedulingPolicy {
@VisibleForTesting
public static final String NAME = "Fairshare";
public static final String NAME = "fair";
private static final DefaultResourceCalculator RESOURCE_CALCULATOR =
new DefaultResourceCalculator();
private FairShareComparator comparator = new FairShareComparator();
@ -79,8 +83,10 @@ public class FairSharePolicy extends SchedulingPolicy {
/ Resources.max(RESOURCE_CALCULATOR, null, minShare1, one).getMemory();
minShareRatio2 = (double) s2.getResourceUsage().getMemory()
/ Resources.max(RESOURCE_CALCULATOR, null, minShare2, one).getMemory();
useToWeightRatio1 = s1.getResourceUsage().getMemory() / s1.getWeight();
useToWeightRatio2 = s2.getResourceUsage().getMemory() / s2.getWeight();
useToWeightRatio1 = s1.getResourceUsage().getMemory() /
s1.getWeights().getWeight(ResourceType.MEMORY);
useToWeightRatio2 = s2.getResourceUsage().getMemory() /
s2.getWeights().getWeight(ResourceType.MEMORY);
int res = 0;
if (s1Needy && !s2Needy)
res = -1;
@ -220,7 +226,7 @@ public class FairSharePolicy extends SchedulingPolicy {
* {@link SchedulingAlgorithms#computeFairShares(Collection, double)}.
*/
private static Resource computeShare(Schedulable sched, double r2sRatio) {
double share = sched.getWeight() * r2sRatio;
double share = sched.getWeights().getWeight(ResourceType.MEMORY) * r2sRatio;
share = Math.max(share, sched.getMinShare().getMemory());
share = Math.min(share, sched.getDemand().getMemory());
return Resources.createResource((int) share);

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.util.Records;
@ -30,7 +31,7 @@ public class FakeSchedulable extends Schedulable {
private Resource demand;
private Resource usage;
private Resource minShare;
private double weight;
private ResourceWeights weights;
private Priority priority;
private long startTime;
@ -46,21 +47,22 @@ public class FakeSchedulable extends Schedulable {
this(demand, minShare, 1, 0, 0, 0);
}
public FakeSchedulable(int demand, int minShare, double weight) {
this(demand, minShare, weight, 0, 0, 0);
public FakeSchedulable(int demand, int minShare, double memoryWeight) {
this(demand, minShare, memoryWeight, 0, 0, 0);
}
public FakeSchedulable(int demand, int minShare, double weight, int fairShare, int usage,
long startTime) {
this(Resources.createResource(demand), Resources.createResource(minShare), weight,
Resources.createResource(fairShare), Resources.createResource(usage), startTime);
this(Resources.createResource(demand), Resources.createResource(minShare),
new ResourceWeights((float)weight), Resources.createResource(fairShare),
Resources.createResource(usage), startTime);
}
public FakeSchedulable(Resource demand, Resource minShare, double weight, Resource fairShare,
Resource usage, long startTime) {
public FakeSchedulable(Resource demand, Resource minShare, ResourceWeights weight,
Resource fairShare, Resource usage, long startTime) {
this.demand = demand;
this.minShare = minShare;
this.weight = weight;
this.weights = weight;
setFairShare(fairShare);
this.usage = usage;
this.priority = Records.newRecord(Priority.class);
@ -98,8 +100,8 @@ public class FakeSchedulable extends Schedulable {
}
@Override
public double getWeight() {
return weight;
public ResourceWeights getWeights() {
return weights;
}
@Override

View File

@ -75,6 +75,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedS
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.After;
@ -148,11 +149,16 @@ public class TestFairScheduler {
return attId;
}
private ResourceRequest createResourceRequest(int memory, String host,
int priority, int numContainers, boolean relaxLocality) {
return createResourceRequest(memory, 1, host, priority, numContainers,
relaxLocality);
}
private ResourceRequest createResourceRequest(int memory, int vcores, String host,
int priority, int numContainers, boolean relaxLocality) {
ResourceRequest request = recordFactory.newRecordInstance(ResourceRequest.class);
request.setCapability(Resources.createResource(memory));
request.setCapability(BuilderUtils.newResource(memory, vcores));
request.setResourceName(host);
request.setNumContainers(numContainers);
Priority prio = recordFactory.newRecordInstance(Priority.class);
@ -171,17 +177,33 @@ public class TestFairScheduler {
return createSchedulingRequest(memory, queueId, userId, 1);
}
private ApplicationAttemptId createSchedulingRequest(int memory, int vcores,
String queueId, String userId) {
return createSchedulingRequest(memory, vcores, queueId, userId, 1);
}
private ApplicationAttemptId createSchedulingRequest(int memory, String queueId,
String userId, int numContainers) {
return createSchedulingRequest(memory, queueId, userId, numContainers, 1);
}
private ApplicationAttemptId createSchedulingRequest(int memory, int vcores,
String queueId, String userId, int numContainers) {
return createSchedulingRequest(memory, vcores, queueId, userId, numContainers, 1);
}
private ApplicationAttemptId createSchedulingRequest(int memory, String queueId,
String userId, int numContainers, int priority) {
return createSchedulingRequest(memory, 1, queueId, userId, numContainers,
priority);
}
private ApplicationAttemptId createSchedulingRequest(int memory, int vcores,
String queueId, String userId, int numContainers, int priority) {
ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
scheduler.addApplication(id, queueId, userId);
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
ResourceRequest request = createResourceRequest(memory, ResourceRequest.ANY,
ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY,
priority, numContainers, true);
ask.add(request);
scheduler.allocate(id, ask, new ArrayList<ContainerId>());
@ -451,10 +473,10 @@ public class TestFairScheduler {
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"queueA\">");
out.println("<minResources>1024</minResources>");
out.println("<minResources>1024mb,0vcores</minResources>");
out.println("</queue>");
out.println("<queue name=\"queueB\">");
out.println("<minResources>2048</minResources>");
out.println("<minResources>2048mb,0vcores</minResources>");
out.println("</queue>");
out.println("</allocations>");
out.close();
@ -569,11 +591,11 @@ public class TestFairScheduler {
out.println("<allocations>");
// Give queue A a minimum of 1024 M
out.println("<queue name=\"queueA\">");
out.println("<minResources>1024</minResources>");
out.println("<minResources>1024mb,0vcores</minResources>");
out.println("</queue>");
// Give queue B a minimum of 2048 M
out.println("<queue name=\"queueB\">");
out.println("<minResources>2048</minResources>");
out.println("<minResources>2048mb,0vcores</minResources>");
out.println("<aclAdministerApps>alice,bob admins</aclAdministerApps>");
out.println("</queue>");
// Give queue C no minimum
@ -613,9 +635,9 @@ public class TestFairScheduler {
assertEquals(Resources.createResource(0),
queueManager.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(Resources.createResource(1024),
assertEquals(Resources.createResource(1024, 0),
queueManager.getMinResources("root.queueA"));
assertEquals(Resources.createResource(2048),
assertEquals(Resources.createResource(2048, 0),
queueManager.getMinResources("root.queueB"));
assertEquals(Resources.createResource(0),
queueManager.getMinResources("root.queueC"));
@ -672,15 +694,15 @@ public class TestFairScheduler {
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"queueA\">");
out.println("<minResources>2048</minResources>");
out.println("<minResources>2048mb,0vcores</minResources>");
out.println("</queue>");
out.println("<queue name=\"queueB\">");
out.println("<minResources>2048</minResources>");
out.println("<minResources>2048mb,0vcores</minResources>");
out.println("<queue name=\"queueC\">");
out.println("<minResources>2048</minResources>");
out.println("<minResources>2048mb,0vcores</minResources>");
out.println("</queue>");
out.println("<queue name=\"queueD\">");
out.println("<minResources>2048</minResources>");
out.println("<minResources>2048mb,0vcores</minResources>");
out.println("</queue>");
out.println("</queue>");
out.println("</allocations>");
@ -710,11 +732,11 @@ public class TestFairScheduler {
out.println("<allocations>");
// Give queue A a minimum of 1024 M
out.println("<pool name=\"queueA\">");
out.println("<minResources>1024</minResources>");
out.println("<minResources>1024mb,0vcores</minResources>");
out.println("</pool>");
// Give queue B a minimum of 2048 M
out.println("<pool name=\"queueB\">");
out.println("<minResources>2048</minResources>");
out.println("<minResources>2048mb,0vcores</minResources>");
out.println("<aclAdministerApps>alice,bob admins</aclAdministerApps>");
out.println("</pool>");
// Give queue C no minimum
@ -754,9 +776,9 @@ public class TestFairScheduler {
assertEquals(Resources.createResource(0),
queueManager.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(Resources.createResource(1024),
assertEquals(Resources.createResource(1024, 0),
queueManager.getMinResources("root.queueA"));
assertEquals(Resources.createResource(2048),
assertEquals(Resources.createResource(2048, 0),
queueManager.getMinResources("root.queueB"));
assertEquals(Resources.createResource(0),
queueManager.getMinResources("root.queueC"));
@ -812,10 +834,10 @@ public class TestFairScheduler {
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"queueA\">");
out.println("<minResources>2048</minResources>");
out.println("<minResources>2048mb,0vcores</minResources>");
out.println("</queue>");
out.println("<queue name=\"queueB\">");
out.println("<minResources>2048</minResources>");
out.println("<minResources>2048mb,0vcores</minResources>");
out.println("</queue>");
out.println("</allocations>");
out.close();
@ -825,7 +847,7 @@ public class TestFairScheduler {
// Add one big node (only care about aggregate capacity)
RMNode node1 =
MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024), 1,
MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1,
"127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
@ -885,7 +907,7 @@ public class TestFairScheduler {
// Add one big node (only care about aggregate capacity)
RMNode node1 =
MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024), 1,
MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1,
"127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
@ -963,19 +985,19 @@ public class TestFairScheduler {
// Create four nodes
RMNode node1 =
MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024), 1,
MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 1,
"127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
RMNode node2 =
MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024), 2,
MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 2,
"127.0.0.2");
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
scheduler.handle(nodeEvent2);
RMNode node3 =
MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024), 3,
MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 3,
"127.0.0.3");
NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
scheduler.handle(nodeEvent3);
@ -1106,19 +1128,19 @@ public class TestFairScheduler {
out.println("<allocations>");
out.println("<queue name=\"queueA\">");
out.println("<weight>.25</weight>");
out.println("<minResources>1024</minResources>");
out.println("<minResources>1024mb,0vcores</minResources>");
out.println("</queue>");
out.println("<queue name=\"queueB\">");
out.println("<weight>.25</weight>");
out.println("<minResources>1024</minResources>");
out.println("<minResources>1024mb,0vcores</minResources>");
out.println("</queue>");
out.println("<queue name=\"queueC\">");
out.println("<weight>.25</weight>");
out.println("<minResources>1024</minResources>");
out.println("<minResources>1024mb,0vcores</minResources>");
out.println("</queue>");
out.println("<queue name=\"queueD\">");
out.println("<weight>.25</weight>");
out.println("<minResources>1024</minResources>");
out.println("<minResources>1024mb,0vcores</minResources>");
out.println("</queue>");
out.print("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
out.print("<fairSharePreemptionTimeout>10</fairSharePreemptionTimeout>");
@ -1130,19 +1152,19 @@ public class TestFairScheduler {
// Create four nodes
RMNode node1 =
MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024), 1,
MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 1,
"127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
RMNode node2 =
MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024), 2,
MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 2,
"127.0.0.2");
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
scheduler.handle(nodeEvent2);
RMNode node3 =
MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024), 3,
MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 3,
"127.0.0.3");
NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
scheduler.handle(nodeEvent3);
@ -1206,19 +1228,19 @@ public class TestFairScheduler {
// After minSharePreemptionTime has passed, they should want to preempt min
// share.
clock.tick(6);
assertTrue(Resources.equals(
Resources.createResource(1024), scheduler.resToPreempt(schedC, clock.getTime())));
assertTrue(Resources.equals(
Resources.createResource(1024), scheduler.resToPreempt(schedD, clock.getTime())));
assertEquals(
1024, scheduler.resToPreempt(schedC, clock.getTime()).getMemory());
assertEquals(
1024, scheduler.resToPreempt(schedD, clock.getTime()).getMemory());
// After fairSharePreemptionTime has passed, they should want to preempt
// fair share.
scheduler.update();
clock.tick(6);
assertTrue(Resources.equals(
Resources.createResource(1536), scheduler.resToPreempt(schedC, clock.getTime())));
assertTrue(Resources.equals(
Resources.createResource(1536), scheduler.resToPreempt(schedD, clock.getTime())));
assertEquals(
1536 , scheduler.resToPreempt(schedC, clock.getTime()).getMemory());
assertEquals(
1536, scheduler.resToPreempt(schedD, clock.getTime()).getMemory());
}
@Test (timeout = 5000)
@ -1271,7 +1293,7 @@ public class TestFairScheduler {
// Add a node
RMNode node1 =
MockNodes
.newNodeInfo(1, Resources.createResource(8192), 1, "127.0.0.1");
.newNodeInfo(1, Resources.createResource(8192, 8), 1, "127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
@ -1443,7 +1465,7 @@ public class TestFairScheduler {
public void testFifoWithinQueue() throws Exception {
RMNode node1 =
MockNodes
.newNodeInfo(1, Resources.createResource(3072), 1, "127.0.0.1");
.newNodeInfo(1, Resources.createResource(3072, 3), 1, "127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
@ -1488,7 +1510,7 @@ public class TestFairScheduler {
.setPolicy(SchedulingPolicy.getDefault());
RMNode node =
MockNodes.newNodeInfo(1, Resources.createResource(16384), 0,
MockNodes.newNodeInfo(1, Resources.createResource(16384, 16), 0,
"127.0.0.1");
NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
@ -1536,10 +1558,10 @@ public class TestFairScheduler {
RMNode node1 =
MockNodes
.newNodeInfo(1, Resources.createResource(8192), 1, "127.0.0.1");
.newNodeInfo(1, Resources.createResource(8192, 8), 1, "127.0.0.1");
RMNode node2 =
MockNodes
.newNodeInfo(1, Resources.createResource(8192), 2, "127.0.0.2");
.newNodeInfo(1, Resources.createResource(8192, 8), 2, "127.0.0.2");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
@ -1685,7 +1707,8 @@ public class TestFairScheduler {
public void testRemoveNodeUpdatesRootQueueMetrics() {
assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1,
"127.0.0.1");
NodeAddedSchedulerEvent addEvent = new NodeAddedSchedulerEvent(node1);
scheduler.handle(addEvent);
@ -1824,4 +1847,157 @@ public class TestFairScheduler {
scheduler.handle(nodeUpdateEvent);
assertEquals(0, app.getReservedContainers().size());
}
public void testNoMoreCpuOnNode() {
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(2048, 1),
1, "127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
ApplicationAttemptId attId = createSchedulingRequest(1024, 1, "default",
"user1", 2);
FSSchedulerApp app = scheduler.applications.get(attId);
scheduler.update();
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(updateEvent);
assertEquals(1, app.getLiveContainers().size());
scheduler.handle(updateEvent);
assertEquals(1, app.getLiveContainers().size());
}
public void testBasicDRFAssignment() throws Exception {
RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(8192, 5));
NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
scheduler.handle(nodeEvent);
ApplicationAttemptId appAttId1 = createSchedulingRequest(2048, 1, "queue1",
"user1", 2);
FSSchedulerApp app1 = scheduler.applications.get(appAttId1);
ApplicationAttemptId appAttId2 = createSchedulingRequest(1024, 2, "queue1",
"user1", 2);
FSSchedulerApp app2 = scheduler.applications.get(appAttId2);
DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy();
drfPolicy.initialize(scheduler.getClusterCapacity());
scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy);
scheduler.update();
// First both apps get a container
// Then the first gets another container because its dominant share of
// 2048/8192 is less than the other's of 2/5
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
scheduler.handle(updateEvent);
Assert.assertEquals(1, app1.getLiveContainers().size());
Assert.assertEquals(0, app2.getLiveContainers().size());
scheduler.handle(updateEvent);
Assert.assertEquals(1, app1.getLiveContainers().size());
Assert.assertEquals(1, app2.getLiveContainers().size());
scheduler.handle(updateEvent);
Assert.assertEquals(2, app1.getLiveContainers().size());
Assert.assertEquals(1, app2.getLiveContainers().size());
}
/**
* Two apps on one queue, one app on another
*/
@Test
public void testBasicDRFWithQueues() throws Exception {
RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(8192, 7),
1, "127.0.0.1");
NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
scheduler.handle(nodeEvent);
ApplicationAttemptId appAttId1 = createSchedulingRequest(3072, 1, "queue1",
"user1", 2);
FSSchedulerApp app1 = scheduler.applications.get(appAttId1);
ApplicationAttemptId appAttId2 = createSchedulingRequest(2048, 2, "queue1",
"user1", 2);
FSSchedulerApp app2 = scheduler.applications.get(appAttId2);
ApplicationAttemptId appAttId3 = createSchedulingRequest(1024, 2, "queue2",
"user1", 2);
FSSchedulerApp app3 = scheduler.applications.get(appAttId3);
DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy();
drfPolicy.initialize(scheduler.getClusterCapacity());
scheduler.getQueueManager().getQueue("root").setPolicy(drfPolicy);
scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy);
scheduler.update();
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
scheduler.handle(updateEvent);
Assert.assertEquals(1, app1.getLiveContainers().size());
scheduler.handle(updateEvent);
Assert.assertEquals(1, app3.getLiveContainers().size());
scheduler.handle(updateEvent);
Assert.assertEquals(2, app3.getLiveContainers().size());
scheduler.handle(updateEvent);
Assert.assertEquals(1, app2.getLiveContainers().size());
}
@Test
public void testDRFHierarchicalQueues() throws Exception {
RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(12288, 12),
1, "127.0.0.1");
NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
scheduler.handle(nodeEvent);
ApplicationAttemptId appAttId1 = createSchedulingRequest(3074, 1, "queue1.subqueue1",
"user1", 2);
Thread.sleep(3); // so that start times will be different
FSSchedulerApp app1 = scheduler.applications.get(appAttId1);
ApplicationAttemptId appAttId2 = createSchedulingRequest(1024, 3, "queue1.subqueue1",
"user1", 2);
Thread.sleep(3); // so that start times will be different
FSSchedulerApp app2 = scheduler.applications.get(appAttId2);
ApplicationAttemptId appAttId3 = createSchedulingRequest(2048, 2, "queue1.subqueue2",
"user1", 2);
Thread.sleep(3); // so that start times will be different
FSSchedulerApp app3 = scheduler.applications.get(appAttId3);
ApplicationAttemptId appAttId4 = createSchedulingRequest(1024, 2, "queue2",
"user1", 2);
Thread.sleep(3); // so that start times will be different
FSSchedulerApp app4 = scheduler.applications.get(appAttId4);
DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy();
drfPolicy.initialize(scheduler.getClusterCapacity());
scheduler.getQueueManager().getQueue("root").setPolicy(drfPolicy);
scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy);
scheduler.getQueueManager().getQueue("queue1.subqueue1").setPolicy(drfPolicy);
scheduler.update();
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
scheduler.handle(updateEvent);
// app1 gets first container because it asked first
Assert.assertEquals(1, app1.getLiveContainers().size());
scheduler.handle(updateEvent);
// app4 gets second container because it's on queue2
Assert.assertEquals(1, app4.getLiveContainers().size());
scheduler.handle(updateEvent);
// app4 gets another container because queue2's dominant share of memory
// is still less than queue1's of cpu
Assert.assertEquals(2, app4.getLiveContainers().size());
scheduler.handle(updateEvent);
// app3 gets one because queue1 gets one and queue1.subqueue2 is behind
// queue1.subqueue1
Assert.assertEquals(1, app3.getLiveContainers().size());
scheduler.handle(updateEvent);
// app4 would get another one, but it doesn't have any requests
// queue1.subqueue2 is still using less than queue1.subqueue1, so it
// gets another
Assert.assertEquals(2, app3.getLiveContainers().size());
// queue1.subqueue1 is behind again, so it gets one, which it gives to app2
scheduler.handle(updateEvent);
Assert.assertEquals(1, app2.getLiveContainers().size());
// at this point, we've used all our CPU up, so nobody else should get a container
scheduler.handle(updateEvent);
Assert.assertEquals(1, app1.getLiveContainers().size());
Assert.assertEquals(1, app2.getLiveContainers().size());
Assert.assertEquals(2, app3.getLiveContainers().size());
Assert.assertEquals(2, app4.getLiveContainers().size());
}
}

View File

@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration.*;
import static org.junit.Assert.assertEquals;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;
public class TestFairSchedulerConfiguration {
@Test
public void testParseResourceConfigValue() throws Exception {
assertEquals(BuilderUtils.newResource(1024, 2),
parseResourceConfigValue("2 vcores, 1024 mb"));
assertEquals(BuilderUtils.newResource(1024, 2),
parseResourceConfigValue("1024 mb, 2 vcores"));
assertEquals(BuilderUtils.newResource(1024, 2),
parseResourceConfigValue("2vcores,1024mb"));
assertEquals(BuilderUtils.newResource(1024, 2),
parseResourceConfigValue("1024mb,2vcores"));
}
@Test(expected = AllocationConfigurationException.class)
public void testNoUnits() throws Exception {
parseResourceConfigValue("1024");
}
@Test(expected = AllocationConfigurationException.class)
public void testOnlyMemory() throws Exception {
parseResourceConfigValue("1024mb");
}
@Test(expected = AllocationConfigurationException.class)
public void testOnlyCPU() throws Exception {
parseResourceConfigValue("1024vcores");
}
@Test(expected = AllocationConfigurationException.class)
public void testGibberish() throws Exception {
parseResourceConfigValue("1o24vc0res");
}
}

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
import org.junit.Test;
@ -49,6 +50,11 @@ public class TestSchedulingPolicy {
assertTrue("Invalid scheduler name",
sm.getName().equals(FairSharePolicy.NAME));
// Shortname - drf
sm = SchedulingPolicy.parse("drf");
assertTrue("Invalid scheduler name",
sm.getName().equals(DominantResourceFairnessPolicy.NAME));
// Shortname - fair
sm = SchedulingPolicy.parse("fair");
assertTrue("Invalid scheduler name",
@ -94,6 +100,19 @@ public class TestSchedulingPolicy {
assertTrue(ERR,
SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ANY));
// drf
policy = SchedulingPolicy.parse("drf");
assertTrue(ERR,
SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_LEAF));
assertTrue(ERR, SchedulingPolicy.isApplicableTo(policy,
SchedulingPolicy.DEPTH_INTERMEDIATE));
assertTrue(ERR,
SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ROOT));
assertTrue(ERR,
SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_PARENT));
assertTrue(ERR,
SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ANY));
policy = Mockito.mock(SchedulingPolicy.class);
Mockito.when(policy.getApplicableDepth()).thenReturn(
SchedulingPolicy.DEPTH_PARENT);

View File

@ -0,0 +1,162 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies;
import static org.junit.Assert.*;
import java.util.Comparator;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FakeSchedulable;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;
/**
* comparator.compare(sched1, sched2) < 0 means that sched1 should get a
* container before sched2
*/
public class TestDominantResourceFairnessPolicy {
private Comparator<Schedulable> createComparator(int clusterMem,
int clusterCpu) {
DominantResourceFairnessPolicy policy = new DominantResourceFairnessPolicy();
policy.initialize(BuilderUtils.newResource(clusterMem, clusterCpu));
return policy.getComparator();
}
private Schedulable createSchedulable(int memUsage, int cpuUsage) {
return createSchedulable(memUsage, cpuUsage, ResourceWeights.NEUTRAL, 0, 0);
}
private Schedulable createSchedulable(int memUsage, int cpuUsage,
int minMemShare, int minCpuShare) {
return createSchedulable(memUsage, cpuUsage, ResourceWeights.NEUTRAL,
minMemShare, minCpuShare);
}
private Schedulable createSchedulable(int memUsage, int cpuUsage,
ResourceWeights weights) {
return createSchedulable(memUsage, cpuUsage, weights, 0, 0);
}
private Schedulable createSchedulable(int memUsage, int cpuUsage,
ResourceWeights weights, int minMemShare, int minCpuShare) {
Resource usage = BuilderUtils.newResource(memUsage, cpuUsage);
Resource minShare = BuilderUtils.newResource(minMemShare, minCpuShare);
return new FakeSchedulable(Resources.none(), minShare, weights,
Resources.none(), usage, 0l);
}
@Test
public void testSameDominantResource() {
assertTrue(createComparator(8000, 4).compare(
createSchedulable(1000, 1),
createSchedulable(2000, 1)) < 0);
}
@Test
public void testDifferentDominantResource() {
assertTrue(createComparator(8000, 8).compare(
createSchedulable(4000, 3),
createSchedulable(2000, 5)) < 0);
}
@Test
public void testOneIsNeedy() {
assertTrue(createComparator(8000, 8).compare(
createSchedulable(2000, 5, 0, 6),
createSchedulable(4000, 3, 0, 0)) < 0);
}
@Test
public void testBothAreNeedy() {
assertTrue(createComparator(8000, 100).compare(
// dominant share is 2000/8000
createSchedulable(2000, 5),
// dominant share is 4000/8000
createSchedulable(4000, 3)) < 0);
assertTrue(createComparator(8000, 100).compare(
// dominant min share is 2/3
createSchedulable(2000, 5, 3000, 6),
// dominant min share is 4/5
createSchedulable(4000, 3, 5000, 4)) < 0);
}
@Test
public void testEvenWeightsSameDominantResource() {
assertTrue(createComparator(8000, 8).compare(
createSchedulable(3000, 1, new ResourceWeights(2.0f)),
createSchedulable(2000, 1)) < 0);
assertTrue(createComparator(8000, 8).compare(
createSchedulable(1000, 3, new ResourceWeights(2.0f)),
createSchedulable(1000, 2)) < 0);
}
@Test
public void testEvenWeightsDifferentDominantResource() {
assertTrue(createComparator(8000, 8).compare(
createSchedulable(1000, 3, new ResourceWeights(2.0f)),
createSchedulable(2000, 1)) < 0);
assertTrue(createComparator(8000, 8).compare(
createSchedulable(3000, 1, new ResourceWeights(2.0f)),
createSchedulable(1000, 2)) < 0);
}
@Test
public void testUnevenWeightsSameDominantResource() {
assertTrue(createComparator(8000, 8).compare(
createSchedulable(3000, 1, new ResourceWeights(2.0f, 1.0f)),
createSchedulable(2000, 1)) < 0);
assertTrue(createComparator(8000, 8).compare(
createSchedulable(1000, 3, new ResourceWeights(1.0f, 2.0f)),
createSchedulable(1000, 2)) < 0);
}
@Test
public void testUnevenWeightsDifferentDominantResource() {
assertTrue(createComparator(8000, 8).compare(
createSchedulable(1000, 3, new ResourceWeights(1.0f, 2.0f)),
createSchedulable(2000, 1)) < 0);
assertTrue(createComparator(8000, 8).compare(
createSchedulable(3000, 1, new ResourceWeights(2.0f, 1.0f)),
createSchedulable(1000, 2)) < 0);
}
@Test
public void testCalculateShares() {
Resource used = Resources.createResource(10, 5);
Resource capacity = Resources.createResource(100, 10);
ResourceType[] resourceOrder = new ResourceType[2];
ResourceWeights shares = new ResourceWeights();
DominantResourceFairnessPolicy.DominantResourceFairnessComparator comparator =
new DominantResourceFairnessPolicy.DominantResourceFairnessComparator();
comparator.calculateShares(used, capacity, shares, resourceOrder,
ResourceWeights.NEUTRAL);
assertEquals(.1, shares.getWeight(ResourceType.MEMORY), .00001);
assertEquals(.5, shares.getWeight(ResourceType.CPU), .00001);
assertEquals(ResourceType.CPU, resourceOrder[0]);
assertEquals(ResourceType.MEMORY, resourceOrder[1]);
}
}

View File

@ -32,16 +32,17 @@ Hadoop MapReduce Next Generation - Fair Scheduler
Fair scheduling is a method of assigning resources to applications such that
all apps get, on average, an equal share of resources over time.
Hadoop NextGen is capable of scheduling multiple resource types, such as
Memory and CPU. Currently only memory is supported, so a "cluster share" is
a proportion of aggregate memory in the cluster. When there is a single app
running, that app uses the entire cluster. When other apps are submitted,
resources that free up are assigned to the new apps, so that each app gets
roughly the same amount of resources. Unlike the default Hadoop scheduler,
which forms a queue of apps, this lets short apps finish in reasonable time
while not starving long-lived apps. It is also a reasonable way to share a
cluster between a number of users. Finally, fair sharing can also work with
app priorities - the priorities are used as weights to determine the
Hadoop NextGen is capable of scheduling multiple resource types. By default,
the Fair Scheduler bases scheduling fairness decisions only on memory. It
can be configured to schedule with both memory and CPU, using the notion
of Dominant Resource Fairness developed by Ghodsi et al. When there is a
single app running, that app uses the entire cluster. When other apps are
submitted, resources that free up are assigned to the new apps, so that each
app eventually on gets roughly the same amount of resources. Unlike the default
Hadoop scheduler, which forms a queue of apps, this lets short apps finish in
reasonable time while not starving long-lived apps. It is also a reasonable way
to share a cluster between a number of users. Finally, fair sharing can also
work with app priorities - the priorities are used as weights to determine the
fraction of total resources that each app should get.
The scheduler organizes apps further into "queues", and shares resources
@ -49,9 +50,10 @@ Hadoop MapReduce Next Generation - Fair Scheduler
called “default”. If an app specifically lists a queue in a container
resource request, the request is submitted to that queue. It is also
possible to assign queues based on the user name included with the request
through configuration. Within each queue, fair sharing is used to share
capacity between the running apps. queues can also be given weights to share
the cluster non-proportionally in the config file.
through configuration. Within each queue, a scheduling policy is used to share
resources between the running apps. The default is memory-based fair sharing,
but FIFO and multi-resource with Dominant Resource Fairness can also be
configured. Queues can be configured with weights to share the cluster non-evenly.
The fair scheduler supports hierarchical queues. All queues descend from a
queue named "root". Available resources are distributed among the children
@ -120,14 +122,6 @@ Hadoop MapReduce Next Generation - Fair Scheduler
queues and their properties, in addition to certain policy defaults. This file
must be in XML format as described in the next section.
* <<<yarn.scheduler.fair.minimum-allocation-mb>>>
* The smallest container size the scheduler can allocate, in MB of memory.
* <<<yarn.scheduler.fair.maximum-allocation-mb>>>
* The largest container the scheduler can allocate, in MB of memory.
* <<<yarn.scheduler.fair.user-as-default-queue>>>
* Whether to use the username associated with the allocation as the default
@ -183,17 +177,23 @@ Allocation file format
* <<Queue elements>>, which represent queues. Each may contain the following
properties:
* minResources: minimum MB of aggregate memory the queue expects. If a queue
demands resources, and its current allocation is below its configured minimum,
it will be assigned available resources before any queue that is not in this
situation. If multiple queues are in this situation, resources go to the
queue with the smallest ratio between allocation and minimum. Note that it is
possible that a queue that is below its minimum may not immediately get up to
its minimum when it submits an application, because already-running jobs may
be using those resources.
* minResources: minimum resources the queue is entitled to, in the form
"X mb, Y vcores". If a queue's minimum share is not satisfied, it will be
offered available resources before any other queue under the same parent.
Under the single-resource fairness policy, a queue
is considered unsatisfied if its memory usage is below its minimum memory
share. Under dominant resource fairness, a queue is considered unsatisfied
if its usage for its dominant resource with respect to the cluster capacity
is below its minimum share for that resource. If multiple queues are
unsatisfied in this situation, resources go to the queue with the smallest
ratio between relevant resource usage and minimum. Note that it is
possible that a queue that is below its minimum may not immediately get up
to its minimum when it submits an application, because already-running jobs
may be using those resources.
* maxResources: maximum MB of aggregate memory a queue is allowed. A queue
will never be assigned a container that would put it over this limit.
* maxResources: maximum resources a queue is allowed, in the form
"X mb, Y vcores". A queue will never be assigned a container that would
put its aggregate usage over this limit.
* maxRunningApps: limit the number of apps from the queue to run at once
@ -232,13 +232,13 @@ Allocation file format
<?xml version="1.0"?>
<allocations>
<queue name="sample_queue">
<minResources>10000</minResources>
<maxResources>90000</maxResources>
<minResources>10000 mb</minResources>
<maxResources>90000 mb</maxResources>
<maxRunningApps>50</maxRunningApps>
<weight>2.0</weight>
<schedulingMode>fair</schedulingMode>
<queue name="sample_sub_queue">
<minResources>5000</minResources>
<minResources>5000 mb</minResources>
</queue>
</queue>
<user name="sample_user">