YARN-326. Add multi-resource scheduling to the fair scheduler. (sandyr via tucu)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1489072 13f79535-47bb-0310-9956-ffa450edef68
parent f3c4f2b3e2
commit 7854b322d4
@@ -88,6 +88,9 @@ Release 2.1.0-beta - UNRELEASED
     YARN-392. Make it possible to specify hard locality constraints in resource
     requests. (sandyr via tucu)
 
+    YARN-326. Add multi-resource scheduling to the fair scheduler.
+    (sandyr via tucu)
+
   IMPROVEMENTS
 
     YARN-365. Change NM heartbeat handling to not generate a scheduler event
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.resource;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+@Private
+@Evolving
+public enum ResourceType {
+  MEMORY, CPU
+}
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.resource;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+@Private
+@Evolving
+public class ResourceWeights {
+  public static final ResourceWeights NEUTRAL = new ResourceWeights(1.0f);
+
+  private float[] weights = new float[ResourceType.values().length];
+
+  public ResourceWeights(float memoryWeight, float cpuWeight) {
+    weights[ResourceType.MEMORY.ordinal()] = memoryWeight;
+    weights[ResourceType.CPU.ordinal()] = cpuWeight;
+  }
+
+  public ResourceWeights(float weight) {
+    for (int i = 0; i < weights.length; i++) {
+      weights[i] = weight;
+    }
+  }
+
+  public ResourceWeights() { }
+
+  public void setWeight(ResourceType resourceType, float weight) {
+    weights[resourceType.ordinal()] = weight;
+  }
+
+  public float getWeight(ResourceType resourceType) {
+    return weights[resourceType.ordinal()];
+  }
+
+  public String toString() {
+    StringBuffer sb = new StringBuffer();
+    sb.append("<");
+    for (int i = 0; i < ResourceType.values().length; i++) {
+      if (i != 0) {
+        sb.append(", ");
+      }
+      ResourceType resourceType = ResourceType.values()[i];
+      sb.append(resourceType.name().toLowerCase());
+      sb.append(String.format(" weight=%.1f", getWeight(resourceType)));
+    }
+    sb.append(">");
+    return sb.toString();
+  }
+}
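
A minimal usage sketch of the ResourceWeights API added above (illustrative only; the example class and weight values are not part of the commit):

    import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
    import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;

    public class ResourceWeightsExample {
      public static void main(String[] args) {
        // Weight memory twice as heavily as CPU for a schedulable.
        ResourceWeights weights = new ResourceWeights(2.0f, 1.0f);
        System.out.println(weights.getWeight(ResourceType.MEMORY)); // 2.0
        System.out.println(weights.getWeight(ResourceType.CPU));    // 1.0
        // The single-float constructor applies one weight to every resource
        // type; ResourceWeights.NEUTRAL equals new ResourceWeights(1.0f).
        System.out.println(ResourceWeights.NEUTRAL); // <memory weight=1.0, cpu weight=1.0>
      }
    }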
@@ -32,4 +32,8 @@ public class AllocationConfigurationException extends Exception {
   public AllocationConfigurationException(String message) {
     super(message);
   }
+
+  public AllocationConfigurationException(String message, Throwable t) {
+    super(message, t);
+  }
 }
 
@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
@@ -119,7 +120,7 @@ public class AppSchedulable extends Schedulable {
   }
 
   @Override
-  public double getWeight() {
+  public ResourceWeights getWeights() {
     return scheduler.getAppWeight(this);
   }
 
@@ -237,10 +238,7 @@ public class AppSchedulable extends Schedulable {
     }
 
     // Can we allocate a container on this node?
-    int availableContainers =
-        available.getMemory() / capability.getMemory();
-
-    if (availableContainers > 0) {
+    if (Resources.fitsIn(capability, available)) {
       // Inform the application of the new container for this request
       RMContainer allocatedContainer =
         app.allocate(type, node, priority, request, container);
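
The memory-only container count in AppSchedulable is replaced above by Resources.fitsIn, which only admits a request when every resource dimension fits. A standalone sketch of the assumed semantics (the helper below is hypothetical, not the library method):

    // Assumed semantics of Resources.fitsIn(capability, available): the
    // capability fits only if each resource dimension fits, not just memory.
    static boolean fitsIn(int reqMemory, int reqVcores,
        int availMemory, int availVcores) {
      return reqMemory <= availMemory && reqVcores <= availVcores;
    }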
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@@ -84,7 +85,7 @@ public abstract class FSQueue extends Schedulable implements Queue {
       throws AllocationConfigurationException;
 
   @Override
-  public double getWeight() {
+  public ResourceWeights getWeights() {
     return queueMgr.getQueueWeight(getName());
   }
 
@@ -180,8 +180,8 @@ public class FSSchedulerNode extends SchedulerNode {
   @Override
   public String toString() {
     return "host: " + rmNode.getNodeAddress() + " #containers=" + getNumContainers() +
-        " available=" + getAvailableResource().getMemory() +
-        " used=" + getUsedResource().getMemory();
+        " available=" + getAvailableResource() +
+        " used=" + getUsedResource();
   }
 
   @Override
@@ -57,8 +57,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstant
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.DominantResourceCalculator;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -495,14 +497,14 @@ public class FairScheduler implements ResourceScheduler {
   }
 
   // synchronized for sizeBasedWeight
-  public synchronized double getAppWeight(AppSchedulable app) {
+  public synchronized ResourceWeights getAppWeight(AppSchedulable app) {
     if (!app.getRunnable()) {
       // Job won't launch tasks, but don't return 0 to avoid division errors
-      return 1.0;
+      return ResourceWeights.NEUTRAL;
     } else {
       double weight = 1.0;
       if (sizeBasedWeight) {
-        // Set weight based on current demand
+        // Set weight based on current memory demand
         weight = Math.log1p(app.getDemand().getMemory()) / Math.log(2);
       }
       weight *= app.getPriority().getPriority();
@@ -510,7 +512,7 @@ public class FairScheduler implements ResourceScheduler {
         // Run weight through the user-supplied weightAdjuster
         weight = weightAdjuster.adjustWeight(app, weight);
       }
-      return weight;
+      return new ResourceWeights((float)weight);
     }
   }
 
@@ -714,37 +716,6 @@ public class FairScheduler implements ResourceScheduler {
           " cluster capacity: " + clusterCapacity);
     }
 
-  /**
-   * Utility method to normalize a list of resource requests, by ensuring that
-   * the memory for each request is a multiple of minMemory and is not zero.
-   *
-   * @param asks a list of resource requests
-   * @param minMemory the configured minimum memory allocation
-   * @param maxMemory the configured maximum memory allocation
-   */
-  static void normalizeRequests(List<ResourceRequest> asks,
-      int minMemory, int maxMemory) {
-    for (ResourceRequest ask : asks) {
-      normalizeRequest(ask, minMemory, maxMemory);
-    }
-  }
-
-  /**
-   * Utility method to normalize a resource request, by ensuring that the
-   * requested memory is a multiple of minMemory and is not zero.
-   *
-   * @param ask the resource request
-   * @param minMemory the configured minimum memory allocation
-   * @param maxMemory the configured maximum memory allocation
-   */
-  static void normalizeRequest(ResourceRequest ask, int minMemory,
-      int maxMemory) {
-    int memory = Math.max(ask.getCapability().getMemory(), minMemory);
-    int normalizedMemory =
-      minMemory * ((memory / minMemory) + (memory % minMemory > 0 ? 1 : 0));
-    ask.getCapability().setMemory(Math.min(normalizedMemory, maxMemory));
-  }
-
   @Override
   public Allocation allocate(ApplicationAttemptId appAttemptId,
       List<ResourceRequest> ask, List<ContainerId> release) {
@@ -758,8 +729,8 @@ public class FairScheduler implements ResourceScheduler {
     }
 
     // Sanity check
-    normalizeRequests(ask, minimumAllocation.getMemory(),
-        maximumAllocation.getMemory());
+    SchedulerUtils.normalizeRequests(ask, new DominantResourceCalculator(),
+        clusterCapacity, minimumAllocation, maximumAllocation);
 
     // Release containers
     for (ContainerId releasedContainerId : release) {
@@ -1015,8 +986,8 @@ public class FairScheduler implements ResourceScheduler {
   public synchronized void reinitialize(Configuration conf, RMContext rmContext)
       throws IOException {
     this.conf = new FairSchedulerConfiguration(conf);
-    minimumAllocation = this.conf.getMinimumMemoryAllocation();
-    maximumAllocation = this.conf.getMaximumMemoryAllocation();
+    minimumAllocation = this.conf.getMinimumAllocation();
+    maximumAllocation = this.conf.getMaximumAllocation();
     userAsDefaultQueue = this.conf.getUserAsDefaultQueue();
     nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
     rackLocalityThreshold = this.conf.getLocalityThresholdRack();
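
A standalone sketch of the size-based weight arithmetic in getAppWeight above (the class name and values are illustrative):

    public class AppWeightSketch {
      // Mirrors the arithmetic in FairScheduler.getAppWeight().
      static float appWeight(boolean sizeBasedWeight, int demandMemoryMb,
          int priority) {
        double weight = 1.0;
        if (sizeBasedWeight) {
          // log2(1 + memory demand): weight grows slowly with demand.
          weight = Math.log1p(demandMemoryMb) / Math.log(2);
        }
        weight *= priority;
        return (float) weight;
      }

      public static void main(String[] args) {
        // 1023 MB of demand at priority 1 yields log2(1024) = 10.
        System.out.println(appWeight(true, 1023, 1)); // 10.0
      }
    }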
@@ -18,12 +18,16 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
 import java.io.File;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 
 @Private
 @Evolving
@@ -78,18 +82,24 @@ public class FairSchedulerConfiguration extends Configuration {
     addResource(FS_CONFIGURATION_FILE);
   }
 
-  public Resource getMinimumMemoryAllocation() {
+  public Resource getMinimumAllocation() {
     int mem = getInt(
       YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
       YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
-    return Resources.createResource(mem);
+    int cpu = getInt(
+      YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+      YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
+    return Resources.createResource(mem, cpu);
   }
 
-  public Resource getMaximumMemoryAllocation() {
+  public Resource getMaximumAllocation() {
     int mem = getInt(
       YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
      YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
-    return Resources.createResource(mem);
+    int cpu = getInt(
+      YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+      YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
+    return Resources.createResource(mem, cpu);
   }
 
   public boolean getUserAsDefaultQueue() {
@@ -136,4 +146,34 @@ public class FairSchedulerConfiguration extends Configuration {
   public int getWaitTimeBeforeKill() {
     return getInt(WAIT_TIME_BEFORE_KILL, DEFAULT_WAIT_TIME_BEFORE_KILL);
   }
+
+  /**
+   * Parses a resource config value of a form like "1024", "1024 mb",
+   * or "1024 mb, 3 vcores". If no units are given, megabytes are assumed.
+   *
+   * @throws AllocationConfigurationException
+   */
+  public static Resource parseResourceConfigValue(String val)
+      throws AllocationConfigurationException {
+    try {
+      int memory = findResource(val, "mb");
+      int vcores = findResource(val, "vcores");
+      return BuilderUtils.newResource(memory, vcores);
+    } catch (AllocationConfigurationException ex) {
+      throw ex;
+    } catch (Exception ex) {
+      throw new AllocationConfigurationException(
+          "Error reading resource config", ex);
+    }
+  }
+
+  private static int findResource(String val, String units)
+    throws AllocationConfigurationException {
+    Pattern pattern = Pattern.compile("(\\d+) ?" + units);
+    Matcher matcher = pattern.matcher(val);
+    if (!matcher.find()) {
+      throw new AllocationConfigurationException("Missing resource: " + units);
+    }
+    return Integer.parseInt(matcher.group(1));
+  }
 }
 
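
An illustrative call to the new parser (values are examples):

    // "1024 mb, 3 vcores" -> 1024 MB of memory and 3 vcores. findResource()
    // matches each "<number> <unit>" pair independently, so the order of the
    // two clauses does not matter and the space before the unit is optional
    // ("1024mb,3vcores" parses the same way).
    Resource min =
        FairSchedulerConfiguration.parseResourceConfigValue("1024 mb, 3 vcores");
    assert min.getMemory() == 1024 && min.getVirtualCores() == 3;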
@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 import org.w3c.dom.Node;
@@ -301,7 +302,7 @@ public class QueueManager {
     Map<String, Resource> maxQueueResources = new HashMap<String, Resource>();
     Map<String, Integer> queueMaxApps = new HashMap<String, Integer>();
     Map<String, Integer> userMaxApps = new HashMap<String, Integer>();
-    Map<String, Double> queueWeights = new HashMap<String, Double>();
+    Map<String, ResourceWeights> queueWeights = new HashMap<String, ResourceWeights>();
     Map<String, SchedulingPolicy> queuePolicies = new HashMap<String, SchedulingPolicy>();
     Map<String, Long> minSharePreemptionTimeouts = new HashMap<String, Long>();
     Map<String, Map<QueueACL, AccessControlList>> queueAcls =
@@ -415,7 +416,7 @@ public class QueueManager {
    */
   private void loadQueue(String parentName, Element element, Map<String, Resource> minQueueResources,
       Map<String, Resource> maxQueueResources, Map<String, Integer> queueMaxApps,
-      Map<String, Integer> userMaxApps, Map<String, Double> queueWeights,
+      Map<String, Integer> userMaxApps, Map<String, ResourceWeights> queueWeights,
       Map<String, SchedulingPolicy> queuePolicies,
       Map<String, Long> minSharePreemptionTimeouts,
       Map<String, Map<QueueACL, AccessControlList>> queueAcls, List<String> queueNamesInAllocFile)
@@ -433,12 +434,12 @@ public class QueueManager {
         Element field = (Element) fieldNode;
         if ("minResources".equals(field.getTagName())) {
           String text = ((Text)field.getFirstChild()).getData().trim();
-          int val = Integer.parseInt(text);
-          minQueueResources.put(queueName, Resources.createResource(val));
+          Resource val = FairSchedulerConfiguration.parseResourceConfigValue(text);
+          minQueueResources.put(queueName, val);
         } else if ("maxResources".equals(field.getTagName())) {
           String text = ((Text)field.getFirstChild()).getData().trim();
-          int val = Integer.parseInt(text);
-          maxQueueResources.put(queueName, Resources.createResource(val));
+          Resource val = FairSchedulerConfiguration.parseResourceConfigValue(text);
+          maxQueueResources.put(queueName, val);
         } else if ("maxRunningApps".equals(field.getTagName())) {
           String text = ((Text)field.getFirstChild()).getData().trim();
           int val = Integer.parseInt(text);
@@ -446,7 +447,7 @@ public class QueueManager {
         } else if ("weight".equals(field.getTagName())) {
           String text = ((Text)field.getFirstChild()).getData().trim();
           double val = Double.parseDouble(text);
-          queueWeights.put(queueName, val);
+          queueWeights.put(queueName, new ResourceWeights((float)val));
         } else if ("minSharePreemptionTimeout".equals(field.getTagName())) {
           String text = ((Text)field.getFirstChild()).getData().trim();
           long val = Long.parseLong(text) * 1000L;
@@ -454,7 +455,9 @@ public class QueueManager {
         } else if ("schedulingPolicy".equals(field.getTagName())
             || "schedulingMode".equals(field.getTagName())) {
           String text = ((Text)field.getFirstChild()).getData().trim();
-          queuePolicies.put(queueName, SchedulingPolicy.parse(text));
+          SchedulingPolicy policy = SchedulingPolicy.parse(text);
+          policy.initialize(scheduler.getClusterCapacity());
+          queuePolicies.put(queueName, policy);
         } else if ("aclSubmitApps".equals(field.getTagName())) {
           String text = ((Text)field.getFirstChild()).getData().trim();
           acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text));
@@ -510,13 +513,20 @@ public class QueueManager {
   }
 
   /**
-   * Get a collection of all queues
+   * Get a collection of all leaf queues
    */
   public Collection<FSLeafQueue> getLeafQueues() {
     synchronized (queues) {
       return leafQueues;
     }
   }
+
+  /**
+   * Get a collection of all queues
+   */
+  public Collection<FSQueue> getQueues() {
+    return queues.values();
+  }
 
   public int getUserMaxApps(String user) {
     // save current info in case it gets changed under us
@@ -538,12 +548,12 @@ public class QueueManager {
     }
   }
 
-  public double getQueueWeight(String queue) {
-    Double weight = info.queueWeights.get(queue);
+  public ResourceWeights getQueueWeight(String queue) {
+    ResourceWeights weight = info.queueWeights.get(queue);
     if (weight != null) {
       return weight;
     } else {
-      return 1.0;
+      return ResourceWeights.NEUTRAL;
     }
   }
 
@@ -595,7 +605,7 @@ public class QueueManager {
     // Maximum amount of resources per queue
     public final Map<String, Resource> maxQueueResources;
     // Sharing weights for each queue
-    public final Map<String, Double> queueWeights;
+    public final Map<String, ResourceWeights> queueWeights;
 
     // Max concurrent running applications for each queue and for each user; in addition,
     // for users that have no max specified, we use the userMaxJobsDefault.
@@ -625,7 +635,7 @@ public class QueueManager {
     public QueueManagerInfo(Map<String, Resource> minQueueResources,
         Map<String, Resource> maxQueueResources,
         Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps,
-        Map<String, Double> queueWeights, int userMaxAppsDefault,
+        Map<String, ResourceWeights> queueWeights, int userMaxAppsDefault,
         int queueMaxAppsDefault, SchedulingPolicy defaultSchedulingPolicy,
         Map<String, Long> minSharePreemptionTimeouts,
         Map<String, Map<QueueACL, AccessControlList>> queueAcls,
@@ -647,7 +657,7 @@ public class QueueManager {
     public QueueManagerInfo() {
       minQueueResources = new HashMap<String, Resource>();
       maxQueueResources = new HashMap<String, Resource>();
-      queueWeights = new HashMap<String, Double>();
+      queueWeights = new HashMap<String, ResourceWeights>();
       queueMaxApps = new HashMap<String, Integer>();
       userMaxApps = new HashMap<String, Integer>();
       userMaxAppsDefault = Integer.MAX_VALUE;
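
For reference, an illustrative allocations-file fragment exercising the paths loadQueue now supports (queue name and values are made up), written in the same PrintWriter style the tests later in this commit use:

    out.println("<?xml version=\"1.0\"?>");
    out.println("<allocations>");
    out.println("<queue name=\"queueA\">");
    // parsed by FairSchedulerConfiguration.parseResourceConfigValue
    out.println("<minResources>1024 mb, 1 vcores</minResources>");
    // stored as new ResourceWeights(2.0f)
    out.println("<weight>2.0</weight>");
    // resolved by SchedulingPolicy.parse and initialized with cluster capacity
    out.println("<schedulingPolicy>drf</schedulingPolicy>");
    out.println("</queue>");
    out.println("</allocations>");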
@@ -22,6 +22,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 
 /**
@@ -80,7 +81,7 @@ public abstract class Schedulable {
 
 
   /** Job/queue weight in fair sharing. */
-  public abstract double getWeight();
+  public abstract ResourceWeights getWeights();
 
   /** Start time for jobs in FIFO queues; meaningless for QueueSchedulables.*/
   public abstract long getStartTime();
@@ -110,7 +111,7 @@ public abstract class Schedulable {
   /** Convenient toString implementation for debugging. */
   @Override
   public String toString() {
-    return String.format("[%s, demand=%s, running=%s, share=%s,], w=%.1f]",
-        getName(), getDemand(), getResourceUsage(), fairShare, getWeight());
+    return String.format("[%s, demand=%s, running=%s, share=%s, w=%s]",
+        getName(), getDemand(), getResourceUsage(), fairShare, getWeights());
   }
 }
@@ -25,6 +25,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
 
@@ -67,11 +68,12 @@ public abstract class SchedulingPolicy {
   /**
    * Returns {@link SchedulingPolicy} instance corresponding to the
    * {@link SchedulingPolicy} passed as a string. The policy can be "fair" for
-   * FairsharePolicy or "fifo" for FifoPolicy. For custom
+   * FairSharePolicy, "fifo" for FifoPolicy, or "drf" for
+   * DominantResourceFairnessPolicy. For a custom
    * {@link SchedulingPolicy}s in the RM classpath, the policy should be
    * canonical class name of the {@link SchedulingPolicy}.
    *
-   * @param policy canonical class name or "fair" or "fifo"
+   * @param policy canonical class name or "drf" or "fair" or "fifo"
    * @throws AllocationConfigurationException
    */
   @SuppressWarnings("unchecked")
@@ -80,10 +82,12 @@ public abstract class SchedulingPolicy {
     @SuppressWarnings("rawtypes")
     Class clazz;
     String text = policy.toLowerCase();
-    if (text.equals("fair")) {
+    if (text.equalsIgnoreCase(FairSharePolicy.NAME)) {
       clazz = FairSharePolicy.class;
-    } else if (text.equals("fifo")) {
+    } else if (text.equalsIgnoreCase(FifoPolicy.NAME)) {
       clazz = FifoPolicy.class;
+    } else if (text.equalsIgnoreCase(DominantResourceFairnessPolicy.NAME)) {
+      clazz = DominantResourceFairnessPolicy.class;
     } else {
       try {
         clazz = Class.forName(policy);
@@ -98,6 +102,8 @@ public abstract class SchedulingPolicy {
     }
     return getInstance(clazz);
   }
+
+  public void initialize(Resource clusterCapacity) {}
 
   /**
    * @return returns the name of {@link SchedulingPolicy}
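
A short sketch of resolving the new policy name (assumes a Resource clusterCapacity is in scope; parse throws AllocationConfigurationException for unknown names):

    // "drf" now resolves, case-insensitively, to DominantResourceFairnessPolicy;
    // "fair", "fifo", and canonical class names continue to work.
    SchedulingPolicy policy = SchedulingPolicy.parse("drf");
    policy.initialize(clusterCapacity); // a no-op for fair/fifo; gives DRF's
                                        // comparator the cluster capacity
    assert policy instanceof DominantResourceFairnessPolicy;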
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies;
+
+import java.util.Collection;
+import java.util.Comparator;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType.*;
+
+/**
+ * Makes scheduling decisions by trying to equalize dominant resource usage.
+ * A schedulable's dominant resource usage is the largest ratio of resource
+ * usage to capacity among the resource types it is using.
+ */
+@Private
+@Unstable
+public class DominantResourceFairnessPolicy extends SchedulingPolicy {
+
+  public static final String NAME = "DRF";
+
+  private DominantResourceFairnessComparator comparator =
+      new DominantResourceFairnessComparator();
+
+  @Override
+  public String getName() {
+    return NAME;
+  }
+
+  @Override
+  public byte getApplicableDepth() {
+    return SchedulingPolicy.DEPTH_ANY;
+  }
+
+  @Override
+  public Comparator<Schedulable> getComparator() {
+    return comparator;
+  }
+
+  @Override
+  public void computeShares(Collection<? extends Schedulable> schedulables,
+      Resource totalResources) {
+
+    // TODO: For now, set all fair shares to 0, because, in the context of DRF,
+    // it doesn't make sense to set a value for each resource. YARN-736 should
+    // add in a sensible replacement.
+
+    for (Schedulable schedulable : schedulables) {
+      schedulable.setFairShare(Resources.none());
+    }
+  }
+
+  @Override
+  public void initialize(Resource clusterCapacity) {
+    comparator.setClusterCapacity(clusterCapacity);
+  }
+
+  public static class DominantResourceFairnessComparator implements Comparator<Schedulable> {
+    private static final int NUM_RESOURCES = ResourceType.values().length;
+
+    private Resource clusterCapacity;
+
+    public void setClusterCapacity(Resource clusterCapacity) {
+      this.clusterCapacity = clusterCapacity;
+    }
+
+    @Override
+    public int compare(Schedulable s1, Schedulable s2) {
+      ResourceWeights sharesOfCluster1 = new ResourceWeights();
+      ResourceWeights sharesOfCluster2 = new ResourceWeights();
+      ResourceWeights sharesOfMinShare1 = new ResourceWeights();
+      ResourceWeights sharesOfMinShare2 = new ResourceWeights();
+      ResourceType[] resourceOrder1 = new ResourceType[NUM_RESOURCES];
+      ResourceType[] resourceOrder2 = new ResourceType[NUM_RESOURCES];
+
+      // Calculate shares of the cluster for each resource for both schedulables.
+      calculateShares(s1.getResourceUsage(),
+          clusterCapacity, sharesOfCluster1, resourceOrder1, s1.getWeights());
+      calculateShares(s1.getResourceUsage(),
+          s1.getMinShare(), sharesOfMinShare1, null, ResourceWeights.NEUTRAL);
+      calculateShares(s2.getResourceUsage(),
+          clusterCapacity, sharesOfCluster2, resourceOrder2, s2.getWeights());
+      calculateShares(s2.getResourceUsage(),
+          s2.getMinShare(), sharesOfMinShare2, null, ResourceWeights.NEUTRAL);
+
+      // A queue is needy for its min share if its dominant resource
+      // (with respect to the cluster capacity) is below its configured min
+      // share for that resource
+      boolean s1Needy = sharesOfMinShare1.getWeight(resourceOrder1[0]) < 1.0f;
+      boolean s2Needy = sharesOfMinShare2.getWeight(resourceOrder2[0]) < 1.0f;
+
+      int res = 0;
+      if (!s2Needy && !s1Needy) {
+        res = compareShares(sharesOfCluster1, sharesOfCluster2,
+            resourceOrder1, resourceOrder2);
+      } else if (s1Needy && !s2Needy) {
+        res = -1;
+      } else if (s2Needy && !s1Needy) {
+        res = 1;
+      } else { // both are needy below min share
+        res = compareShares(sharesOfMinShare1, sharesOfMinShare2,
+            resourceOrder1, resourceOrder2);
+      }
+      if (res == 0) {
+        // Apps are tied in fairness ratio. Break the tie by submit time.
+        res = (int)(s1.getStartTime() - s2.getStartTime());
+      }
+      return res;
+    }
+
+    /**
+     * Calculates and orders a resource's share of a pool in terms of two vectors.
+     * The shares vector contains, for each resource, the fraction of the pool that
+     * it takes up.  The resourceOrder vector contains an ordering of resources
+     * by largest share. So if resource=<10 MB, 5 CPU>, and pool=<100 MB, 10 CPU>,
+     * shares will be [.1, .5] and resourceOrder will be [CPU, MEMORY].
+     */
+    void calculateShares(Resource resource, Resource pool,
+        ResourceWeights shares, ResourceType[] resourceOrder, ResourceWeights weights) {
+      shares.setWeight(MEMORY, (float)resource.getMemory() /
+          (pool.getMemory() * weights.getWeight(MEMORY)));
+      shares.setWeight(CPU, (float)resource.getVirtualCores() /
+          (pool.getVirtualCores() * weights.getWeight(CPU)));
+      // sort order vector by resource share
+      if (resourceOrder != null) {
+        if (shares.getWeight(MEMORY) > shares.getWeight(CPU)) {
+          resourceOrder[0] = MEMORY;
+          resourceOrder[1] = CPU;
+        } else {
+          resourceOrder[0] = CPU;
+          resourceOrder[1] = MEMORY;
+        }
+      }
+    }
+
+    private int compareShares(ResourceWeights shares1, ResourceWeights shares2,
+        ResourceType[] resourceOrder1, ResourceType[] resourceOrder2) {
+      for (int i = 0; i < resourceOrder1.length; i++) {
+        int ret = (int)Math.signum(shares1.getWeight(resourceOrder1[i])
+            - shares2.getWeight(resourceOrder2[i]));
+        if (ret != 0) {
+          return ret;
+        }
+      }
+      return 0;
+    }
+  }
+}
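
A worked example of the dominant-share comparison implemented above, reduced to plain arithmetic (the numbers mirror testBasicDRFAssignment later in this commit; the class name is illustrative):

    public class DrfArithmetic {
      public static void main(String[] args) {
        // Cluster capacity <8192 MB, 5 vcores>; app1 uses <2048 MB, 1 vcore>,
        // app2 uses <1024 MB, 2 vcores>, both with neutral weights.
        float app1Dominant = Math.max(2048f / 8192f, 1f / 5f); // 0.25 (memory)
        float app2Dominant = Math.max(1024f / 8192f, 2f / 5f); // 0.40 (cpu)
        // DRF favors the schedulable with the smaller dominant share, so the
        // comparator orders app1 ahead of app2 for the next container.
        System.out.println(app1Dominant < app2Dominant); // true
      }
    }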
@@ -25,17 +25,21 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Makes scheduling decisions by trying to equalize shares of memory.
  */
 @Private
 @Unstable
 public class FairSharePolicy extends SchedulingPolicy {
-  public static final String NAME = "Fairshare";
+  @VisibleForTesting
+  public static final String NAME = "fair";
   private static final DefaultResourceCalculator RESOURCE_CALCULATOR =
       new DefaultResourceCalculator();
   private FairShareComparator comparator = new FairShareComparator();
@@ -79,8 +83,10 @@ public class FairSharePolicy extends SchedulingPolicy {
           / Resources.max(RESOURCE_CALCULATOR, null, minShare1, one).getMemory();
       minShareRatio2 = (double) s2.getResourceUsage().getMemory()
           / Resources.max(RESOURCE_CALCULATOR, null, minShare2, one).getMemory();
-      useToWeightRatio1 = s1.getResourceUsage().getMemory() / s1.getWeight();
-      useToWeightRatio2 = s2.getResourceUsage().getMemory() / s2.getWeight();
+      useToWeightRatio1 = s1.getResourceUsage().getMemory() /
+          s1.getWeights().getWeight(ResourceType.MEMORY);
+      useToWeightRatio2 = s2.getResourceUsage().getMemory() /
+          s2.getWeights().getWeight(ResourceType.MEMORY);
       int res = 0;
       if (s1Needy && !s2Needy)
         res = -1;
@@ -220,7 +226,7 @@ public class FairSharePolicy extends SchedulingPolicy {
    * {@link SchedulingAlgorithms#computeFairShares(Collection, double)}.
    */
   private static Resource computeShare(Schedulable sched, double r2sRatio) {
-    double share = sched.getWeight() * r2sRatio;
+    double share = sched.getWeights().getWeight(ResourceType.MEMORY) * r2sRatio;
     share = Math.max(share, sched.getMinShare().getMemory());
     share = Math.min(share, sched.getDemand().getMemory());
     return Resources.createResource((int) share);
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.util.Records;
 
@@ -30,7 +31,7 @@ public class FakeSchedulable extends Schedulable {
   private Resource demand;
   private Resource usage;
   private Resource minShare;
-  private double weight;
+  private ResourceWeights weights;
   private Priority priority;
   private long startTime;
 
@@ -46,21 +47,22 @@ public class FakeSchedulable extends Schedulable {
     this(demand, minShare, 1, 0, 0, 0);
   }
 
-  public FakeSchedulable(int demand, int minShare, double weight) {
-    this(demand, minShare, weight, 0, 0, 0);
+  public FakeSchedulable(int demand, int minShare, double memoryWeight) {
+    this(demand, minShare, memoryWeight, 0, 0, 0);
   }
 
   public FakeSchedulable(int demand, int minShare, double weight, int fairShare, int usage,
       long startTime) {
-    this(Resources.createResource(demand), Resources.createResource(minShare), weight,
-        Resources.createResource(fairShare), Resources.createResource(usage), startTime);
+    this(Resources.createResource(demand), Resources.createResource(minShare),
+        new ResourceWeights((float)weight), Resources.createResource(fairShare),
+        Resources.createResource(usage), startTime);
   }
 
-  public FakeSchedulable(Resource demand, Resource minShare, double weight, Resource fairShare,
-      Resource usage, long startTime) {
+  public FakeSchedulable(Resource demand, Resource minShare, ResourceWeights weight,
+      Resource fairShare, Resource usage, long startTime) {
     this.demand = demand;
     this.minShare = minShare;
-    this.weight = weight;
+    this.weights = weight;
     setFairShare(fairShare);
     this.usage = usage;
     this.priority = Records.newRecord(Priority.class);
@@ -98,8 +100,8 @@ public class FakeSchedulable extends Schedulable {
   }
 
   @Override
-  public double getWeight() {
-    return weight;
+  public ResourceWeights getWeights() {
+    return weights;
   }
 
   @Override
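
An illustrative construction of the updated FakeSchedulable (values are made up):

    // The double-weight convenience constructor wraps its argument in
    // ResourceWeights, so existing memory-only tests keep working:
    FakeSchedulable s1 = new FakeSchedulable(8192, 1024, 2.0);
    // Multi-resource tests can pass ResourceWeights directly:
    FakeSchedulable s2 = new FakeSchedulable(
        Resources.createResource(8192), Resources.createResource(1024),
        new ResourceWeights(2.0f, 1.0f), Resources.createResource(0),
        Resources.createResource(0), 0L);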
@@ -75,6 +75,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedS
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.junit.After;
@@ -147,12 +148,17 @@ public class TestFairScheduler {
         ApplicationAttemptId.newInstance(appIdImpl, attemptId);
     return attId;
   }
 
+  private ResourceRequest createResourceRequest(int memory, String host,
+      int priority, int numContainers, boolean relaxLocality) {
+    return createResourceRequest(memory, 1, host, priority, numContainers,
+        relaxLocality);
+  }
+
-  private ResourceRequest createResourceRequest(int memory, String host,
+  private ResourceRequest createResourceRequest(int memory, int vcores, String host,
       int priority, int numContainers, boolean relaxLocality) {
     ResourceRequest request = recordFactory.newRecordInstance(ResourceRequest.class);
-    request.setCapability(Resources.createResource(memory));
+    request.setCapability(BuilderUtils.newResource(memory, vcores));
     request.setResourceName(host);
     request.setNumContainers(numContainers);
     Priority prio = recordFactory.newRecordInstance(Priority.class);
@@ -170,18 +176,34 @@ public class TestFairScheduler {
       String userId) {
     return createSchedulingRequest(memory, queueId, userId, 1);
   }
 
+  private ApplicationAttemptId createSchedulingRequest(int memory, int vcores,
+      String queueId, String userId) {
+    return createSchedulingRequest(memory, vcores, queueId, userId, 1);
+  }
+
   private ApplicationAttemptId createSchedulingRequest(int memory, String queueId,
       String userId, int numContainers) {
     return createSchedulingRequest(memory, queueId, userId, numContainers, 1);
   }
 
+  private ApplicationAttemptId createSchedulingRequest(int memory, int vcores,
+      String queueId, String userId, int numContainers) {
+    return createSchedulingRequest(memory, vcores, queueId, userId, numContainers, 1);
+  }
+
   private ApplicationAttemptId createSchedulingRequest(int memory, String queueId,
       String userId, int numContainers, int priority) {
+    return createSchedulingRequest(memory, 1, queueId, userId, numContainers,
+        priority);
+  }
+
+  private ApplicationAttemptId createSchedulingRequest(int memory, int vcores,
+      String queueId, String userId, int numContainers, int priority) {
     ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
     scheduler.addApplication(id, queueId, userId);
     List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
-    ResourceRequest request = createResourceRequest(memory, ResourceRequest.ANY,
+    ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY,
        priority, numContainers, true);
     ask.add(request);
     scheduler.allocate(id, ask, new ArrayList<ContainerId>());
@@ -451,10 +473,10 @@ public class TestFairScheduler {
     out.println("<?xml version=\"1.0\"?>");
     out.println("<allocations>");
     out.println("<queue name=\"queueA\">");
-    out.println("<minResources>1024</minResources>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
     out.println("</queue>");
     out.println("<queue name=\"queueB\">");
-    out.println("<minResources>2048</minResources>");
+    out.println("<minResources>2048mb,0vcores</minResources>");
     out.println("</queue>");
     out.println("</allocations>");
     out.close();
@@ -569,11 +591,11 @@ public class TestFairScheduler {
     out.println("<allocations>");
     // Give queue A a minimum of 1024 M
     out.println("<queue name=\"queueA\">");
-    out.println("<minResources>1024</minResources>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
     out.println("</queue>");
     // Give queue B a minimum of 2048 M
     out.println("<queue name=\"queueB\">");
-    out.println("<minResources>2048</minResources>");
+    out.println("<minResources>2048mb,0vcores</minResources>");
     out.println("<aclAdministerApps>alice,bob admins</aclAdministerApps>");
     out.println("</queue>");
     // Give queue C no minimum
@@ -613,9 +635,9 @@ public class TestFairScheduler {
     assertEquals(Resources.createResource(0),
         queueManager.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
 
-    assertEquals(Resources.createResource(1024),
+    assertEquals(Resources.createResource(1024, 0),
         queueManager.getMinResources("root.queueA"));
-    assertEquals(Resources.createResource(2048),
+    assertEquals(Resources.createResource(2048, 0),
         queueManager.getMinResources("root.queueB"));
     assertEquals(Resources.createResource(0),
         queueManager.getMinResources("root.queueC"));
@@ -672,15 +694,15 @@ public class TestFairScheduler {
     out.println("<?xml version=\"1.0\"?>");
     out.println("<allocations>");
     out.println("<queue name=\"queueA\">");
-    out.println("<minResources>2048</minResources>");
+    out.println("<minResources>2048mb,0vcores</minResources>");
     out.println("</queue>");
     out.println("<queue name=\"queueB\">");
-    out.println("<minResources>2048</minResources>");
+    out.println("<minResources>2048mb,0vcores</minResources>");
     out.println("<queue name=\"queueC\">");
-    out.println("<minResources>2048</minResources>");
+    out.println("<minResources>2048mb,0vcores</minResources>");
     out.println("</queue>");
     out.println("<queue name=\"queueD\">");
-    out.println("<minResources>2048</minResources>");
+    out.println("<minResources>2048mb,0vcores</minResources>");
     out.println("</queue>");
     out.println("</queue>");
     out.println("</allocations>");
@@ -710,11 +732,11 @@ public class TestFairScheduler {
     out.println("<allocations>");
     // Give queue A a minimum of 1024 M
     out.println("<pool name=\"queueA\">");
-    out.println("<minResources>1024</minResources>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
     out.println("</pool>");
     // Give queue B a minimum of 2048 M
     out.println("<pool name=\"queueB\">");
-    out.println("<minResources>2048</minResources>");
+    out.println("<minResources>2048mb,0vcores</minResources>");
     out.println("<aclAdministerApps>alice,bob admins</aclAdministerApps>");
     out.println("</pool>");
     // Give queue C no minimum
@@ -754,9 +776,9 @@ public class TestFairScheduler {
     assertEquals(Resources.createResource(0),
         queueManager.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
 
-    assertEquals(Resources.createResource(1024),
+    assertEquals(Resources.createResource(1024, 0),
         queueManager.getMinResources("root.queueA"));
-    assertEquals(Resources.createResource(2048),
+    assertEquals(Resources.createResource(2048, 0),
         queueManager.getMinResources("root.queueB"));
     assertEquals(Resources.createResource(0),
         queueManager.getMinResources("root.queueC"));
@@ -812,10 +834,10 @@ public class TestFairScheduler {
     out.println("<?xml version=\"1.0\"?>");
     out.println("<allocations>");
     out.println("<queue name=\"queueA\">");
-    out.println("<minResources>2048</minResources>");
+    out.println("<minResources>2048mb,0vcores</minResources>");
     out.println("</queue>");
     out.println("<queue name=\"queueB\">");
-    out.println("<minResources>2048</minResources>");
+    out.println("<minResources>2048mb,0vcores</minResources>");
     out.println("</queue>");
     out.println("</allocations>");
     out.close();
@@ -825,7 +847,7 @@ public class TestFairScheduler {
 
     // Add one big node (only care about aggregate capacity)
     RMNode node1 =
-        MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024), 1,
+        MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1,
            "127.0.0.1");
     NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
     scheduler.handle(nodeEvent1);
@@ -885,7 +907,7 @@ public class TestFairScheduler {
 
     // Add one big node (only care about aggregate capacity)
     RMNode node1 =
-        MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024), 1,
+        MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1,
            "127.0.0.1");
     NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
     scheduler.handle(nodeEvent1);
@@ -963,19 +985,19 @@ public class TestFairScheduler {
 
     // Create four nodes
     RMNode node1 =
-        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024), 1,
+        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 1,
            "127.0.0.1");
     NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
     scheduler.handle(nodeEvent1);
 
     RMNode node2 =
-        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024), 2,
+        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 2,
            "127.0.0.2");
     NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
     scheduler.handle(nodeEvent2);
 
     RMNode node3 =
-        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024), 3,
+        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 3,
            "127.0.0.3");
     NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
     scheduler.handle(nodeEvent3);
@@ -1106,19 +1128,19 @@ public class TestFairScheduler {
     out.println("<allocations>");
     out.println("<queue name=\"queueA\">");
     out.println("<weight>.25</weight>");
-    out.println("<minResources>1024</minResources>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
     out.println("</queue>");
     out.println("<queue name=\"queueB\">");
     out.println("<weight>.25</weight>");
-    out.println("<minResources>1024</minResources>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
     out.println("</queue>");
     out.println("<queue name=\"queueC\">");
     out.println("<weight>.25</weight>");
-    out.println("<minResources>1024</minResources>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
     out.println("</queue>");
     out.println("<queue name=\"queueD\">");
     out.println("<weight>.25</weight>");
-    out.println("<minResources>1024</minResources>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
     out.println("</queue>");
     out.print("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
     out.print("<fairSharePreemptionTimeout>10</fairSharePreemptionTimeout>");
@@ -1130,19 +1152,19 @@ public class TestFairScheduler {
 
     // Create four nodes
     RMNode node1 =
-        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024), 1,
+        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 1,
            "127.0.0.1");
     NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
     scheduler.handle(nodeEvent1);
 
     RMNode node2 =
-        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024), 2,
+        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 2,
            "127.0.0.2");
     NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
     scheduler.handle(nodeEvent2);
 
     RMNode node3 =
-        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024), 3,
+        MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 3,
            "127.0.0.3");
     NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
     scheduler.handle(nodeEvent3);
@@ -1206,19 +1228,19 @@ public class TestFairScheduler {
     // After minSharePreemptionTime has passed, they should want to preempt min
     // share.
     clock.tick(6);
-    assertTrue(Resources.equals(
-        Resources.createResource(1024), scheduler.resToPreempt(schedC, clock.getTime())));
-    assertTrue(Resources.equals(
-        Resources.createResource(1024), scheduler.resToPreempt(schedD, clock.getTime())));
+    assertEquals(
+        1024, scheduler.resToPreempt(schedC, clock.getTime()).getMemory());
+    assertEquals(
+        1024, scheduler.resToPreempt(schedD, clock.getTime()).getMemory());
 
     // After fairSharePreemptionTime has passed, they should want to preempt
     // fair share.
     scheduler.update();
     clock.tick(6);
-    assertTrue(Resources.equals(
-        Resources.createResource(1536), scheduler.resToPreempt(schedC, clock.getTime())));
-    assertTrue(Resources.equals(
-        Resources.createResource(1536), scheduler.resToPreempt(schedD, clock.getTime())));
+    assertEquals(
+        1536, scheduler.resToPreempt(schedC, clock.getTime()).getMemory());
+    assertEquals(
+        1536, scheduler.resToPreempt(schedD, clock.getTime()).getMemory());
   }
 
   @Test (timeout = 5000)
@@ -1271,7 +1293,7 @@ public class TestFairScheduler {
     // Add a node
     RMNode node1 =
         MockNodes
-            .newNodeInfo(1, Resources.createResource(8192), 1, "127.0.0.1");
+            .newNodeInfo(1, Resources.createResource(8192, 8), 1, "127.0.0.1");
     NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
     scheduler.handle(nodeEvent1);
 
@@ -1443,7 +1465,7 @@ public class TestFairScheduler {
   public void testFifoWithinQueue() throws Exception {
     RMNode node1 =
         MockNodes
-            .newNodeInfo(1, Resources.createResource(3072), 1, "127.0.0.1");
+            .newNodeInfo(1, Resources.createResource(3072, 3), 1, "127.0.0.1");
     NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
     scheduler.handle(nodeEvent1);
 
@@ -1488,7 +1510,7 @@ public class TestFairScheduler {
         .setPolicy(SchedulingPolicy.getDefault());
 
     RMNode node =
-        MockNodes.newNodeInfo(1, Resources.createResource(16384), 0,
+        MockNodes.newNodeInfo(1, Resources.createResource(16384, 16), 0,
            "127.0.0.1");
     NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
     NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
@@ -1536,10 +1558,10 @@ public class TestFairScheduler {
 
     RMNode node1 =
         MockNodes
-            .newNodeInfo(1, Resources.createResource(8192), 1, "127.0.0.1");
+            .newNodeInfo(1, Resources.createResource(8192, 8), 1, "127.0.0.1");
     RMNode node2 =
         MockNodes
-            .newNodeInfo(1, Resources.createResource(8192), 2, "127.0.0.2");
+            .newNodeInfo(1, Resources.createResource(8192, 8), 2, "127.0.0.2");
 
     NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
     NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
@@ -1685,7 +1707,8 @@ public class TestFairScheduler {
   public void testRemoveNodeUpdatesRootQueueMetrics() {
     assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
 
-    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
+    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1,
+        "127.0.0.1");
     NodeAddedSchedulerEvent addEvent = new NodeAddedSchedulerEvent(node1);
     scheduler.handle(addEvent);
 
@ -1824,4 +1847,157 @@ public class TestFairScheduler {
|
|||
scheduler.handle(nodeUpdateEvent);
|
||||
assertEquals(0, app.getReservedContainers().size());
|
||||
}
|
||||
|
||||
public void testNoMoreCpuOnNode() {
|
||||
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(2048, 1),
|
||||
1, "127.0.0.1");
|
||||
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
||||
scheduler.handle(nodeEvent1);
|
||||
|
||||
ApplicationAttemptId attId = createSchedulingRequest(1024, 1, "default",
|
||||
"user1", 2);
|
||||
FSSchedulerApp app = scheduler.applications.get(attId);
|
||||
scheduler.update();
|
||||
|
||||
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
|
||||
scheduler.handle(updateEvent);
|
||||
assertEquals(1, app.getLiveContainers().size());
|
||||
scheduler.handle(updateEvent);
|
||||
assertEquals(1, app.getLiveContainers().size());
|
||||
}
|
||||
|
||||
@Test
public void testBasicDRFAssignment() throws Exception {
RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(8192, 5));
NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
scheduler.handle(nodeEvent);

ApplicationAttemptId appAttId1 = createSchedulingRequest(2048, 1, "queue1",
"user1", 2);
FSSchedulerApp app1 = scheduler.applications.get(appAttId1);
ApplicationAttemptId appAttId2 = createSchedulingRequest(1024, 2, "queue1",
"user1", 2);
FSSchedulerApp app2 = scheduler.applications.get(appAttId2);

DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy();
drfPolicy.initialize(scheduler.getClusterCapacity());
scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy);
scheduler.update();

// First both apps get a container
// Then the first gets another container because its dominant share of
// 2048/8192 is less than the other's of 2/5
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
scheduler.handle(updateEvent);
Assert.assertEquals(1, app1.getLiveContainers().size());
Assert.assertEquals(0, app2.getLiveContainers().size());

scheduler.handle(updateEvent);
Assert.assertEquals(1, app1.getLiveContainers().size());
Assert.assertEquals(1, app2.getLiveContainers().size());

scheduler.handle(updateEvent);
Assert.assertEquals(2, app1.getLiveContainers().size());
Assert.assertEquals(1, app2.getLiveContainers().size());
}
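
// Illustrative arithmetic for the assertions above: after one container
// each, app1 holds 2048/8192 = 0.25 of memory and 1/5 = 0.2 of cpu, for a
// dominant share of 0.25; app2 holds 1024/8192 = 0.125 and 2/5 = 0.4, for a
// dominant share of 0.4. DRF therefore gives the third container to app1.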
/**
* Two apps on one queue, one app on another
*/
@Test
public void testBasicDRFWithQueues() throws Exception {
RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(8192, 7),
1, "127.0.0.1");
NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
scheduler.handle(nodeEvent);

ApplicationAttemptId appAttId1 = createSchedulingRequest(3072, 1, "queue1",
"user1", 2);
FSSchedulerApp app1 = scheduler.applications.get(appAttId1);
ApplicationAttemptId appAttId2 = createSchedulingRequest(2048, 2, "queue1",
"user1", 2);
FSSchedulerApp app2 = scheduler.applications.get(appAttId2);
ApplicationAttemptId appAttId3 = createSchedulingRequest(1024, 2, "queue2",
"user1", 2);
FSSchedulerApp app3 = scheduler.applications.get(appAttId3);

DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy();
drfPolicy.initialize(scheduler.getClusterCapacity());
scheduler.getQueueManager().getQueue("root").setPolicy(drfPolicy);
scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy);
scheduler.update();

NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
scheduler.handle(updateEvent);
Assert.assertEquals(1, app1.getLiveContainers().size());
scheduler.handle(updateEvent);
Assert.assertEquals(1, app3.getLiveContainers().size());
scheduler.handle(updateEvent);
Assert.assertEquals(2, app3.getLiveContainers().size());
scheduler.handle(updateEvent);
Assert.assertEquals(1, app2.getLiveContainers().size());
}
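
// Illustrative ordering: after app1's container, queue1's dominant share is
// 3072/8192 = 0.375 against queue2's 0, so queue2 (app3) is served next;
// once app3 holds two containers, queue2's dominant share is 4/7 ≈ 0.57, so
// queue1 is served and, within it, app2 gets its first container.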
@Test
public void testDRFHierarchicalQueues() throws Exception {
RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(12288, 12),
1, "127.0.0.1");
NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
scheduler.handle(nodeEvent);

ApplicationAttemptId appAttId1 = createSchedulingRequest(3074, 1, "queue1.subqueue1",
"user1", 2);
Thread.sleep(3); // so that start times will be different
FSSchedulerApp app1 = scheduler.applications.get(appAttId1);
ApplicationAttemptId appAttId2 = createSchedulingRequest(1024, 3, "queue1.subqueue1",
"user1", 2);
Thread.sleep(3); // so that start times will be different
FSSchedulerApp app2 = scheduler.applications.get(appAttId2);
ApplicationAttemptId appAttId3 = createSchedulingRequest(2048, 2, "queue1.subqueue2",
"user1", 2);
Thread.sleep(3); // so that start times will be different
FSSchedulerApp app3 = scheduler.applications.get(appAttId3);
ApplicationAttemptId appAttId4 = createSchedulingRequest(1024, 2, "queue2",
"user1", 2);
Thread.sleep(3); // so that start times will be different
FSSchedulerApp app4 = scheduler.applications.get(appAttId4);

DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy();
drfPolicy.initialize(scheduler.getClusterCapacity());
scheduler.getQueueManager().getQueue("root").setPolicy(drfPolicy);
scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy);
scheduler.getQueueManager().getQueue("queue1.subqueue1").setPolicy(drfPolicy);
scheduler.update();

NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
scheduler.handle(updateEvent);
// app1 gets first container because it asked first
Assert.assertEquals(1, app1.getLiveContainers().size());
scheduler.handle(updateEvent);
// app4 gets second container because it's on queue2
Assert.assertEquals(1, app4.getLiveContainers().size());
scheduler.handle(updateEvent);
// app4 gets another container because queue2's dominant share of memory
// is still less than queue1's of cpu
Assert.assertEquals(2, app4.getLiveContainers().size());
scheduler.handle(updateEvent);
// app3 gets one because queue1 gets one and queue1.subqueue2 is behind
// queue1.subqueue1
Assert.assertEquals(1, app3.getLiveContainers().size());
scheduler.handle(updateEvent);
// app4 would get another one, but it doesn't have any requests left;
// queue1.subqueue2 is still using less than queue1.subqueue1, so it
// gets another
Assert.assertEquals(2, app3.getLiveContainers().size());
// queue1.subqueue1 is behind again, so it gets one, which it gives to app2
scheduler.handle(updateEvent);
Assert.assertEquals(1, app2.getLiveContainers().size());

// at this point, we've used all our CPU up, so nobody else should get a container
scheduler.handle(updateEvent);

Assert.assertEquals(1, app1.getLiveContainers().size());
Assert.assertEquals(1, app2.getLiveContainers().size());
Assert.assertEquals(2, app3.getLiveContainers().size());
Assert.assertEquals(2, app4.getLiveContainers().size());
}
}

@@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;

import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration.*;
import static org.junit.Assert.assertEquals;

import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;

public class TestFairSchedulerConfiguration {
@Test
public void testParseResourceConfigValue() throws Exception {
assertEquals(BuilderUtils.newResource(1024, 2),
parseResourceConfigValue("2 vcores, 1024 mb"));
assertEquals(BuilderUtils.newResource(1024, 2),
parseResourceConfigValue("1024 mb, 2 vcores"));
assertEquals(BuilderUtils.newResource(1024, 2),
parseResourceConfigValue("2vcores,1024mb"));
assertEquals(BuilderUtils.newResource(1024, 2),
parseResourceConfigValue("1024mb,2vcores"));
}

@Test(expected = AllocationConfigurationException.class)
public void testNoUnits() throws Exception {
parseResourceConfigValue("1024");
}

@Test(expected = AllocationConfigurationException.class)
public void testOnlyMemory() throws Exception {
parseResourceConfigValue("1024mb");
}

@Test(expected = AllocationConfigurationException.class)
public void testOnlyCPU() throws Exception {
parseResourceConfigValue("1024vcores");
}

@Test(expected = AllocationConfigurationException.class)
public void testGibberish() throws Exception {
parseResourceConfigValue("1o24vc0res");
}
}

@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

+ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
import org.junit.Test;
@@ -49,6 +50,11 @@ public class TestSchedulingPolicy {
assertTrue("Invalid scheduler name",
|
||||
sm.getName().equals(FairSharePolicy.NAME));
|
||||
|
||||
// Shortname - drf
|
||||
sm = SchedulingPolicy.parse("drf");
|
||||
assertTrue("Invalid scheduler name",
|
||||
sm.getName().equals(DominantResourceFairnessPolicy.NAME));
|
||||
|
||||
// Shortname - fair
|
||||
sm = SchedulingPolicy.parse("fair");
|
||||
assertTrue("Invalid scheduler name",
|
||||
|
@@ -93,7 +99,20 @@ public class TestSchedulingPolicy {
SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_PARENT));
assertTrue(ERR,
SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ANY));

+ // drf
+ policy = SchedulingPolicy.parse("drf");
+ assertTrue(ERR,
+ SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_LEAF));
+ assertTrue(ERR, SchedulingPolicy.isApplicableTo(policy,
+ SchedulingPolicy.DEPTH_INTERMEDIATE));
+ assertTrue(ERR,
+ SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ROOT));
+ assertTrue(ERR,
+ SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_PARENT));
+ assertTrue(ERR,
+ SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ANY));
+
policy = Mockito.mock(SchedulingPolicy.class);
Mockito.when(policy.getApplicableDepth()).thenReturn(
SchedulingPolicy.DEPTH_PARENT);

@@ -0,0 +1,162 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies;

import static org.junit.Assert.*;

import java.util.Comparator;

import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FakeSchedulable;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;

/**
* comparator.compare(sched1, sched2) < 0 means that sched1 should get a
* container before sched2
*/
public class TestDominantResourceFairnessPolicy {

private Comparator<Schedulable> createComparator(int clusterMem,
int clusterCpu) {
DominantResourceFairnessPolicy policy = new DominantResourceFairnessPolicy();
policy.initialize(BuilderUtils.newResource(clusterMem, clusterCpu));
return policy.getComparator();
}

private Schedulable createSchedulable(int memUsage, int cpuUsage) {
return createSchedulable(memUsage, cpuUsage, ResourceWeights.NEUTRAL, 0, 0);
}

private Schedulable createSchedulable(int memUsage, int cpuUsage,
int minMemShare, int minCpuShare) {
return createSchedulable(memUsage, cpuUsage, ResourceWeights.NEUTRAL,
minMemShare, minCpuShare);
}

private Schedulable createSchedulable(int memUsage, int cpuUsage,
ResourceWeights weights) {
return createSchedulable(memUsage, cpuUsage, weights, 0, 0);
}

private Schedulable createSchedulable(int memUsage, int cpuUsage,
ResourceWeights weights, int minMemShare, int minCpuShare) {
Resource usage = BuilderUtils.newResource(memUsage, cpuUsage);
Resource minShare = BuilderUtils.newResource(minMemShare, minCpuShare);
return new FakeSchedulable(Resources.none(), minShare, weights,
Resources.none(), usage, 0l);
}

@Test
public void testSameDominantResource() {
assertTrue(createComparator(8000, 4).compare(
createSchedulable(1000, 1),
createSchedulable(2000, 1)) < 0);
}

@Test
public void testDifferentDominantResource() {
assertTrue(createComparator(8000, 8).compare(
createSchedulable(4000, 3),
createSchedulable(2000, 5)) < 0);
}

@Test
public void testOneIsNeedy() {
assertTrue(createComparator(8000, 8).compare(
createSchedulable(2000, 5, 0, 6),
createSchedulable(4000, 3, 0, 0)) < 0);
}

@Test
public void testBothAreNeedy() {
assertTrue(createComparator(8000, 100).compare(
// dominant share is 2000/8000
createSchedulable(2000, 5),
// dominant share is 4000/8000
createSchedulable(4000, 3)) < 0);
assertTrue(createComparator(8000, 100).compare(
// dominant min share is 2/3
createSchedulable(2000, 5, 3000, 6),
// dominant min share is 4/5
createSchedulable(4000, 3, 5000, 4)) < 0);
}

@Test
public void testEvenWeightsSameDominantResource() {
assertTrue(createComparator(8000, 8).compare(
createSchedulable(3000, 1, new ResourceWeights(2.0f)),
createSchedulable(2000, 1)) < 0);
assertTrue(createComparator(8000, 8).compare(
createSchedulable(1000, 3, new ResourceWeights(2.0f)),
createSchedulable(1000, 2)) < 0);
}

@Test
public void testEvenWeightsDifferentDominantResource() {
assertTrue(createComparator(8000, 8).compare(
createSchedulable(1000, 3, new ResourceWeights(2.0f)),
createSchedulable(2000, 1)) < 0);
assertTrue(createComparator(8000, 8).compare(
createSchedulable(3000, 1, new ResourceWeights(2.0f)),
createSchedulable(1000, 2)) < 0);
}

@Test
public void testUnevenWeightsSameDominantResource() {
assertTrue(createComparator(8000, 8).compare(
createSchedulable(3000, 1, new ResourceWeights(2.0f, 1.0f)),
createSchedulable(2000, 1)) < 0);
assertTrue(createComparator(8000, 8).compare(
createSchedulable(1000, 3, new ResourceWeights(1.0f, 2.0f)),
createSchedulable(1000, 2)) < 0);
}

@Test
public void testUnevenWeightsDifferentDominantResource() {
assertTrue(createComparator(8000, 8).compare(
createSchedulable(1000, 3, new ResourceWeights(1.0f, 2.0f)),
createSchedulable(2000, 1)) < 0);
assertTrue(createComparator(8000, 8).compare(
createSchedulable(3000, 1, new ResourceWeights(2.0f, 1.0f)),
createSchedulable(1000, 2)) < 0);
}

@Test
public void testCalculateShares() {
Resource used = Resources.createResource(10, 5);
Resource capacity = Resources.createResource(100, 10);
ResourceType[] resourceOrder = new ResourceType[2];
ResourceWeights shares = new ResourceWeights();
DominantResourceFairnessPolicy.DominantResourceFairnessComparator comparator =
new DominantResourceFairnessPolicy.DominantResourceFairnessComparator();
comparator.calculateShares(used, capacity, shares, resourceOrder,
ResourceWeights.NEUTRAL);

assertEquals(.1, shares.getWeight(ResourceType.MEMORY), .00001);
assertEquals(.5, shares.getWeight(ResourceType.CPU), .00001);
assertEquals(ResourceType.CPU, resourceOrder[0]);
assertEquals(ResourceType.MEMORY, resourceOrder[1]);
}
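
// Illustrative note: with usage (10 mb, 5 vcores) against capacity
// (100 mb, 10 vcores), the computed shares are 10/100 = 0.1 for memory and
// 5/10 = 0.5 for cpu, so cpu is the dominant resource and sorts first in
// resourceOrder.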
}

@@ -31,17 +31,18 @@ Hadoop MapReduce Next Generation - Fair Scheduler
* {Introduction}

Fair scheduling is a method of assigning resources to applications such that
- all apps get, on average, an equal share of resources over time.
- Hadoop NextGen is capable of scheduling multiple resource types, such as
- Memory and CPU. Currently only memory is supported, so a "cluster share" is
- a proportion of aggregate memory in the cluster. When there is a single app
- running, that app uses the entire cluster. When other apps are submitted,
- resources that free up are assigned to the new apps, so that each app gets
- roughly the same amount of resources. Unlike the default Hadoop scheduler,
- which forms a queue of apps, this lets short apps finish in reasonable time
- while not starving long-lived apps. It is also a reasonable way to share a
- cluster between a number of users. Finally, fair sharing can also work with
- app priorities - the priorities are used as weights to determine the
+ all apps get, on average, an equal share of resources over time.
+ Hadoop NextGen is capable of scheduling multiple resource types. By default,
+ the Fair Scheduler bases scheduling fairness decisions only on memory. It
+ can be configured to schedule with both memory and CPU, using the notion
+ of Dominant Resource Fairness developed by Ghodsi et al. When there is a
+ single app running, that app uses the entire cluster. When other apps are
+ submitted, resources that free up are assigned to the new apps, so that each
+ app eventually gets roughly the same amount of resources. Unlike the default
+ Hadoop scheduler, which forms a queue of apps, this lets short apps finish in
+ reasonable time while not starving long-lived apps. It is also a reasonable way
+ to share a cluster between a number of users. Finally, fair sharing can also
+ work with app priorities - the priorities are used as weights to determine the
fraction of total resources that each app should get.
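
As a worked illustration, assuming a cluster of 10240 mb and 10 vcores: an
app using 4096 mb and 2 vcores has a memory share of 0.4 and a cpu share of
0.2, so memory is its dominant resource with a dominant share of 0.4;
Dominant Resource Fairness equalizes these dominant shares across apps.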

The scheduler organizes apps further into "queues", and shares resources
@@ -49,9 +50,10 @@ Hadoop MapReduce Next Generation - Fair Scheduler
called “default”. If an app specifically lists a queue in a container
resource request, the request is submitted to that queue. It is also
possible to assign queues based on the user name included with the request
- through configuration. Within each queue, fair sharing is used to share
- capacity between the running apps. queues can also be given weights to share
- the cluster non-proportionally in the config file.
+ through configuration. Within each queue, a scheduling policy is used to share
+ resources between the running apps. The default is memory-based fair sharing,
+ but FIFO and multi-resource with Dominant Resource Fairness can also be
+ configured. Queues can be configured with weights to share the cluster non-evenly.
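
A sketch of how a queue might opt into multi-resource scheduling in the
allocation file, reusing the schedulingMode element from the sample file
later in this document ("drf" mirrors the policy shortname the scheduler
parses):

<queue name="sample_queue">
  <schedulingMode>drf</schedulingMode>
</queue>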

The fair scheduler supports hierarchical queues. All queues descend from a
queue named "root". Available resources are distributed among the children
@@ -120,14 +122,6 @@ Hadoop MapReduce Next Generation - Fair Scheduler
queues and their properties, in addition to certain policy defaults. This file
must be in XML format as described in the next section.

- * <<<yarn.scheduler.fair.minimum-allocation-mb>>>
-
- * The smallest container size the scheduler can allocate, in MB of memory.
-
- * <<<yarn.scheduler.fair.maximum-allocation-mb>>>
-
- * The largest container the scheduler can allocate, in MB of memory.
-
* <<<yarn.scheduler.fair.user-as-default-queue>>>

* Whether to use the username associated with the allocation as the default
@@ -183,17 +177,23 @@ Allocation file format
* <<Queue elements>>, which represent queues. Each may contain the following
|
||||
properties:
|
||||
|
||||
* minResources: minimum MB of aggregate memory the queue expects. If a queue
|
||||
demands resources, and its current allocation is below its configured minimum,
|
||||
it will be assigned available resources before any queue that is not in this
|
||||
situation. If multiple queues are in this situation, resources go to the
|
||||
queue with the smallest ratio between allocation and minimum. Note that it is
|
||||
possible that a queue that is below its minimum may not immediately get up to
|
||||
its minimum when it submits an application, because already-running jobs may
|
||||
be using those resources.
|
||||
* minResources: minimum resources the queue is entitled to, in the form
|
||||
"X mb, Y vcores". If a queue's minimum share is not satisfied, it will be
|
||||
offered available resources before any other queue under the same parent.
|
||||
Under the single-resource fairness policy, a queue
|
||||
is considered unsatisfied if its memory usage is below its minimum memory
|
||||
share. Under dominant resource fairness, a queue is considered unsatisfied
|
||||
if its usage for its dominant resource with respect to the cluster capacity
|
||||
is below its minimum share for that resource. If multiple queues are
|
||||
unsatisfied in this situation, resources go to the queue with the smallest
|
||||
ratio between relevant resource usage and minimum. Note that it is
|
||||
possible that a queue that is below its minimum may not immediately get up
|
||||
to its minimum when it submits an application, because already-running jobs
|
||||
may be using those resources.
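
For example, assuming two queues each have a minResources of
"10000 mb, 10 vcores" and memory is the relevant resource: if the first uses
2000 mb (ratio 0.2) and the second uses 4000 mb (ratio 0.4), the first is
offered resources first.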

- * maxResources: maximum MB of aggregate memory a queue is allowed. A queue
- will never be assigned a container that would put it over this limit.
+ * maxResources: maximum resources a queue is allowed, in the form
+ "X mb, Y vcores". A queue will never be assigned a container that would
+ put its aggregate usage over this limit.

* maxRunningApps: limit the number of apps from the queue to run at once

@@ -232,13 +232,13 @@ Allocation file format
<?xml version="1.0"?>
|
||||
<allocations>
|
||||
<queue name="sample_queue">
|
||||
<minResources>10000</minResources>
|
||||
<maxResources>90000</maxResources>
|
||||
<minResources>10000 mb</minResources>
|
||||
<maxResources>90000 mb</maxResources>
|
||||
<maxRunningApps>50</maxRunningApps>
|
||||
<weight>2.0</weight>
|
||||
<schedulingMode>fair</schedulingMode>
|
||||
<queue name="sample_sub_queue">
|
||||
<minResources>5000</minResources>
|
||||
<minResources>5000 mb</minResources>
|
||||
</queue>
|
||||
</queue>
|
||||
<user name="sample_user">
|
||||