diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index a1ce01e7c1e..5caebb9d415 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -184,6 +184,9 @@ Release 2.0.5-beta - UNRELEASED YARN-577. Add application-progress also to ApplicationReport. (Hitesh Shah via vinodkv) + YARN-595. Refactor fair scheduler to use common Resources. (Sandy Ryza + via tomwhite) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java index 5c94f927838..58cd676b235 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java @@ -208,4 +208,14 @@ public class Resources { Resource lhs, Resource rhs) { return resourceCalculator.compare(clusterResource, lhs, rhs) >= 0 ? lhs : rhs; } + + public static boolean fitsIn(Resource smaller, Resource bigger) { + return smaller.getMemory() <= bigger.getMemory() && + smaller.getVirtualCores() <= bigger.getVirtualCores(); + } + + public static Resource componentwiseMin(Resource lhs, Resource rhs) { + return createResource(Math.min(lhs.getMemory(), rhs.getMemory()), + Math.min(lhs.getVirtualCores(), rhs.getVirtualCores())); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index ff5344d12a4..09b14a4b98e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -33,6 +33,7 @@ import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; public class FSLeafQueue extends FSQueue { private static final Log LOG = LogFactory.getLog( @@ -126,8 +127,8 @@ public class FSLeafQueue extends FSQueue { + demand); } demand = Resources.add(demand, toAdd); - if (Resources.greaterThanOrEqual(demand, maxRes)) { - demand = maxRes; + demand = Resources.componentwiseMin(demand, maxRes); + if (Resources.equals(demand, maxRes)) { break; } } @@ -153,7 +154,7 @@ public class FSLeafQueue extends FSQueue { for (AppSchedulable sched : appScheds) { if (sched.getRunnable()) { assigned = sched.assignContainer(node); - if (Resources.greaterThan(assigned, Resources.none())) { + if (!assigned.equals(Resources.none())) { break; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java index 298ceeed056..253052a4181 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java @@ -29,6 +29,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; public class FSParentQueue extends FSQueue { private static final Log LOG = LogFactory.getLog( @@ -87,8 +88,8 @@ public class FSParentQueue extends FSQueue { " now " + demand); } demand = Resources.add(demand, toAdd); - if (Resources.greaterThanOrEqual(demand, maxRes)) { - demand = maxRes; + demand = Resources.componentwiseMin(demand, maxRes); + if (Resources.equals(demand, maxRes)) { break; } } @@ -135,16 +136,14 @@ public class FSParentQueue extends FSQueue { Resource assigned = Resources.none(); // If this queue is over its limit, reject - if (Resources.greaterThan(getResourceUsage(), - queueMgr.getMaxResources(getName()))) { + if (!assignContainerPreCheck(node)) { return assigned; } Collections.sort(childQueues, policy.getComparator()); for (FSQueue child : childQueues) { assigned = child.assignContainer(node); - if (node.getReservedContainer() != null - || Resources.greaterThan(assigned, Resources.none())) { + if (!Resources.equals(assigned, Resources.none())) { break; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index 0a03749d326..f4b29d30b0a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; @@ -161,7 +162,7 @@ public abstract class FSQueue extends Schedulable implements Queue { * @return true if check passes (can assign) or false otherwise */ protected boolean assignContainerPreCheck(FSSchedulerNode node) { - if (Resources.greaterThan(getResourceUsage(), + if (!Resources.fitsIn(getResourceUsage(), queueMgr.getMaxResources(getName())) || node.getReservedContainer() != null) { return false; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java index 7cde2fb6827..533d5c3ddbe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java @@ -46,7 +46,7 @@ public class FSSchedulerNode extends SchedulerNode { private static final RecordFactory recordFactory = RecordFactoryProvider .getRecordFactory(null); - private Resource availableResource = recordFactory.newRecordInstance(Resource.class); + private Resource availableResource; private Resource usedResource = recordFactory.newRecordInstance(Resource.class); private volatile int numContainers; @@ -62,7 +62,7 @@ public class FSSchedulerNode extends SchedulerNode { public FSSchedulerNode(RMNode node) { this.rmNode = node; - this.availableResource.setMemory(node.getTotalCapability().getMemory()); + this.availableResource = Resources.clone(node.getTotalCapability()); } public RMNode getRMNode() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index df575860afd..8aafaeb5cc8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -52,6 +52,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; +import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator; +import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; @@ -111,6 +114,9 @@ public class FairScheduler implements ResourceScheduler { private static final Log LOG = LogFactory.getLog(FairScheduler.class); + private static final ResourceCalculator RESOURCE_CALCULATOR = + new DefaultResourceCalculator(); + // Value that container assignment methods return when a container is // reserved public static final Resource CONTAINER_RESERVED = Resources.createResource(-1); @@ -246,8 +252,10 @@ public class FairScheduler implements ResourceScheduler { * Is a queue below its min share for the given task type? */ boolean isStarvedForMinShare(FSLeafQueue sched) { - Resource desiredShare = Resources.min(sched.getMinShare(), sched.getDemand()); - return Resources.lessThan(sched.getResourceUsage(), desiredShare); + Resource desiredShare = Resources.min(RESOURCE_CALCULATOR, clusterCapacity, + sched.getMinShare(), sched.getDemand()); + return Resources.lessThan(RESOURCE_CALCULATOR, clusterCapacity, + sched.getResourceUsage(), desiredShare); } /** @@ -255,9 +263,10 @@ public class FairScheduler implements ResourceScheduler { * defined as being below half its fair share. */ boolean isStarvedForFairShare(FSLeafQueue sched) { - Resource desiredFairShare = Resources.max( + Resource desiredFairShare = Resources.max(RESOURCE_CALCULATOR, clusterCapacity, Resources.multiply(sched.getFairShare(), .5), sched.getDemand()); - return Resources.lessThan(sched.getResourceUsage(), desiredFairShare); + return Resources.lessThan(RESOURCE_CALCULATOR, clusterCapacity, + sched.getResourceUsage(), desiredFairShare); } /** @@ -283,7 +292,8 @@ public class FairScheduler implements ResourceScheduler { for (FSLeafQueue sched : queueMgr.getLeafQueues()) { resToPreempt = Resources.add(resToPreempt, resToPreempt(sched, curTime)); } - if (Resources.greaterThan(resToPreempt, Resources.none())) { + if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, resToPreempt, + Resources.none())) { preemptResources(queueMgr.getLeafQueues(), resToPreempt); } } @@ -309,7 +319,8 @@ public class FairScheduler implements ResourceScheduler { // Collect running containers from over-scheduled queues List runningContainers = new ArrayList(); for (FSLeafQueue sched : scheds) { - if (Resources.greaterThan(sched.getResourceUsage(), sched.getFairShare())) { + if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, + sched.getResourceUsage(), sched.getFairShare())) { for (AppSchedulable as : sched.getAppSchedulables()) { for (RMContainer c : as.getApp().getLiveContainers()) { runningContainers.add(c); @@ -332,7 +343,8 @@ public class FairScheduler implements ResourceScheduler { // tasks, making sure we don't kill too many from any queue for (RMContainer container : runningContainers) { FSLeafQueue sched = queues.get(container); - if (Resources.greaterThan(sched.getResourceUsage(), sched.getFairShare())) { + if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, + sched.getResourceUsage(), sched.getFairShare())) { LOG.info("Preempting container (prio=" + container.getContainer().getPriority() + "res=" + container.getContainer().getResource() + ") from queue " + sched.getName()); @@ -345,7 +357,8 @@ public class FairScheduler implements ResourceScheduler { toPreempt = Resources.subtract(toPreempt, container.getContainer().getResource()); - if (Resources.equals(toPreempt, Resources.none())) { + if (Resources.lessThanOrEqual(RESOURCE_CALCULATOR, clusterCapacity, + toPreempt, Resources.none())) { break; } } @@ -369,17 +382,21 @@ public class FairScheduler implements ResourceScheduler { Resource resDueToMinShare = Resources.none(); Resource resDueToFairShare = Resources.none(); if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) { - Resource target = Resources.min(sched.getMinShare(), sched.getDemand()); - resDueToMinShare = Resources.max(Resources.none(), - Resources.subtract(target, sched.getResourceUsage())); + Resource target = Resources.min(RESOURCE_CALCULATOR, clusterCapacity, + sched.getMinShare(), sched.getDemand()); + resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterCapacity, + Resources.none(), Resources.subtract(target, sched.getResourceUsage())); } if (curTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) { - Resource target = Resources.min(sched.getFairShare(), sched.getDemand()); - resDueToFairShare = Resources.max(Resources.none(), - Resources.subtract(target, sched.getResourceUsage())); + Resource target = Resources.min(RESOURCE_CALCULATOR, clusterCapacity, + sched.getFairShare(), sched.getDemand()); + resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterCapacity, + Resources.none(), Resources.subtract(target, sched.getResourceUsage())); } - Resource resToPreempt = Resources.max(resDueToMinShare, resDueToFairShare); - if (Resources.greaterThan(resToPreempt, Resources.none())) { + Resource resToPreempt = Resources.max(RESOURCE_CALCULATOR, clusterCapacity, + resDueToMinShare, resDueToFairShare); + if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, + resToPreempt, Resources.none())) { String message = "Should preempt " + resToPreempt + " res for queue " + sched.getName() + ": resDueToMinShare = " + resDueToMinShare + ", resDueToFairShare = " + resDueToFairShare; @@ -800,9 +817,9 @@ public class FairScheduler implements ResourceScheduler { int assignedContainers = 0; while (node.getReservedContainer() == null) { boolean assignedContainer = false; - if (Resources.greaterThan( - queueMgr.getRootQueue().assignContainer(node), - Resources.none())) { + if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, + queueMgr.getRootQueue().assignContainer(node), + Resources.none())) { assignedContainer = true; } if (!assignedContainer) { break; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java index e4efb937e27..1365565e563 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java @@ -42,6 +42,7 @@ import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.w3c.dom.Document; import org.w3c.dom.Element; import org.w3c.dom.Node; @@ -474,8 +475,8 @@ public class QueueManager { } queueAcls.put(queueName, acls); if (maxQueueResources.containsKey(queueName) && minQueueResources.containsKey(queueName) - && Resources.lessThan(maxQueueResources.get(queueName), - minQueueResources.get(queueName))) { + && !Resources.fitsIn(minQueueResources.get(queueName), + maxQueueResources.get(queueName))) { LOG.warn(String.format("Queue %s has max resources %d less than min resources %d", queueName, maxQueueResources.get(queueName), minQueueResources.get(queueName))); } @@ -504,7 +505,7 @@ public class QueueManager { if (maxQueueResource != null) { return maxQueueResource; } else { - return Resources.createResource(Integer.MAX_VALUE); + return Resources.createResource(Integer.MAX_VALUE, Integer.MAX_VALUE); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Resources.java deleted file mode 100644 index 1e700340b9b..00000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Resources.java +++ /dev/null @@ -1,150 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; - -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Evolving; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.util.Records; - -@Private -@Evolving -public class Resources { - - // Java doesn't have const :( - private static final Resource NONE = new Resource() { - - @Override - public int getMemory() { - return 0; - } - - @Override - public void setMemory(int memory) { - throw new RuntimeException("NONE cannot be modified!"); - } - - @Override - public int getVirtualCores() { - return 0; - } - - @Override - public void setVirtualCores(int cores) { - throw new RuntimeException("NONE cannot be modified!"); - } - - @Override - public int compareTo(Resource o) { - int diff = 0 - o.getMemory(); - if (diff == 0) { - diff = 0 - o.getVirtualCores(); - } - return diff; - } - - }; - - public static Resource createResource(int memory) { - return createResource(memory, (memory > 0) ? 1 : 0); - } - - public static Resource createResource(int memory, int cores) { - Resource resource = Records.newRecord(Resource.class); - resource.setMemory(memory); - resource.setVirtualCores(cores); - return resource; - } - - public static Resource none() { - return NONE; - } - - public static Resource clone(Resource res) { - return createResource(res.getMemory(), res.getVirtualCores()); - } - - public static Resource addTo(Resource lhs, Resource rhs) { - lhs.setMemory(lhs.getMemory() + rhs.getMemory()); - return lhs; - } - - public static Resource add(Resource lhs, Resource rhs) { - return addTo(clone(lhs), rhs); - } - - public static Resource subtractFrom(Resource lhs, Resource rhs) { - lhs.setMemory(lhs.getMemory() - rhs.getMemory()); - return lhs; - } - - public static Resource subtract(Resource lhs, Resource rhs) { - return subtractFrom(clone(lhs), rhs); - } - - public static Resource negate(Resource resource) { - return subtract(NONE, resource); - } - - public static Resource multiplyTo(Resource lhs, int by) { - lhs.setMemory(lhs.getMemory() * by); - return lhs; - } - - public static Resource multiply(Resource lhs, int by) { - return multiplyTo(clone(lhs), by); - } - - /** - * Mutliply a resource by a {@code double}. Note that integral - * resource quantites are subject to rounding during cast. - */ - public static Resource multiply(Resource lhs, double by) { - Resource out = clone(lhs); - out.setMemory((int) (lhs.getMemory() * by)); - return out; - } - - public static boolean equals(Resource lhs, Resource rhs) { - return lhs.getMemory() == rhs.getMemory(); - } - - public static boolean lessThan(Resource lhs, Resource rhs) { - return lhs.getMemory() < rhs.getMemory(); - } - - public static boolean lessThanOrEqual(Resource lhs, Resource rhs) { - return lhs.getMemory() <= rhs.getMemory(); - } - - public static boolean greaterThan(Resource lhs, Resource rhs) { - return lhs.getMemory() > rhs.getMemory(); - } - - public static boolean greaterThanOrEqual(Resource lhs, Resource rhs) { - return lhs.getMemory() >= rhs.getMemory(); - } - - public static Resource min(Resource lhs, Resource rhs) { - return (lhs.getMemory() < rhs.getMemory()) ? lhs : rhs; - } - - public static Resource max(Resource lhs, Resource rhs) { - return (lhs.getMemory() > rhs.getMemory()) ? lhs : rhs; - }} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java index 4e229315cc3..e94d03217a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java @@ -22,7 +22,8 @@ import java.util.Collection; import java.util.Comparator; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy; @@ -31,6 +32,8 @@ import com.google.common.annotations.VisibleForTesting; public class FairSharePolicy extends SchedulingPolicy { @VisibleForTesting public static final String NAME = "Fairshare"; + private static final DefaultResourceCalculator RESOURCE_CALCULATOR = + new DefaultResourceCalculator(); private FairShareComparator comparator = new FairShareComparator(); @Override @@ -59,15 +62,19 @@ public class FairSharePolicy extends SchedulingPolicy { public int compare(Schedulable s1, Schedulable s2) { double minShareRatio1, minShareRatio2; double useToWeightRatio1, useToWeightRatio2; - Resource minShare1 = Resources.min(s1.getMinShare(), s1.getDemand()); - Resource minShare2 = Resources.min(s2.getMinShare(), s2.getDemand()); - boolean s1Needy = Resources.lessThan(s1.getResourceUsage(), minShare1); - boolean s2Needy = Resources.lessThan(s2.getResourceUsage(), minShare2); + Resource minShare1 = Resources.min(RESOURCE_CALCULATOR, null, + s1.getMinShare(), s1.getDemand()); + Resource minShare2 = Resources.min(RESOURCE_CALCULATOR, null, + s2.getMinShare(), s2.getDemand()); + boolean s1Needy = Resources.lessThan(RESOURCE_CALCULATOR, null, + s1.getResourceUsage(), minShare1); + boolean s2Needy = Resources.lessThan(RESOURCE_CALCULATOR, null, + s2.getResourceUsage(), minShare2); Resource one = Resources.createResource(1); minShareRatio1 = (double) s1.getResourceUsage().getMemory() - / Resources.max(minShare1, one).getMemory(); + / Resources.max(RESOURCE_CALCULATOR, null, minShare1, one).getMemory(); minShareRatio2 = (double) s2.getResourceUsage().getMemory() - / Resources.max(minShare2, one).getMemory(); + / Resources.max(RESOURCE_CALCULATOR, null, minShare2, one).getMemory(); useToWeightRatio1 = s1.getResourceUsage().getMemory() / s1.getWeight(); useToWeightRatio2 = s2.getResourceUsage().getMemory() / s2.getWeight(); int res = 0; @@ -161,9 +168,11 @@ public class FairSharePolicy extends SchedulingPolicy { for (Schedulable sched : schedulables) { Resources.addTo(totalDemand, sched.getDemand()); } - Resource cap = Resources.min(totalDemand, totalResources); + Resource cap = Resources.min(RESOURCE_CALCULATOR, null, totalDemand, + totalResources); double rMax = 1.0; - while (Resources.lessThan(resUsedWithWeightToResRatio(rMax, schedulables), + while (Resources.lessThan(RESOURCE_CALCULATOR, null, + resUsedWithWeightToResRatio(rMax, schedulables), cap)) { rMax *= 2.0; } @@ -172,7 +181,8 @@ public class FairSharePolicy extends SchedulingPolicy { double right = rMax; for (int i = 0; i < COMPUTE_FAIR_SHARES_ITERATIONS; i++) { double mid = (left + right) / 2.0; - if (Resources.lessThan(resUsedWithWeightToResRatio(mid, schedulables), + if (Resources.lessThan(RESOURCE_CALCULATOR, null, + resUsedWithWeightToResRatio(mid, schedulables), cap)) { left = mid; } else { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java index 25766ea6222..c8fabee09ed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java @@ -22,8 +22,8 @@ import java.util.Collection; import java.util.Comparator; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Resources; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy; import com.google.common.annotations.VisibleForTesting; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resource/TestResources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resource/TestResources.java new file mode 100644 index 00000000000..f4e70f7a1f7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resource/TestResources.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.resource; + +import static org.apache.hadoop.yarn.server.resourcemanager.resource.Resources.*; +import static org.junit.Assert.*; +import org.junit.Test; + +public class TestResources { + @Test(timeout=1000) + public void testFitsIn() { + assertTrue(fitsIn(createResource(1, 1), createResource(2, 2))); + assertTrue(fitsIn(createResource(2, 2), createResource(2, 2))); + assertFalse(fitsIn(createResource(2, 2), createResource(1, 1))); + assertFalse(fitsIn(createResource(1, 2), createResource(2, 1))); + assertFalse(fitsIn(createResource(2, 1), createResource(1, 2))); + } + + @Test(timeout=1000) + public void testComponentwiseMin() { + assertEquals(createResource(1, 1), + componentwiseMin(createResource(1, 1), createResource(2, 2))); + assertEquals(createResource(1, 1), + componentwiseMin(createResource(2, 2), createResource(1, 1))); + assertEquals(createResource(1, 1), + componentwiseMin(createResource(1, 2), createResource(2, 1))); + } +}