From c1b635ed4826b0f9c8574d262dfeb13fa5ceb650 Mon Sep 17 00:00:00 2001 From: Alejandro Abdelnur Date: Mon, 3 Jun 2013 17:33:55 +0000 Subject: [PATCH] YARN-326. Add multi-resource scheduling to the fair scheduler. (sandyr via tucu) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1489070 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../resource/ResourceType.java | 28 ++ .../resource/ResourceWeights.java | 66 +++++ .../AllocationConfigurationException.java | 4 + .../scheduler/fair/AppSchedulable.java | 8 +- .../scheduler/fair/FSQueue.java | 3 +- .../scheduler/fair/FSSchedulerNode.java | 4 +- .../scheduler/fair/FairScheduler.java | 49 +--- .../fair/FairSchedulerConfiguration.java | 48 +++- .../scheduler/fair/QueueManager.java | 40 ++- .../scheduler/fair/Schedulable.java | 7 +- .../scheduler/fair/SchedulingPolicy.java | 14 +- .../DominantResourceFairnessPolicy.java | 172 +++++++++++ .../fair/policies/FairSharePolicy.java | 14 +- .../scheduler/fair/FakeSchedulable.java | 22 +- .../scheduler/fair/TestFairScheduler.java | 268 +++++++++++++++--- .../fair/TestFairSchedulerConfiguration.java | 58 ++++ .../scheduler/fair/TestSchedulingPolicy.java | 21 +- .../TestDominantResourceFairnessPolicy.java | 162 +++++++++++ .../src/site/apt/FairScheduler.apt.vm | 70 ++--- 20 files changed, 892 insertions(+), 169 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceType.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceWeights.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerConfiguration.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 991c27a5a6e..a1c6ccc6a73 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -108,6 +108,9 @@ Release 2.1.0-beta - UNRELEASED YARN-392. Make it possible to specify hard locality constraints in resource requests. (sandyr via tucu) + YARN-326. Add multi-resource scheduling to the fair scheduler. + (sandyr via tucu) + IMPROVEMENTS YARN-365. Change NM heartbeat handling to not generate a scheduler event diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceType.java new file mode 100644 index 00000000000..9dd245b26bd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceType.java @@ -0,0 +1,28 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.resourcemanager.resource; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +@Private +@Evolving +public enum ResourceType { + MEMORY, CPU +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceWeights.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceWeights.java new file mode 100644 index 00000000000..fd83ed3ed7e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceWeights.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.resource; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +@Private +@Evolving +public class ResourceWeights { + public static final ResourceWeights NEUTRAL = new ResourceWeights(1.0f); + + private float[] weights = new float[ResourceType.values().length]; + + public ResourceWeights(float memoryWeight, float cpuWeight) { + weights[ResourceType.MEMORY.ordinal()] = memoryWeight; + weights[ResourceType.CPU.ordinal()] = cpuWeight; + } + + public ResourceWeights(float weight) { + for (int i = 0; i < weights.length; i++) { + weights[i] = weight; + } + } + + public ResourceWeights() { } + + public void setWeight(ResourceType resourceType, float weight) { + weights[resourceType.ordinal()] = weight; + } + + public float getWeight(ResourceType resourceType) { + return weights[resourceType.ordinal()]; + } + + public String toString() { + StringBuffer sb = new StringBuffer(); + sb.append("<"); + for (int i = 0; i < ResourceType.values().length; i++) { + if (i != 0) { + sb.append(", "); + } + ResourceType resourceType = ResourceType.values()[i]; + sb.append(resourceType.name().toLowerCase()); + sb.append(String.format(" weight=%.1f", getWeight(resourceType))); + } + sb.append(">"); + return sb.toString(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfigurationException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfigurationException.java index 38672b5e5f9..41292278f6b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfigurationException.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfigurationException.java @@ -32,4 +32,8 @@ public class AllocationConfigurationException extends Exception { public AllocationConfigurationException(String message) { super(message); } + + public AllocationConfigurationException(String message, Throwable t) { + super(message, t); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java index 0d38de6a0cb..4e7ee5d67f3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; @@ -119,7 +120,7 @@ public QueueMetrics getMetrics() { } @Override - public double getWeight() { + public ResourceWeights getWeights() { return scheduler.getAppWeight(this); } @@ -237,10 +238,7 @@ private Resource assignContainer(FSSchedulerNode node, } // Can we allocate a container on this node? - int availableContainers = - available.getMemory() / capability.getMemory(); - - if (availableContainers > 0) { + if (Resources.fitsIn(capability, available)) { // Inform the application of the new container for this request RMContainer allocatedContainer = app.allocate(type, node, priority, request, container); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index dbe16cea6c4..f92cf98cd48 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; @@ -84,7 +85,7 @@ public abstract void setPolicy(SchedulingPolicy policy) throws AllocationConfigurationException; @Override - public double getWeight() { + public ResourceWeights getWeights() { return queueMgr.getQueueWeight(getName()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java index d2e6f08320f..9e373dda957 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java @@ -180,8 +180,8 @@ private synchronized void deductAvailableResource(Resource resource) { @Override public String toString() { return "host: " + rmNode.getNodeAddress() + " #containers=" + getNumContainers() + - " available=" + getAvailableResource().getMemory() + - " used=" + getUsedResource().getMemory(); + " available=" + getAvailableResource() + + " used=" + getUsedResource(); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 5904ae8b22d..f8cff0ef258 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -57,8 +57,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.server.resourcemanager.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; @@ -495,14 +497,14 @@ public RMContainerTokenSecretManager getContainerTokenSecretManager() { } // synchronized for sizeBasedWeight - public synchronized double getAppWeight(AppSchedulable app) { + public synchronized ResourceWeights getAppWeight(AppSchedulable app) { if (!app.getRunnable()) { // Job won't launch tasks, but don't return 0 to avoid division errors - return 1.0; + return ResourceWeights.NEUTRAL; } else { double weight = 1.0; if (sizeBasedWeight) { - // Set weight based on current demand + // Set weight based on current memory demand weight = Math.log1p(app.getDemand().getMemory()) / Math.log(2); } weight *= app.getPriority().getPriority(); @@ -510,7 +512,7 @@ public synchronized double getAppWeight(AppSchedulable app) { // Run weight through the user-supplied weightAdjuster weight = weightAdjuster.adjustWeight(app, weight); } - return weight; + return new ResourceWeights((float)weight); } } @@ -714,37 +716,6 @@ private synchronized void removeNode(RMNode rmNode) { " cluster capacity: " + clusterCapacity); } - /** - * Utility method to normalize a list of resource requests, by ensuring that - * the memory for each request is a multiple of minMemory and is not zero. - * - * @param asks a list of resource requests - * @param minMemory the configured minimum memory allocation - * @param maxMemory the configured maximum memory allocation - */ - static void normalizeRequests(List asks, - int minMemory, int maxMemory) { - for (ResourceRequest ask : asks) { - normalizeRequest(ask, minMemory, maxMemory); - } - } - - /** - * Utility method to normalize a resource request, by ensuring that the - * requested memory is a multiple of minMemory and is not zero. - * - * @param ask the resource request - * @param minMemory the configured minimum memory allocation - * @param maxMemory the configured maximum memory allocation - */ - static void normalizeRequest(ResourceRequest ask, int minMemory, - int maxMemory) { - int memory = Math.max(ask.getCapability().getMemory(), minMemory); - int normalizedMemory = - minMemory * ((memory / minMemory) + (memory % minMemory > 0 ? 1 : 0)); - ask.getCapability().setMemory(Math.min(normalizedMemory, maxMemory)); - } - @Override public Allocation allocate(ApplicationAttemptId appAttemptId, List ask, List release) { @@ -758,8 +729,8 @@ public Allocation allocate(ApplicationAttemptId appAttemptId, } // Sanity check - normalizeRequests(ask, minimumAllocation.getMemory(), - maximumAllocation.getMemory()); + SchedulerUtils.normalizeRequests(ask, new DominantResourceCalculator(), + clusterCapacity, minimumAllocation, maximumAllocation); // Release containers for (ContainerId releasedContainerId : release) { @@ -1015,8 +986,8 @@ public void recover(RMState state) throws Exception { public synchronized void reinitialize(Configuration conf, RMContext rmContext) throws IOException { this.conf = new FairSchedulerConfiguration(conf); - minimumAllocation = this.conf.getMinimumMemoryAllocation(); - maximumAllocation = this.conf.getMaximumMemoryAllocation(); + minimumAllocation = this.conf.getMinimumAllocation(); + maximumAllocation = this.conf.getMaximumAllocation(); userAsDefaultQueue = this.conf.getUserAsDefaultQueue(); nodeLocalityThreshold = this.conf.getLocalityThresholdNode(); rackLocalityThreshold = this.conf.getLocalityThresholdRack(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java index 05307cc31b5..56967468ea5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java @@ -18,12 +18,16 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; import java.io.File; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; +import org.apache.hadoop.yarn.util.BuilderUtils; @Private @Evolving @@ -78,18 +82,24 @@ public FairSchedulerConfiguration(Configuration conf) { addResource(FS_CONFIGURATION_FILE); } - public Resource getMinimumMemoryAllocation() { + public Resource getMinimumAllocation() { int mem = getInt( YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); - return Resources.createResource(mem); + int cpu = getInt( + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); + return Resources.createResource(mem, cpu); } - public Resource getMaximumMemoryAllocation() { + public Resource getMaximumAllocation() { int mem = getInt( YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB); - return Resources.createResource(mem); + int cpu = getInt( + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); + return Resources.createResource(mem, cpu); } public boolean getUserAsDefaultQueue() { @@ -136,4 +146,34 @@ public int getPreemptionInterval() { public int getWaitTimeBeforeKill() { return getInt(WAIT_TIME_BEFORE_KILL, DEFAULT_WAIT_TIME_BEFORE_KILL); } + + /** + * Parses a resource config value of a form like "1024", "1024 mb", + * or "1024 mb, 3 vcores". If no units are given, megabytes are assumed. + * + * @throws AllocationConfigurationException + */ + public static Resource parseResourceConfigValue(String val) + throws AllocationConfigurationException { + try { + int memory = findResource(val, "mb"); + int vcores = findResource(val, "vcores"); + return BuilderUtils.newResource(memory, vcores); + } catch (AllocationConfigurationException ex) { + throw ex; + } catch (Exception ex) { + throw new AllocationConfigurationException( + "Error reading resource config", ex); + } + } + + private static int findResource(String val, String units) + throws AllocationConfigurationException { + Pattern pattern = Pattern.compile("(\\d+) ?" + units); + Matcher matcher = pattern.matcher(val); + if (!matcher.find()) { + throw new AllocationConfigurationException("Missing resource: " + units); + } + return Integer.parseInt(matcher.group(1)); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java index 1365565e563..492bbb32e0c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; import org.w3c.dom.Document; import org.w3c.dom.Element; import org.w3c.dom.Node; @@ -301,7 +302,7 @@ public void reloadAllocs() throws IOException, ParserConfigurationException, Map maxQueueResources = new HashMap(); Map queueMaxApps = new HashMap(); Map userMaxApps = new HashMap(); - Map queueWeights = new HashMap(); + Map queueWeights = new HashMap(); Map queuePolicies = new HashMap(); Map minSharePreemptionTimeouts = new HashMap(); Map> queueAcls = @@ -415,7 +416,7 @@ public void reloadAllocs() throws IOException, ParserConfigurationException, */ private void loadQueue(String parentName, Element element, Map minQueueResources, Map maxQueueResources, Map queueMaxApps, - Map userMaxApps, Map queueWeights, + Map userMaxApps, Map queueWeights, Map queuePolicies, Map minSharePreemptionTimeouts, Map> queueAcls, List queueNamesInAllocFile) @@ -433,12 +434,12 @@ private void loadQueue(String parentName, Element element, Map Element field = (Element) fieldNode; if ("minResources".equals(field.getTagName())) { String text = ((Text)field.getFirstChild()).getData().trim(); - int val = Integer.parseInt(text); - minQueueResources.put(queueName, Resources.createResource(val)); + Resource val = FairSchedulerConfiguration.parseResourceConfigValue(text); + minQueueResources.put(queueName, val); } else if ("maxResources".equals(field.getTagName())) { String text = ((Text)field.getFirstChild()).getData().trim(); - int val = Integer.parseInt(text); - maxQueueResources.put(queueName, Resources.createResource(val)); + Resource val = FairSchedulerConfiguration.parseResourceConfigValue(text); + maxQueueResources.put(queueName, val); } else if ("maxRunningApps".equals(field.getTagName())) { String text = ((Text)field.getFirstChild()).getData().trim(); int val = Integer.parseInt(text); @@ -446,7 +447,7 @@ private void loadQueue(String parentName, Element element, Map } else if ("weight".equals(field.getTagName())) { String text = ((Text)field.getFirstChild()).getData().trim(); double val = Double.parseDouble(text); - queueWeights.put(queueName, val); + queueWeights.put(queueName, new ResourceWeights((float)val)); } else if ("minSharePreemptionTimeout".equals(field.getTagName())) { String text = ((Text)field.getFirstChild()).getData().trim(); long val = Long.parseLong(text) * 1000L; @@ -454,7 +455,9 @@ private void loadQueue(String parentName, Element element, Map } else if ("schedulingPolicy".equals(field.getTagName()) || "schedulingMode".equals(field.getTagName())) { String text = ((Text)field.getFirstChild()).getData().trim(); - queuePolicies.put(queueName, SchedulingPolicy.parse(text)); + SchedulingPolicy policy = SchedulingPolicy.parse(text); + policy.initialize(scheduler.getClusterCapacity()); + queuePolicies.put(queueName, policy); } else if ("aclSubmitApps".equals(field.getTagName())) { String text = ((Text)field.getFirstChild()).getData().trim(); acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text)); @@ -510,13 +513,20 @@ public Resource getMaxResources(String queueName) { } /** - * Get a collection of all queues + * Get a collection of all leaf queues */ public Collection getLeafQueues() { synchronized (queues) { return leafQueues; } } + + /** + * Get a collection of all queues + */ + public Collection getQueues() { + return queues.values(); + } public int getUserMaxApps(String user) { // save current info in case it gets changed under us @@ -538,12 +548,12 @@ public int getQueueMaxApps(String queue) { } } - public double getQueueWeight(String queue) { - Double weight = info.queueWeights.get(queue); + public ResourceWeights getQueueWeight(String queue) { + ResourceWeights weight = info.queueWeights.get(queue); if (weight != null) { return weight; } else { - return 1.0; + return ResourceWeights.NEUTRAL; } } @@ -595,7 +605,7 @@ static class QueueManagerInfo { // Maximum amount of resources per queue public final Map maxQueueResources; // Sharing weights for each queue - public final Map queueWeights; + public final Map queueWeights; // Max concurrent running applications for each queue and for each user; in addition, // for users that have no max specified, we use the userMaxJobsDefault. @@ -625,7 +635,7 @@ static class QueueManagerInfo { public QueueManagerInfo(Map minQueueResources, Map maxQueueResources, Map queueMaxApps, Map userMaxApps, - Map queueWeights, int userMaxAppsDefault, + Map queueWeights, int userMaxAppsDefault, int queueMaxAppsDefault, SchedulingPolicy defaultSchedulingPolicy, Map minSharePreemptionTimeouts, Map> queueAcls, @@ -647,7 +657,7 @@ public QueueManagerInfo(Map minQueueResources, public QueueManagerInfo() { minQueueResources = new HashMap(); maxQueueResources = new HashMap(); - queueWeights = new HashMap(); + queueWeights = new HashMap(); queueMaxApps = new HashMap(); userMaxApps = new HashMap(); userMaxAppsDefault = Integer.MAX_VALUE; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java index c68e9d10bec..3736f74eef3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java @@ -22,6 +22,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; /** @@ -80,7 +81,7 @@ public abstract class Schedulable { /** Job/queue weight in fair sharing. */ - public abstract double getWeight(); + public abstract ResourceWeights getWeights(); /** Start time for jobs in FIFO queues; meaningless for QueueSchedulables.*/ public abstract long getStartTime(); @@ -110,7 +111,7 @@ public Resource getFairShare() { /** Convenient toString implementation for debugging. */ @Override public String toString() { - return String.format("[%s, demand=%s, running=%s, share=%s,], w=%.1f]", - getName(), getDemand(), getResourceUsage(), fairShare, getWeight()); + return String.format("[%s, demand=%s, running=%s, share=%s, w=%s]", + getName(), getDemand(), getResourceUsage(), fairShare, getWeights()); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java index e6e771b69c5..06f384045e0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java @@ -25,6 +25,7 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; @@ -67,11 +68,12 @@ public static SchedulingPolicy getInstance(Class cla /** * Returns {@link SchedulingPolicy} instance corresponding to the * {@link SchedulingPolicy} passed as a string. The policy can be "fair" for - * FairsharePolicy or "fifo" for FifoPolicy. For custom + * FairSharePolicy, "fifo" for FifoPolicy, or "drf" for + * DominantResourceFairnessPolicy. For a custom * {@link SchedulingPolicy}s in the RM classpath, the policy should be * canonical class name of the {@link SchedulingPolicy}. * - * @param policy canonical class name or "fair" or "fifo" + * @param policy canonical class name or "drf" or "fair" or "fifo" * @throws AllocationConfigurationException */ @SuppressWarnings("unchecked") @@ -80,10 +82,12 @@ public static SchedulingPolicy parse(String policy) @SuppressWarnings("rawtypes") Class clazz; String text = policy.toLowerCase(); - if (text.equals("fair")) { + if (text.equalsIgnoreCase(FairSharePolicy.NAME)) { clazz = FairSharePolicy.class; - } else if (text.equals("fifo")) { + } else if (text.equalsIgnoreCase(FifoPolicy.NAME)) { clazz = FifoPolicy.class; + } else if (text.equalsIgnoreCase(DominantResourceFairnessPolicy.NAME)) { + clazz = DominantResourceFairnessPolicy.class; } else { try { clazz = Class.forName(policy); @@ -98,6 +102,8 @@ public static SchedulingPolicy parse(String policy) } return getInstance(clazz); } + + public void initialize(Resource clusterCapacity) {} /** * @return returns the name of {@link SchedulingPolicy} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java new file mode 100644 index 00000000000..7d37f4a554e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies; + +import java.util.Collection; +import java.util.Comparator; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; +import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; +import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy; + +import static org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType.*; + +/** + * Makes scheduling decisions by trying to equalize dominant resource usage. + * A schedulable's dominant resource usage is the largest ratio of resource + * usage to capacity among the resource types it is using. + */ +@Private +@Unstable +public class DominantResourceFairnessPolicy extends SchedulingPolicy { + + public static final String NAME = "DRF"; + + private DominantResourceFairnessComparator comparator = + new DominantResourceFairnessComparator(); + + @Override + public String getName() { + return NAME; + } + + @Override + public byte getApplicableDepth() { + return SchedulingPolicy.DEPTH_ANY; + } + + @Override + public Comparator getComparator() { + return comparator; + } + + @Override + public void computeShares(Collection schedulables, + Resource totalResources) { + + // TODO: For now, set all fair shares to 0, because, in the context of DRF, + // it doesn't make sense to set a value for each resource. YARN-736 should + // add in a sensible replacement. + + for (Schedulable schedulable : schedulables) { + schedulable.setFairShare(Resources.none()); + } + } + + @Override + public void initialize(Resource clusterCapacity) { + comparator.setClusterCapacity(clusterCapacity); + } + + public static class DominantResourceFairnessComparator implements Comparator { + private static final int NUM_RESOURCES = ResourceType.values().length; + + private Resource clusterCapacity; + + public void setClusterCapacity(Resource clusterCapacity) { + this.clusterCapacity = clusterCapacity; + } + + @Override + public int compare(Schedulable s1, Schedulable s2) { + ResourceWeights sharesOfCluster1 = new ResourceWeights(); + ResourceWeights sharesOfCluster2 = new ResourceWeights(); + ResourceWeights sharesOfMinShare1 = new ResourceWeights(); + ResourceWeights sharesOfMinShare2 = new ResourceWeights(); + ResourceType[] resourceOrder1 = new ResourceType[NUM_RESOURCES]; + ResourceType[] resourceOrder2 = new ResourceType[NUM_RESOURCES]; + + // Calculate shares of the cluster for each resource both schedulables. + calculateShares(s1.getResourceUsage(), + clusterCapacity, sharesOfCluster1, resourceOrder1, s1.getWeights()); + calculateShares(s1.getResourceUsage(), + s1.getMinShare(), sharesOfMinShare1, null, ResourceWeights.NEUTRAL); + calculateShares(s2.getResourceUsage(), + clusterCapacity, sharesOfCluster2, resourceOrder2, s2.getWeights()); + calculateShares(s2.getResourceUsage(), + s2.getMinShare(), sharesOfMinShare2, null, ResourceWeights.NEUTRAL); + + // A queue is needy for its min share if its dominant resource + // (with respect to the cluster capacity) is below its configured min share + // for that resource + boolean s1Needy = sharesOfMinShare1.getWeight(resourceOrder1[0]) < 1.0f; + boolean s2Needy = sharesOfMinShare2.getWeight(resourceOrder2[0]) < 1.0f; + + int res = 0; + if (!s2Needy && !s1Needy) { + res = compareShares(sharesOfCluster1, sharesOfCluster2, + resourceOrder1, resourceOrder2); + } else if (s1Needy && !s2Needy) { + res = -1; + } else if (s2Needy && !s1Needy) { + res = 1; + } else { // both are needy below min share + res = compareShares(sharesOfMinShare1, sharesOfMinShare2, + resourceOrder1, resourceOrder2); + } + if (res == 0) { + // Apps are tied in fairness ratio. Break the tie by submit time. + res = (int)(s1.getStartTime() - s2.getStartTime()); + } + return res; + } + + /** + * Calculates and orders a resource's share of a pool in terms of two vectors. + * The shares vector contains, for each resource, the fraction of the pool that + * it takes up. The resourceOrder vector contains an ordering of resources + * by largest share. So if resource=<10 MB, 5 CPU>, and pool=<100 MB, 10 CPU>, + * shares will be [.1, .5] and resourceOrder will be [CPU, MEMORY]. + */ + void calculateShares(Resource resource, Resource pool, + ResourceWeights shares, ResourceType[] resourceOrder, ResourceWeights weights) { + shares.setWeight(MEMORY, (float)resource.getMemory() / + (pool.getMemory() * weights.getWeight(MEMORY))); + shares.setWeight(CPU, (float)resource.getVirtualCores() / + (pool.getVirtualCores() * weights.getWeight(CPU))); + // sort order vector by resource share + if (resourceOrder != null) { + if (shares.getWeight(MEMORY) > shares.getWeight(CPU)) { + resourceOrder[0] = MEMORY; + resourceOrder[1] = CPU; + } else { + resourceOrder[0] = CPU; + resourceOrder[1] = MEMORY; + } + } + } + + private int compareShares(ResourceWeights shares1, ResourceWeights shares2, + ResourceType[] resourceOrder1, ResourceType[] resourceOrder2) { + for (int i = 0; i < resourceOrder1.length; i++) { + int ret = (int)Math.signum(shares1.getWeight(resourceOrder1[i]) + - shares2.getWeight(resourceOrder2[i])); + if (ret != 0) { + return ret; + } + } + return 0; + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java index 910dd61e662..ed82631e1f0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java @@ -25,17 +25,21 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy; import com.google.common.annotations.VisibleForTesting; +/** + * Makes scheduling decisions by trying to equalize shares of memory. + */ @Private @Unstable public class FairSharePolicy extends SchedulingPolicy { @VisibleForTesting - public static final String NAME = "Fairshare"; + public static final String NAME = "fair"; private static final DefaultResourceCalculator RESOURCE_CALCULATOR = new DefaultResourceCalculator(); private FairShareComparator comparator = new FairShareComparator(); @@ -79,8 +83,10 @@ public int compare(Schedulable s1, Schedulable s2) { / Resources.max(RESOURCE_CALCULATOR, null, minShare1, one).getMemory(); minShareRatio2 = (double) s2.getResourceUsage().getMemory() / Resources.max(RESOURCE_CALCULATOR, null, minShare2, one).getMemory(); - useToWeightRatio1 = s1.getResourceUsage().getMemory() / s1.getWeight(); - useToWeightRatio2 = s2.getResourceUsage().getMemory() / s2.getWeight(); + useToWeightRatio1 = s1.getResourceUsage().getMemory() / + s1.getWeights().getWeight(ResourceType.MEMORY); + useToWeightRatio2 = s2.getResourceUsage().getMemory() / + s2.getWeights().getWeight(ResourceType.MEMORY); int res = 0; if (s1Needy && !s2Needy) res = -1; @@ -220,7 +226,7 @@ private static Resource resUsedWithWeightToResRatio(double w2sRatio, * {@link SchedulingAlgorithms#computeFairShares(Collection, double)}. */ private static Resource computeShare(Schedulable sched, double r2sRatio) { - double share = sched.getWeight() * r2sRatio; + double share = sched.getWeights().getWeight(ResourceType.MEMORY) * r2sRatio; share = Math.max(share, sched.getMinShare().getMemory()); share = Math.min(share, sched.getDemand().getMemory()); return Resources.createResource((int) share); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java index b910cb935ce..13bd6b22854 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java @@ -20,6 +20,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.util.Records; @@ -30,7 +31,7 @@ public class FakeSchedulable extends Schedulable { private Resource demand; private Resource usage; private Resource minShare; - private double weight; + private ResourceWeights weights; private Priority priority; private long startTime; @@ -46,21 +47,22 @@ public FakeSchedulable(int demand, int minShare) { this(demand, minShare, 1, 0, 0, 0); } - public FakeSchedulable(int demand, int minShare, double weight) { - this(demand, minShare, weight, 0, 0, 0); + public FakeSchedulable(int demand, int minShare, double memoryWeight) { + this(demand, minShare, memoryWeight, 0, 0, 0); } public FakeSchedulable(int demand, int minShare, double weight, int fairShare, int usage, long startTime) { - this(Resources.createResource(demand), Resources.createResource(minShare), weight, - Resources.createResource(fairShare), Resources.createResource(usage), startTime); + this(Resources.createResource(demand), Resources.createResource(minShare), + new ResourceWeights((float)weight), Resources.createResource(fairShare), + Resources.createResource(usage), startTime); } - public FakeSchedulable(Resource demand, Resource minShare, double weight, Resource fairShare, - Resource usage, long startTime) { + public FakeSchedulable(Resource demand, Resource minShare, ResourceWeights weight, + Resource fairShare, Resource usage, long startTime) { this.demand = demand; this.minShare = minShare; - this.weight = weight; + this.weights = weight; setFairShare(fairShare); this.usage = usage; this.priority = Records.newRecord(Priority.class); @@ -98,8 +100,8 @@ public long getStartTime() { } @Override - public double getWeight() { - return weight; + public ResourceWeights getWeights() { + return weights; } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index b0c0a42fe55..e2c753f21fa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -75,6 +75,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; import org.apache.hadoop.yarn.util.BuilderUtils; import org.junit.After; @@ -147,12 +148,17 @@ private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) { ApplicationAttemptId.newInstance(appIdImpl, attemptId); return attId; } - - + private ResourceRequest createResourceRequest(int memory, String host, int priority, int numContainers, boolean relaxLocality) { + return createResourceRequest(memory, 1, host, priority, numContainers, + relaxLocality); + } + + private ResourceRequest createResourceRequest(int memory, int vcores, String host, + int priority, int numContainers, boolean relaxLocality) { ResourceRequest request = recordFactory.newRecordInstance(ResourceRequest.class); - request.setCapability(Resources.createResource(memory)); + request.setCapability(BuilderUtils.newResource(memory, vcores)); request.setResourceName(host); request.setNumContainers(numContainers); Priority prio = recordFactory.newRecordInstance(Priority.class); @@ -170,18 +176,34 @@ private ApplicationAttemptId createSchedulingRequest(int memory, String queueId, String userId) { return createSchedulingRequest(memory, queueId, userId, 1); } + + private ApplicationAttemptId createSchedulingRequest(int memory, int vcores, + String queueId, String userId) { + return createSchedulingRequest(memory, vcores, queueId, userId, 1); + } private ApplicationAttemptId createSchedulingRequest(int memory, String queueId, String userId, int numContainers) { return createSchedulingRequest(memory, queueId, userId, numContainers, 1); } + + private ApplicationAttemptId createSchedulingRequest(int memory, int vcores, + String queueId, String userId, int numContainers) { + return createSchedulingRequest(memory, vcores, queueId, userId, numContainers, 1); + } private ApplicationAttemptId createSchedulingRequest(int memory, String queueId, String userId, int numContainers, int priority) { + return createSchedulingRequest(memory, 1, queueId, userId, numContainers, + priority); + } + + private ApplicationAttemptId createSchedulingRequest(int memory, int vcores, + String queueId, String userId, int numContainers, int priority) { ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); scheduler.addApplication(id, queueId, userId); List ask = new ArrayList(); - ResourceRequest request = createResourceRequest(memory, ResourceRequest.ANY, + ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY, priority, numContainers, true); ask.add(request); scheduler.allocate(id, ask, new ArrayList()); @@ -451,10 +473,10 @@ public void testFairShareWithMinAlloc() throws Exception { out.println(""); out.println(""); out.println(""); - out.println("1024"); + out.println("1024mb,0vcores"); out.println(""); out.println(""); - out.println("2048"); + out.println("2048mb,0vcores"); out.println(""); out.println(""); out.close(); @@ -569,11 +591,11 @@ public void testAllocationFileParsing() throws Exception { out.println(""); // Give queue A a minimum of 1024 M out.println(""); - out.println("1024"); + out.println("1024mb,0vcores"); out.println(""); // Give queue B a minimum of 2048 M out.println(""); - out.println("2048"); + out.println("2048mb,0vcores"); out.println("alice,bob admins"); out.println(""); // Give queue C no minimum @@ -613,9 +635,9 @@ public void testAllocationFileParsing() throws Exception { assertEquals(Resources.createResource(0), queueManager.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME)); - assertEquals(Resources.createResource(1024), + assertEquals(Resources.createResource(1024, 0), queueManager.getMinResources("root.queueA")); - assertEquals(Resources.createResource(2048), + assertEquals(Resources.createResource(2048, 0), queueManager.getMinResources("root.queueB")); assertEquals(Resources.createResource(0), queueManager.getMinResources("root.queueC")); @@ -672,15 +694,15 @@ public void testHierarchicalQueueAllocationFileParsing() throws IOException, SAX out.println(""); out.println(""); out.println(""); - out.println("2048"); + out.println("2048mb,0vcores"); out.println(""); out.println(""); - out.println("2048"); + out.println("2048mb,0vcores"); out.println(""); - out.println("2048"); + out.println("2048mb,0vcores"); out.println(""); out.println(""); - out.println("2048"); + out.println("2048mb,0vcores"); out.println(""); out.println(""); out.println(""); @@ -710,11 +732,11 @@ public void testBackwardsCompatibleAllocationFileParsing() throws Exception { out.println(""); // Give queue A a minimum of 1024 M out.println(""); - out.println("1024"); + out.println("1024mb,0vcores"); out.println(""); // Give queue B a minimum of 2048 M out.println(""); - out.println("2048"); + out.println("2048mb,0vcores"); out.println("alice,bob admins"); out.println(""); // Give queue C no minimum @@ -754,9 +776,9 @@ public void testBackwardsCompatibleAllocationFileParsing() throws Exception { assertEquals(Resources.createResource(0), queueManager.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME)); - assertEquals(Resources.createResource(1024), + assertEquals(Resources.createResource(1024, 0), queueManager.getMinResources("root.queueA")); - assertEquals(Resources.createResource(2048), + assertEquals(Resources.createResource(2048, 0), queueManager.getMinResources("root.queueB")); assertEquals(Resources.createResource(0), queueManager.getMinResources("root.queueC")); @@ -812,10 +834,10 @@ public void testIsStarvedForMinShare() throws Exception { out.println(""); out.println(""); out.println(""); - out.println("2048"); + out.println("2048mb,0vcores"); out.println(""); out.println(""); - out.println("2048"); + out.println("2048mb,0vcores"); out.println(""); out.println(""); out.close(); @@ -825,7 +847,7 @@ public void testIsStarvedForMinShare() throws Exception { // Add one big node (only care about aggregate capacity) RMNode node1 = - MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024), 1, + MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1, "127.0.0.1"); NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); scheduler.handle(nodeEvent1); @@ -885,7 +907,7 @@ public void testIsStarvedForFairShare() throws Exception { // Add one big node (only care about aggregate capacity) RMNode node1 = - MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024), 1, + MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1, "127.0.0.1"); NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); scheduler.handle(nodeEvent1); @@ -963,19 +985,19 @@ public void testChoiceOfPreemptedContainers() throws Exception { // Create four nodes RMNode node1 = - MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024), 1, + MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 1, "127.0.0.1"); NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); scheduler.handle(nodeEvent1); RMNode node2 = - MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024), 2, + MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 2, "127.0.0.2"); NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); scheduler.handle(nodeEvent2); RMNode node3 = - MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024), 3, + MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 3, "127.0.0.3"); NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3); scheduler.handle(nodeEvent3); @@ -1106,19 +1128,19 @@ public void testPreemptionDecision() throws Exception { out.println(""); out.println(""); out.println(".25"); - out.println("1024"); + out.println("1024mb,0vcores"); out.println(""); out.println(""); out.println(".25"); - out.println("1024"); + out.println("1024mb,0vcores"); out.println(""); out.println(""); out.println(".25"); - out.println("1024"); + out.println("1024mb,0vcores"); out.println(""); out.println(""); out.println(".25"); - out.println("1024"); + out.println("1024mb,0vcores"); out.println(""); out.print("5"); out.print("10"); @@ -1130,19 +1152,19 @@ public void testPreemptionDecision() throws Exception { // Create four nodes RMNode node1 = - MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024), 1, + MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 1, "127.0.0.1"); NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); scheduler.handle(nodeEvent1); RMNode node2 = - MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024), 2, + MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 2, "127.0.0.2"); NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); scheduler.handle(nodeEvent2); RMNode node3 = - MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024), 3, + MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 3, "127.0.0.3"); NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3); scheduler.handle(nodeEvent3); @@ -1206,19 +1228,19 @@ public void testPreemptionDecision() throws Exception { // After minSharePreemptionTime has passed, they should want to preempt min // share. clock.tick(6); - assertTrue(Resources.equals( - Resources.createResource(1024), scheduler.resToPreempt(schedC, clock.getTime()))); - assertTrue(Resources.equals( - Resources.createResource(1024), scheduler.resToPreempt(schedD, clock.getTime()))); + assertEquals( + 1024, scheduler.resToPreempt(schedC, clock.getTime()).getMemory()); + assertEquals( + 1024, scheduler.resToPreempt(schedD, clock.getTime()).getMemory()); // After fairSharePreemptionTime has passed, they should want to preempt // fair share. scheduler.update(); clock.tick(6); - assertTrue(Resources.equals( - Resources.createResource(1536), scheduler.resToPreempt(schedC, clock.getTime()))); - assertTrue(Resources.equals( - Resources.createResource(1536), scheduler.resToPreempt(schedD, clock.getTime()))); + assertEquals( + 1536 , scheduler.resToPreempt(schedC, clock.getTime()).getMemory()); + assertEquals( + 1536, scheduler.resToPreempt(schedD, clock.getTime()).getMemory()); } @Test (timeout = 5000) @@ -1271,7 +1293,7 @@ public void testUserMaxRunningApps() throws Exception { // Add a node RMNode node1 = MockNodes - .newNodeInfo(1, Resources.createResource(8192), 1, "127.0.0.1"); + .newNodeInfo(1, Resources.createResource(8192, 8), 1, "127.0.0.1"); NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); scheduler.handle(nodeEvent1); @@ -1443,7 +1465,7 @@ public void testMultipleNodesSingleRackRequest() throws Exception { public void testFifoWithinQueue() throws Exception { RMNode node1 = MockNodes - .newNodeInfo(1, Resources.createResource(3072), 1, "127.0.0.1"); + .newNodeInfo(1, Resources.createResource(3072, 3), 1, "127.0.0.1"); NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); scheduler.handle(nodeEvent1); @@ -1488,7 +1510,7 @@ public void testMaxAssign() throws AllocationConfigurationException { .setPolicy(SchedulingPolicy.getDefault()); RMNode node = - MockNodes.newNodeInfo(1, Resources.createResource(16384), 0, + MockNodes.newNodeInfo(1, Resources.createResource(16384, 16), 0, "127.0.0.1"); NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node); NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node); @@ -1536,10 +1558,10 @@ public void testAssignContainer() throws Exception { RMNode node1 = MockNodes - .newNodeInfo(1, Resources.createResource(8192), 1, "127.0.0.1"); + .newNodeInfo(1, Resources.createResource(8192, 8), 1, "127.0.0.1"); RMNode node2 = MockNodes - .newNodeInfo(1, Resources.createResource(8192), 2, "127.0.0.2"); + .newNodeInfo(1, Resources.createResource(8192, 8), 2, "127.0.0.2"); NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); @@ -1685,7 +1707,8 @@ public void testReservationThatDoesntFit() { public void testRemoveNodeUpdatesRootQueueMetrics() { assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB()); - RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024)); + RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, + "127.0.0.1"); NodeAddedSchedulerEvent addEvent = new NodeAddedSchedulerEvent(node1); scheduler.handle(addEvent); @@ -1824,4 +1847,157 @@ public void testReservationsStrictLocality() { scheduler.handle(nodeUpdateEvent); assertEquals(0, app.getReservedContainers().size()); } + + public void testNoMoreCpuOnNode() { + RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(2048, 1), + 1, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + ApplicationAttemptId attId = createSchedulingRequest(1024, 1, "default", + "user1", 2); + FSSchedulerApp app = scheduler.applications.get(attId); + scheduler.update(); + + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); + scheduler.handle(updateEvent); + assertEquals(1, app.getLiveContainers().size()); + scheduler.handle(updateEvent); + assertEquals(1, app.getLiveContainers().size()); + } + + public void testBasicDRFAssignment() throws Exception { + RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(8192, 5)); + NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node); + scheduler.handle(nodeEvent); + + ApplicationAttemptId appAttId1 = createSchedulingRequest(2048, 1, "queue1", + "user1", 2); + FSSchedulerApp app1 = scheduler.applications.get(appAttId1); + ApplicationAttemptId appAttId2 = createSchedulingRequest(1024, 2, "queue1", + "user1", 2); + FSSchedulerApp app2 = scheduler.applications.get(appAttId2); + + DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy(); + drfPolicy.initialize(scheduler.getClusterCapacity()); + scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy); + scheduler.update(); + + // First both apps get a container + // Then the first gets another container because its dominant share of + // 2048/8192 is less than the other's of 2/5 + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node); + scheduler.handle(updateEvent); + Assert.assertEquals(1, app1.getLiveContainers().size()); + Assert.assertEquals(0, app2.getLiveContainers().size()); + + scheduler.handle(updateEvent); + Assert.assertEquals(1, app1.getLiveContainers().size()); + Assert.assertEquals(1, app2.getLiveContainers().size()); + + scheduler.handle(updateEvent); + Assert.assertEquals(2, app1.getLiveContainers().size()); + Assert.assertEquals(1, app2.getLiveContainers().size()); + } + + /** + * Two apps on one queue, one app on another + */ + @Test + public void testBasicDRFWithQueues() throws Exception { + RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(8192, 7), + 1, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node); + scheduler.handle(nodeEvent); + + ApplicationAttemptId appAttId1 = createSchedulingRequest(3072, 1, "queue1", + "user1", 2); + FSSchedulerApp app1 = scheduler.applications.get(appAttId1); + ApplicationAttemptId appAttId2 = createSchedulingRequest(2048, 2, "queue1", + "user1", 2); + FSSchedulerApp app2 = scheduler.applications.get(appAttId2); + ApplicationAttemptId appAttId3 = createSchedulingRequest(1024, 2, "queue2", + "user1", 2); + FSSchedulerApp app3 = scheduler.applications.get(appAttId3); + + DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy(); + drfPolicy.initialize(scheduler.getClusterCapacity()); + scheduler.getQueueManager().getQueue("root").setPolicy(drfPolicy); + scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy); + scheduler.update(); + + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node); + scheduler.handle(updateEvent); + Assert.assertEquals(1, app1.getLiveContainers().size()); + scheduler.handle(updateEvent); + Assert.assertEquals(1, app3.getLiveContainers().size()); + scheduler.handle(updateEvent); + Assert.assertEquals(2, app3.getLiveContainers().size()); + scheduler.handle(updateEvent); + Assert.assertEquals(1, app2.getLiveContainers().size()); + } + + @Test + public void testDRFHierarchicalQueues() throws Exception { + RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(12288, 12), + 1, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node); + scheduler.handle(nodeEvent); + + ApplicationAttemptId appAttId1 = createSchedulingRequest(3074, 1, "queue1.subqueue1", + "user1", 2); + Thread.sleep(3); // so that start times will be different + FSSchedulerApp app1 = scheduler.applications.get(appAttId1); + ApplicationAttemptId appAttId2 = createSchedulingRequest(1024, 3, "queue1.subqueue1", + "user1", 2); + Thread.sleep(3); // so that start times will be different + FSSchedulerApp app2 = scheduler.applications.get(appAttId2); + ApplicationAttemptId appAttId3 = createSchedulingRequest(2048, 2, "queue1.subqueue2", + "user1", 2); + Thread.sleep(3); // so that start times will be different + FSSchedulerApp app3 = scheduler.applications.get(appAttId3); + ApplicationAttemptId appAttId4 = createSchedulingRequest(1024, 2, "queue2", + "user1", 2); + Thread.sleep(3); // so that start times will be different + FSSchedulerApp app4 = scheduler.applications.get(appAttId4); + + DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy(); + drfPolicy.initialize(scheduler.getClusterCapacity()); + scheduler.getQueueManager().getQueue("root").setPolicy(drfPolicy); + scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy); + scheduler.getQueueManager().getQueue("queue1.subqueue1").setPolicy(drfPolicy); + scheduler.update(); + + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node); + scheduler.handle(updateEvent); + // app1 gets first container because it asked first + Assert.assertEquals(1, app1.getLiveContainers().size()); + scheduler.handle(updateEvent); + // app4 gets second container because it's on queue2 + Assert.assertEquals(1, app4.getLiveContainers().size()); + scheduler.handle(updateEvent); + // app4 gets another container because queue2's dominant share of memory + // is still less than queue1's of cpu + Assert.assertEquals(2, app4.getLiveContainers().size()); + scheduler.handle(updateEvent); + // app3 gets one because queue1 gets one and queue1.subqueue2 is behind + // queue1.subqueue1 + Assert.assertEquals(1, app3.getLiveContainers().size()); + scheduler.handle(updateEvent); + // app4 would get another one, but it doesn't have any requests + // queue1.subqueue2 is still using less than queue1.subqueue1, so it + // gets another + Assert.assertEquals(2, app3.getLiveContainers().size()); + // queue1.subqueue1 is behind again, so it gets one, which it gives to app2 + scheduler.handle(updateEvent); + Assert.assertEquals(1, app2.getLiveContainers().size()); + + // at this point, we've used all our CPU up, so nobody else should get a container + scheduler.handle(updateEvent); + + Assert.assertEquals(1, app1.getLiveContainers().size()); + Assert.assertEquals(1, app2.getLiveContainers().size()); + Assert.assertEquals(2, app3.getLiveContainers().size()); + Assert.assertEquals(2, app4.getLiveContainers().size()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerConfiguration.java new file mode 100644 index 00000000000..40c3092e89e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerConfiguration.java @@ -0,0 +1,58 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration.*; +import static org.junit.Assert.assertEquals; + +import org.apache.hadoop.yarn.util.BuilderUtils; +import org.junit.Test; + +public class TestFairSchedulerConfiguration { + @Test + public void testParseResourceConfigValue() throws Exception { + assertEquals(BuilderUtils.newResource(1024, 2), + parseResourceConfigValue("2 vcores, 1024 mb")); + assertEquals(BuilderUtils.newResource(1024, 2), + parseResourceConfigValue("1024 mb, 2 vcores")); + assertEquals(BuilderUtils.newResource(1024, 2), + parseResourceConfigValue("2vcores,1024mb")); + assertEquals(BuilderUtils.newResource(1024, 2), + parseResourceConfigValue("1024mb,2vcores")); + } + + @Test(expected = AllocationConfigurationException.class) + public void testNoUnits() throws Exception { + parseResourceConfigValue("1024"); + } + + @Test(expected = AllocationConfigurationException.class) + public void testOnlyMemory() throws Exception { + parseResourceConfigValue("1024mb"); + } + + @Test(expected = AllocationConfigurationException.class) + public void testOnlyCPU() throws Exception { + parseResourceConfigValue("1024vcores"); + } + + @Test(expected = AllocationConfigurationException.class) + public void testGibberish() throws Exception { + parseResourceConfigValue("1o24vc0res"); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java index e498e7eea98..eeedb09fc46 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; import org.junit.Test; @@ -49,6 +50,11 @@ public void testParseSchedulingPolicy() assertTrue("Invalid scheduler name", sm.getName().equals(FairSharePolicy.NAME)); + // Shortname - drf + sm = SchedulingPolicy.parse("drf"); + assertTrue("Invalid scheduler name", + sm.getName().equals(DominantResourceFairnessPolicy.NAME)); + // Shortname - fair sm = SchedulingPolicy.parse("fair"); assertTrue("Invalid scheduler name", @@ -93,7 +99,20 @@ public void testIsApplicableTo() throws AllocationConfigurationException { SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_PARENT)); assertTrue(ERR, SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ANY)); - + + // drf + policy = SchedulingPolicy.parse("drf"); + assertTrue(ERR, + SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_LEAF)); + assertTrue(ERR, SchedulingPolicy.isApplicableTo(policy, + SchedulingPolicy.DEPTH_INTERMEDIATE)); + assertTrue(ERR, + SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ROOT)); + assertTrue(ERR, + SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_PARENT)); + assertTrue(ERR, + SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ANY)); + policy = Mockito.mock(SchedulingPolicy.class); Mockito.when(policy.getApplicableDepth()).thenReturn( SchedulingPolicy.DEPTH_PARENT); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java new file mode 100644 index 00000000000..6fb8215043d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies; + +import static org.junit.Assert.*; + +import java.util.Comparator; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; +import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; +import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FakeSchedulable; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; +import org.apache.hadoop.yarn.util.BuilderUtils; +import org.junit.Test; + +/** + * comparator.compare(sched1, sched2) < 0 means that sched1 should get a + * container before sched2 + */ +public class TestDominantResourceFairnessPolicy { + + private Comparator createComparator(int clusterMem, + int clusterCpu) { + DominantResourceFairnessPolicy policy = new DominantResourceFairnessPolicy(); + policy.initialize(BuilderUtils.newResource(clusterMem, clusterCpu)); + return policy.getComparator(); + } + + private Schedulable createSchedulable(int memUsage, int cpuUsage) { + return createSchedulable(memUsage, cpuUsage, ResourceWeights.NEUTRAL, 0, 0); + } + + private Schedulable createSchedulable(int memUsage, int cpuUsage, + int minMemShare, int minCpuShare) { + return createSchedulable(memUsage, cpuUsage, ResourceWeights.NEUTRAL, + minMemShare, minCpuShare); + } + + private Schedulable createSchedulable(int memUsage, int cpuUsage, + ResourceWeights weights) { + return createSchedulable(memUsage, cpuUsage, weights, 0, 0); + } + + + private Schedulable createSchedulable(int memUsage, int cpuUsage, + ResourceWeights weights, int minMemShare, int minCpuShare) { + Resource usage = BuilderUtils.newResource(memUsage, cpuUsage); + Resource minShare = BuilderUtils.newResource(minMemShare, minCpuShare); + return new FakeSchedulable(Resources.none(), minShare, weights, + Resources.none(), usage, 0l); + } + + @Test + public void testSameDominantResource() { + assertTrue(createComparator(8000, 4).compare( + createSchedulable(1000, 1), + createSchedulable(2000, 1)) < 0); + } + + @Test + public void testDifferentDominantResource() { + assertTrue(createComparator(8000, 8).compare( + createSchedulable(4000, 3), + createSchedulable(2000, 5)) < 0); + } + + @Test + public void testOneIsNeedy() { + assertTrue(createComparator(8000, 8).compare( + createSchedulable(2000, 5, 0, 6), + createSchedulable(4000, 3, 0, 0)) < 0); + } + + @Test + public void testBothAreNeedy() { + assertTrue(createComparator(8000, 100).compare( + // dominant share is 2000/8000 + createSchedulable(2000, 5), + // dominant share is 4000/8000 + createSchedulable(4000, 3)) < 0); + assertTrue(createComparator(8000, 100).compare( + // dominant min share is 2/3 + createSchedulable(2000, 5, 3000, 6), + // dominant min share is 4/5 + createSchedulable(4000, 3, 5000, 4)) < 0); + } + + @Test + public void testEvenWeightsSameDominantResource() { + assertTrue(createComparator(8000, 8).compare( + createSchedulable(3000, 1, new ResourceWeights(2.0f)), + createSchedulable(2000, 1)) < 0); + assertTrue(createComparator(8000, 8).compare( + createSchedulable(1000, 3, new ResourceWeights(2.0f)), + createSchedulable(1000, 2)) < 0); + } + + @Test + public void testEvenWeightsDifferentDominantResource() { + assertTrue(createComparator(8000, 8).compare( + createSchedulable(1000, 3, new ResourceWeights(2.0f)), + createSchedulable(2000, 1)) < 0); + assertTrue(createComparator(8000, 8).compare( + createSchedulable(3000, 1, new ResourceWeights(2.0f)), + createSchedulable(1000, 2)) < 0); + } + + @Test + public void testUnevenWeightsSameDominantResource() { + assertTrue(createComparator(8000, 8).compare( + createSchedulable(3000, 1, new ResourceWeights(2.0f, 1.0f)), + createSchedulable(2000, 1)) < 0); + assertTrue(createComparator(8000, 8).compare( + createSchedulable(1000, 3, new ResourceWeights(1.0f, 2.0f)), + createSchedulable(1000, 2)) < 0); + } + + @Test + public void testUnevenWeightsDifferentDominantResource() { + assertTrue(createComparator(8000, 8).compare( + createSchedulable(1000, 3, new ResourceWeights(1.0f, 2.0f)), + createSchedulable(2000, 1)) < 0); + assertTrue(createComparator(8000, 8).compare( + createSchedulable(3000, 1, new ResourceWeights(2.0f, 1.0f)), + createSchedulable(1000, 2)) < 0); + } + + @Test + public void testCalculateShares() { + Resource used = Resources.createResource(10, 5); + Resource capacity = Resources.createResource(100, 10); + ResourceType[] resourceOrder = new ResourceType[2]; + ResourceWeights shares = new ResourceWeights(); + DominantResourceFairnessPolicy.DominantResourceFairnessComparator comparator = + new DominantResourceFairnessPolicy.DominantResourceFairnessComparator(); + comparator.calculateShares(used, capacity, shares, resourceOrder, + ResourceWeights.NEUTRAL); + + assertEquals(.1, shares.getWeight(ResourceType.MEMORY), .00001); + assertEquals(.5, shares.getWeight(ResourceType.CPU), .00001); + assertEquals(ResourceType.CPU, resourceOrder[0]); + assertEquals(ResourceType.MEMORY, resourceOrder[1]); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm index 3d6af1347ec..371d7c244a2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm @@ -31,17 +31,18 @@ Hadoop MapReduce Next Generation - Fair Scheduler * {Introduction} Fair scheduling is a method of assigning resources to applications such that - all apps get, on average, an equal share of resources over time. - Hadoop NextGen is capable of scheduling multiple resource types, such as - Memory and CPU. Currently only memory is supported, so a "cluster share" is - a proportion of aggregate memory in the cluster. When there is a single app - running, that app uses the entire cluster. When other apps are submitted, - resources that free up are assigned to the new apps, so that each app gets - roughly the same amount of resources. Unlike the default Hadoop scheduler, - which forms a queue of apps, this lets short apps finish in reasonable time - while not starving long-lived apps. It is also a reasonable way to share a - cluster between a number of users. Finally, fair sharing can also work with - app priorities - the priorities are used as weights to determine the + all apps get, on average, an equal share of resources over time. + Hadoop NextGen is capable of scheduling multiple resource types. By default, + the Fair Scheduler bases scheduling fairness decisions only on memory. It + can be configured to schedule with both memory and CPU, using the notion + of Dominant Resource Fairness developed by Ghodsi et al. When there is a + single app running, that app uses the entire cluster. When other apps are + submitted, resources that free up are assigned to the new apps, so that each + app eventually on gets roughly the same amount of resources. Unlike the default + Hadoop scheduler, which forms a queue of apps, this lets short apps finish in + reasonable time while not starving long-lived apps. It is also a reasonable way + to share a cluster between a number of users. Finally, fair sharing can also + work with app priorities - the priorities are used as weights to determine the fraction of total resources that each app should get. The scheduler organizes apps further into "queues", and shares resources @@ -49,9 +50,10 @@ Hadoop MapReduce Next Generation - Fair Scheduler called “default”. If an app specifically lists a queue in a container resource request, the request is submitted to that queue. It is also possible to assign queues based on the user name included with the request - through configuration. Within each queue, fair sharing is used to share - capacity between the running apps. queues can also be given weights to share - the cluster non-proportionally in the config file. + through configuration. Within each queue, a scheduling policy is used to share + resources between the running apps. The default is memory-based fair sharing, + but FIFO and multi-resource with Dominant Resource Fairness can also be + configured. Queues can be configured with weights to share the cluster non-evenly. The fair scheduler supports hierarchical queues. All queues descend from a queue named "root". Available resources are distributed among the children @@ -120,14 +122,6 @@ Hadoop MapReduce Next Generation - Fair Scheduler queues and their properties, in addition to certain policy defaults. This file must be in XML format as described in the next section. - * <<>> - - * The smallest container size the scheduler can allocate, in MB of memory. - - * <<>> - - * The largest container the scheduler can allocate, in MB of memory. - * <<>> * Whether to use the username associated with the allocation as the default @@ -183,17 +177,23 @@ Allocation file format * <>, which represent queues. Each may contain the following properties: - * minResources: minimum MB of aggregate memory the queue expects. If a queue - demands resources, and its current allocation is below its configured minimum, - it will be assigned available resources before any queue that is not in this - situation. If multiple queues are in this situation, resources go to the - queue with the smallest ratio between allocation and minimum. Note that it is - possible that a queue that is below its minimum may not immediately get up to - its minimum when it submits an application, because already-running jobs may - be using those resources. + * minResources: minimum resources the queue is entitled to, in the form + "X mb, Y vcores". If a queue's minimum share is not satisfied, it will be + offered available resources before any other queue under the same parent. + Under the single-resource fairness policy, a queue + is considered unsatisfied if its memory usage is below its minimum memory + share. Under dominant resource fairness, a queue is considered unsatisfied + if its usage for its dominant resource with respect to the cluster capacity + is below its minimum share for that resource. If multiple queues are + unsatisfied in this situation, resources go to the queue with the smallest + ratio between relevant resource usage and minimum. Note that it is + possible that a queue that is below its minimum may not immediately get up + to its minimum when it submits an application, because already-running jobs + may be using those resources. - * maxResources: maximum MB of aggregate memory a queue is allowed. A queue - will never be assigned a container that would put it over this limit. + * maxResources: maximum resources a queue is allowed, in the form + "X mb, Y vcores". A queue will never be assigned a container that would + put its aggregate usage over this limit. * maxRunningApps: limit the number of apps from the queue to run at once @@ -232,13 +232,13 @@ Allocation file format - 10000 - 90000 + 10000 mb + 90000 mb 50 2.0 fair - 5000 + 5000 mb