Merge -r 1475669:1475670 from trunk to branch-2. Fixes: YARN-595. Refactor fair scheduler to use common Resources. Contributed by Sandy Ryza.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1475672 13f79535-47bb-0310-9956-ffa450edef68
parent a1fe9a0c9f
commit 3bed8dc7b9
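
In short, the fair scheduler drops its private, memory-only Resources helper and calls the common org.apache.hadoop.yarn.server.resourcemanager.resource.Resources, whose comparisons go through a ResourceCalculator plus the cluster capacity. A minimal sketch of the call-shape change, mirroring the isStarvedForMinShare hunk further down (names taken from that hunk; this is an illustration, not additional patch content):

    // before: the fair scheduler's own helper compared memory only
    boolean starvedBefore = Resources.lessThan(sched.getResourceUsage(), desiredShare);

    // after: the shared helper delegates to a ResourceCalculator
    boolean starvedAfter = Resources.lessThan(RESOURCE_CALCULATOR, clusterCapacity,
        sched.getResourceUsage(), desiredShare);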
CHANGES.txt
@@ -113,6 +113,9 @@ Release 2.0.5-beta - UNRELEASED
     YARN-577. Add application-progress also to ApplicationReport. (Hitesh Shah
     via vinodkv)
 
+    YARN-595. Refactor fair scheduler to use common Resources. (Sandy Ryza
+    via tomwhite)
+
   OPTIMIZATIONS
 
   BUG FIXES
resource/Resources.java
@@ -208,4 +208,14 @@ public class Resources {
       Resource lhs, Resource rhs) {
     return resourceCalculator.compare(clusterResource, lhs, rhs) >= 0 ? lhs : rhs;
   }
+
+  public static boolean fitsIn(Resource smaller, Resource bigger) {
+    return smaller.getMemory() <= bigger.getMemory() &&
+        smaller.getVirtualCores() <= bigger.getVirtualCores();
+  }
+
+  public static Resource componentwiseMin(Resource lhs, Resource rhs) {
+    return createResource(Math.min(lhs.getMemory(), rhs.getMemory()),
+        Math.min(lhs.getVirtualCores(), rhs.getVirtualCores()));
+  }
 }
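
A small usage sketch of the two helpers added above; the class name ResourcesSketch and the example values are illustrative only, and it assumes the patched resourcemanager classes are on the classpath:

    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;

    public class ResourcesSketch {
      public static void main(String[] args) {
        Resource ask = Resources.createResource(2048, 2); // 2048 MB, 2 vcores
        Resource cap = Resources.createResource(4096, 1); // 4096 MB, 1 vcore

        // fitsIn checks every dimension, so the extra vcore makes this false
        // even though the memory alone would fit.
        System.out.println(Resources.fitsIn(ask, cap));        // false

        // componentwiseMin clamps each dimension independently: 2048 MB, 1 vcore.
        System.out.println(Resources.componentwiseMin(ask, cap));
      }
    }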
FSLeafQueue.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 
 public class FSLeafQueue extends FSQueue {
   private static final Log LOG = LogFactory.getLog(
@@ -126,8 +127,8 @@ public class FSLeafQueue extends FSQueue {
             + demand);
       }
       demand = Resources.add(demand, toAdd);
-      if (Resources.greaterThanOrEqual(demand, maxRes)) {
-        demand = maxRes;
+      demand = Resources.componentwiseMin(demand, maxRes);
+      if (Resources.equals(demand, maxRes)) {
         break;
       }
     }
@@ -153,7 +154,7 @@ public class FSLeafQueue extends FSQueue {
     for (AppSchedulable sched : appScheds) {
       if (sched.getRunnable()) {
         assigned = sched.assignContainer(node);
-        if (Resources.greaterThan(assigned, Resources.none())) {
+        if (!assigned.equals(Resources.none())) {
           break;
         }
       }
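
One nuance of the demand-capping change above (the FSParentQueue hunk below makes the same change): the old code capped demand as soon as the memory comparison tripped, while the new code clamps each dimension separately and only stops the loop once the clamped demand equals maxRes in both dimensions. A sketch with made-up numbers, assuming the common Resources.equals compares both dimensions:

    // illustrative values only, not from the patch
    Resource demand = Resources.createResource(10 * 1024, 1); // 10 GB, 1 vcore
    Resource maxRes = Resources.createResource(8 * 1024, 4);  //  8 GB, 4 vcores

    demand = Resources.componentwiseMin(demand, maxRes);      // -> 8 GB, 1 vcore
    boolean stop = Resources.equals(demand, maxRes);          // false: vcores differ,
                                                              // so demand keeps accumulating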
FSParentQueue.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 
 public class FSParentQueue extends FSQueue {
   private static final Log LOG = LogFactory.getLog(
@@ -87,8 +88,8 @@ public class FSParentQueue extends FSQueue {
             " now " + demand);
       }
       demand = Resources.add(demand, toAdd);
-      if (Resources.greaterThanOrEqual(demand, maxRes)) {
-        demand = maxRes;
+      demand = Resources.componentwiseMin(demand, maxRes);
+      if (Resources.equals(demand, maxRes)) {
         break;
       }
     }
@@ -135,16 +136,14 @@ public class FSParentQueue extends FSQueue {
     Resource assigned = Resources.none();
 
     // If this queue is over its limit, reject
-    if (Resources.greaterThan(getResourceUsage(),
-        queueMgr.getMaxResources(getName()))) {
+    if (!assignContainerPreCheck(node)) {
       return assigned;
     }
 
     Collections.sort(childQueues, policy.getComparator());
     for (FSQueue child : childQueues) {
       assigned = child.assignContainer(node);
-      if (node.getReservedContainer() != null
-          || Resources.greaterThan(assigned, Resources.none())) {
+      if (!Resources.equals(assigned, Resources.none())) {
         break;
       }
     }
FSQueue.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 
@@ -161,7 +162,7 @@ public abstract class FSQueue extends Schedulable implements Queue {
    * @return true if check passes (can assign) or false otherwise
    */
   protected boolean assignContainerPreCheck(FSSchedulerNode node) {
-    if (Resources.greaterThan(getResourceUsage(),
+    if (!Resources.fitsIn(getResourceUsage(),
         queueMgr.getMaxResources(getName()))
         || node.getReservedContainer() != null) {
       return false;
FSSchedulerNode.java
@@ -46,7 +46,7 @@ public class FSSchedulerNode extends SchedulerNode {
   private static final RecordFactory recordFactory = RecordFactoryProvider
       .getRecordFactory(null);
 
-  private Resource availableResource = recordFactory.newRecordInstance(Resource.class);
+  private Resource availableResource;
   private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
 
   private volatile int numContainers;
@@ -62,7 +62,7 @@ public class FSSchedulerNode extends SchedulerNode {
 
   public FSSchedulerNode(RMNode node) {
     this.rmNode = node;
-    this.availableResource.setMemory(node.getTotalCapability().getMemory());
+    this.availableResource = Resources.clone(node.getTotalCapability());
   }
 
   public RMNode getRMNode() {
FairScheduler.java
@@ -52,6 +52,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -111,6 +114,9 @@ public class FairScheduler implements ResourceScheduler {
 
   private static final Log LOG = LogFactory.getLog(FairScheduler.class);
 
+  private static final ResourceCalculator RESOURCE_CALCULATOR =
+      new DefaultResourceCalculator();
+
   // Value that container assignment methods return when a container is
   // reserved
   public static final Resource CONTAINER_RESERVED = Resources.createResource(-1);
@@ -246,8 +252,10 @@ public class FairScheduler implements ResourceScheduler {
    * Is a queue below its min share for the given task type?
    */
   boolean isStarvedForMinShare(FSLeafQueue sched) {
-    Resource desiredShare = Resources.min(sched.getMinShare(), sched.getDemand());
-    return Resources.lessThan(sched.getResourceUsage(), desiredShare);
+    Resource desiredShare = Resources.min(RESOURCE_CALCULATOR, clusterCapacity,
+        sched.getMinShare(), sched.getDemand());
+    return Resources.lessThan(RESOURCE_CALCULATOR, clusterCapacity,
+        sched.getResourceUsage(), desiredShare);
   }
 
   /**
@@ -255,9 +263,10 @@ public class FairScheduler implements ResourceScheduler {
    * defined as being below half its fair share.
    */
   boolean isStarvedForFairShare(FSLeafQueue sched) {
-    Resource desiredFairShare = Resources.max(
+    Resource desiredFairShare = Resources.max(RESOURCE_CALCULATOR, clusterCapacity,
         Resources.multiply(sched.getFairShare(), .5), sched.getDemand());
-    return Resources.lessThan(sched.getResourceUsage(), desiredFairShare);
+    return Resources.lessThan(RESOURCE_CALCULATOR, clusterCapacity,
+        sched.getResourceUsage(), desiredFairShare);
   }
 
   /**
@@ -283,7 +292,8 @@ public class FairScheduler implements ResourceScheduler {
     for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
       resToPreempt = Resources.add(resToPreempt, resToPreempt(sched, curTime));
     }
-    if (Resources.greaterThan(resToPreempt, Resources.none())) {
+    if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, resToPreempt,
+        Resources.none())) {
       preemptResources(queueMgr.getLeafQueues(), resToPreempt);
     }
   }
@@ -309,7 +319,8 @@ public class FairScheduler implements ResourceScheduler {
     // Collect running containers from over-scheduled queues
     List<RMContainer> runningContainers = new ArrayList<RMContainer>();
     for (FSLeafQueue sched : scheds) {
-      if (Resources.greaterThan(sched.getResourceUsage(), sched.getFairShare())) {
+      if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+          sched.getResourceUsage(), sched.getFairShare())) {
         for (AppSchedulable as : sched.getAppSchedulables()) {
           for (RMContainer c : as.getApp().getLiveContainers()) {
             runningContainers.add(c);
@@ -332,7 +343,8 @@ public class FairScheduler implements ResourceScheduler {
     // tasks, making sure we don't kill too many from any queue
     for (RMContainer container : runningContainers) {
       FSLeafQueue sched = queues.get(container);
-      if (Resources.greaterThan(sched.getResourceUsage(), sched.getFairShare())) {
+      if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+          sched.getResourceUsage(), sched.getFairShare())) {
         LOG.info("Preempting container (prio=" + container.getContainer().getPriority() +
             "res=" + container.getContainer().getResource() +
             ") from queue " + sched.getName());
@@ -345,7 +357,8 @@ public class FairScheduler implements ResourceScheduler {
 
         toPreempt = Resources.subtract(toPreempt,
             container.getContainer().getResource());
-        if (Resources.equals(toPreempt, Resources.none())) {
+        if (Resources.lessThanOrEqual(RESOURCE_CALCULATOR, clusterCapacity,
+            toPreempt, Resources.none())) {
           break;
         }
       }
@@ -369,17 +382,21 @@ public class FairScheduler implements ResourceScheduler {
     Resource resDueToMinShare = Resources.none();
     Resource resDueToFairShare = Resources.none();
     if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {
-      Resource target = Resources.min(sched.getMinShare(), sched.getDemand());
-      resDueToMinShare = Resources.max(Resources.none(),
-          Resources.subtract(target, sched.getResourceUsage()));
+      Resource target = Resources.min(RESOURCE_CALCULATOR, clusterCapacity,
+          sched.getMinShare(), sched.getDemand());
+      resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterCapacity,
+          Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
     }
     if (curTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) {
-      Resource target = Resources.min(sched.getFairShare(), sched.getDemand());
-      resDueToFairShare = Resources.max(Resources.none(),
-          Resources.subtract(target, sched.getResourceUsage()));
+      Resource target = Resources.min(RESOURCE_CALCULATOR, clusterCapacity,
+          sched.getFairShare(), sched.getDemand());
+      resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterCapacity,
+          Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
     }
-    Resource resToPreempt = Resources.max(resDueToMinShare, resDueToFairShare);
-    if (Resources.greaterThan(resToPreempt, Resources.none())) {
+    Resource resToPreempt = Resources.max(RESOURCE_CALCULATOR, clusterCapacity,
+        resDueToMinShare, resDueToFairShare);
+    if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+        resToPreempt, Resources.none())) {
       String message = "Should preempt " + resToPreempt + " res for queue "
           + sched.getName() + ": resDueToMinShare = " + resDueToMinShare
          + ", resDueToFairShare = " + resDueToFairShare;
@@ -800,7 +817,7 @@ public class FairScheduler implements ResourceScheduler {
     int assignedContainers = 0;
     while (node.getReservedContainer() == null) {
       boolean assignedContainer = false;
-      if (Resources.greaterThan(
+      if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
           queueMgr.getRootQueue().assignContainer(node),
           Resources.none())) {
         assignedContainer = true;
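
A note on the RESOURCE_CALCULATOR constant threaded through the FairScheduler hunks above: DefaultResourceCalculator is the single-resource (memory-based) calculator, so these comparisons should behave as before; the point of the refactor is to share one Resources API rather than to change scheduling decisions. Roughly the comparison it performs (a paraphrase, not the actual implementation):

    // memory-only comparison; the clusterResource argument is not consulted
    int compare(Resource clusterResource, Resource lhs, Resource rhs) {
      return Integer.compare(lhs.getMemory(), rhs.getMemory());
    }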
QueueManager.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 import org.w3c.dom.Node;
@@ -474,8 +475,8 @@ public class QueueManager {
       }
       queueAcls.put(queueName, acls);
       if (maxQueueResources.containsKey(queueName) && minQueueResources.containsKey(queueName)
-          && Resources.lessThan(maxQueueResources.get(queueName),
-              minQueueResources.get(queueName))) {
+          && !Resources.fitsIn(minQueueResources.get(queueName),
+              maxQueueResources.get(queueName))) {
         LOG.warn(String.format("Queue %s has max resources %d less than min resources %d",
             queueName, maxQueueResources.get(queueName), minQueueResources.get(queueName)));
       }
@@ -504,7 +505,7 @@ public class QueueManager {
     if (maxQueueResource != null) {
       return maxQueueResource;
     } else {
-      return Resources.createResource(Integer.MAX_VALUE);
+      return Resources.createResource(Integer.MAX_VALUE, Integer.MAX_VALUE);
     }
   }
 
scheduler/fair/Resources.java (deleted)
@@ -1,150 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Evolving;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.util.Records;
-
-@Private
-@Evolving
-public class Resources {
-
-  // Java doesn't have const :(
-  private static final Resource NONE = new Resource() {
-
-    @Override
-    public int getMemory() {
-      return 0;
-    }
-
-    @Override
-    public void setMemory(int memory) {
-      throw new RuntimeException("NONE cannot be modified!");
-    }
-
-    @Override
-    public int getVirtualCores() {
-      return 0;
-    }
-
-    @Override
-    public void setVirtualCores(int cores) {
-      throw new RuntimeException("NONE cannot be modified!");
-    }
-
-    @Override
-    public int compareTo(Resource o) {
-      int diff = 0 - o.getMemory();
-      if (diff == 0) {
-        diff = 0 - o.getVirtualCores();
-      }
-      return diff;
-    }
-
-  };
-
-  public static Resource createResource(int memory) {
-    return createResource(memory, (memory > 0) ? 1 : 0);
-  }
-
-  public static Resource createResource(int memory, int cores) {
-    Resource resource = Records.newRecord(Resource.class);
-    resource.setMemory(memory);
-    resource.setVirtualCores(cores);
-    return resource;
-  }
-
-  public static Resource none() {
-    return NONE;
-  }
-
-  public static Resource clone(Resource res) {
-    return createResource(res.getMemory(), res.getVirtualCores());
-  }
-
-  public static Resource addTo(Resource lhs, Resource rhs) {
-    lhs.setMemory(lhs.getMemory() + rhs.getMemory());
-    return lhs;
-  }
-
-  public static Resource add(Resource lhs, Resource rhs) {
-    return addTo(clone(lhs), rhs);
-  }
-
-  public static Resource subtractFrom(Resource lhs, Resource rhs) {
-    lhs.setMemory(lhs.getMemory() - rhs.getMemory());
-    return lhs;
-  }
-
-  public static Resource subtract(Resource lhs, Resource rhs) {
-    return subtractFrom(clone(lhs), rhs);
-  }
-
-  public static Resource negate(Resource resource) {
-    return subtract(NONE, resource);
-  }
-
-  public static Resource multiplyTo(Resource lhs, int by) {
-    lhs.setMemory(lhs.getMemory() * by);
-    return lhs;
-  }
-
-  public static Resource multiply(Resource lhs, int by) {
-    return multiplyTo(clone(lhs), by);
-  }
-
-  /**
-   * Mutliply a resource by a {@code double}. Note that integral
-   * resource quantites are subject to rounding during cast.
-   */
-  public static Resource multiply(Resource lhs, double by) {
-    Resource out = clone(lhs);
-    out.setMemory((int) (lhs.getMemory() * by));
-    return out;
-  }
-
-  public static boolean equals(Resource lhs, Resource rhs) {
-    return lhs.getMemory() == rhs.getMemory();
-  }
-
-  public static boolean lessThan(Resource lhs, Resource rhs) {
-    return lhs.getMemory() < rhs.getMemory();
-  }
-
-  public static boolean lessThanOrEqual(Resource lhs, Resource rhs) {
-    return lhs.getMemory() <= rhs.getMemory();
-  }
-
-  public static boolean greaterThan(Resource lhs, Resource rhs) {
-    return lhs.getMemory() > rhs.getMemory();
-  }
-
-  public static boolean greaterThanOrEqual(Resource lhs, Resource rhs) {
-    return lhs.getMemory() >= rhs.getMemory();
-  }
-
-  public static Resource min(Resource lhs, Resource rhs) {
-    return (lhs.getMemory() < rhs.getMemory()) ? lhs : rhs;
-  }
-
-  public static Resource max(Resource lhs, Resource rhs) {
-    return (lhs.getMemory() > rhs.getMemory()) ? lhs : rhs;
-  }
-}
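
The deleted helper above compared resources by memory alone, which is the main reason it gives way to the common, multi-resource-aware Resources. Restating its semantics with its own methods (illustrative values only):

    Resource a = createResource(1024, 1); // 1024 MB, 1 vcore
    Resource b = createResource(1024, 4); // 1024 MB, 4 vcores
    boolean same = equals(a, b);          // true: only getMemory() was compared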
FairSharePolicy.java
@@ -22,7 +22,8 @@ import java.util.Collection;
 import java.util.Comparator;
 
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy;
 
@@ -31,6 +32,8 @@ import com.google.common.annotations.VisibleForTesting;
 public class FairSharePolicy extends SchedulingPolicy {
   @VisibleForTesting
   public static final String NAME = "Fairshare";
+  private static final DefaultResourceCalculator RESOURCE_CALCULATOR =
+      new DefaultResourceCalculator();
   private FairShareComparator comparator = new FairShareComparator();
 
   @Override
@@ -59,15 +62,19 @@ public class FairSharePolicy extends SchedulingPolicy {
     public int compare(Schedulable s1, Schedulable s2) {
       double minShareRatio1, minShareRatio2;
       double useToWeightRatio1, useToWeightRatio2;
-      Resource minShare1 = Resources.min(s1.getMinShare(), s1.getDemand());
-      Resource minShare2 = Resources.min(s2.getMinShare(), s2.getDemand());
-      boolean s1Needy = Resources.lessThan(s1.getResourceUsage(), minShare1);
-      boolean s2Needy = Resources.lessThan(s2.getResourceUsage(), minShare2);
+      Resource minShare1 = Resources.min(RESOURCE_CALCULATOR, null,
+          s1.getMinShare(), s1.getDemand());
+      Resource minShare2 = Resources.min(RESOURCE_CALCULATOR, null,
+          s2.getMinShare(), s2.getDemand());
+      boolean s1Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
+          s1.getResourceUsage(), minShare1);
+      boolean s2Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
+          s2.getResourceUsage(), minShare2);
       Resource one = Resources.createResource(1);
       minShareRatio1 = (double) s1.getResourceUsage().getMemory()
-          / Resources.max(minShare1, one).getMemory();
+          / Resources.max(RESOURCE_CALCULATOR, null, minShare1, one).getMemory();
       minShareRatio2 = (double) s2.getResourceUsage().getMemory()
-          / Resources.max(minShare2, one).getMemory();
+          / Resources.max(RESOURCE_CALCULATOR, null, minShare2, one).getMemory();
       useToWeightRatio1 = s1.getResourceUsage().getMemory() / s1.getWeight();
       useToWeightRatio2 = s2.getResourceUsage().getMemory() / s2.getWeight();
       int res = 0;
@@ -161,9 +168,11 @@ public class FairSharePolicy extends SchedulingPolicy {
     for (Schedulable sched : schedulables) {
       Resources.addTo(totalDemand, sched.getDemand());
     }
-    Resource cap = Resources.min(totalDemand, totalResources);
+    Resource cap = Resources.min(RESOURCE_CALCULATOR, null, totalDemand,
+        totalResources);
     double rMax = 1.0;
-    while (Resources.lessThan(resUsedWithWeightToResRatio(rMax, schedulables),
+    while (Resources.lessThan(RESOURCE_CALCULATOR, null,
+        resUsedWithWeightToResRatio(rMax, schedulables),
         cap)) {
       rMax *= 2.0;
     }
@@ -172,7 +181,8 @@ public class FairSharePolicy extends SchedulingPolicy {
     double right = rMax;
     for (int i = 0; i < COMPUTE_FAIR_SHARES_ITERATIONS; i++) {
       double mid = (left + right) / 2.0;
-      if (Resources.lessThan(resUsedWithWeightToResRatio(mid, schedulables),
+      if (Resources.lessThan(RESOURCE_CALCULATOR, null,
+          resUsedWithWeightToResRatio(mid, schedulables),
           cap)) {
         left = mid;
       } else {
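
The FairSharePolicy hunks above pass null where FairScheduler passes clusterCapacity. That is workable here because DefaultResourceCalculator's comparisons do not consult the cluster resource (an observation about the calculator, not something stated in the patch); the call shape is otherwise identical:

    // same helper, two call sites; the memory-only calculator ignores the
    // cluster-resource argument, so the policy can pass null
    Resource capped = Resources.min(RESOURCE_CALCULATOR, null,
        s1.getMinShare(), s1.getDemand());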
(imports hunk from a second scheduling-policy file; the file name is not preserved in this extract)
@@ -22,8 +22,8 @@ import java.util.Collection;
 import java.util.Comparator;
 
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy;
 
 import com.google.common.annotations.VisibleForTesting;
resource/TestResources.java (new file)
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.resource;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.resource.Resources.*;
+import static org.junit.Assert.*;
+import org.junit.Test;
+
+public class TestResources {
+  @Test(timeout=1000)
+  public void testFitsIn() {
+    assertTrue(fitsIn(createResource(1, 1), createResource(2, 2)));
+    assertTrue(fitsIn(createResource(2, 2), createResource(2, 2)));
+    assertFalse(fitsIn(createResource(2, 2), createResource(1, 1)));
+    assertFalse(fitsIn(createResource(1, 2), createResource(2, 1)));
+    assertFalse(fitsIn(createResource(2, 1), createResource(1, 2)));
+  }
+
+  @Test(timeout=1000)
+  public void testComponentwiseMin() {
+    assertEquals(createResource(1, 1),
+        componentwiseMin(createResource(1, 1), createResource(2, 2)));
+    assertEquals(createResource(1, 1),
+        componentwiseMin(createResource(2, 2), createResource(1, 1)));
+    assertEquals(createResource(1, 1),
+        componentwiseMin(createResource(1, 2), createResource(2, 1)));
+  }
+}