YARN-6612. Update fair scheduler policies to be aware of resource types. (Contributed by Daniel Templeton via Yufei Gu)
This commit is contained in:
parent
65a941008d
commit
09b476e6da
|
@ -344,11 +344,10 @@ public class ResourceUtils {
|
|||
addResourcesFileToConf(resourceFile, conf);
|
||||
LOG.debug("Found " + resourceFile + ", adding to configuration");
|
||||
} catch (FileNotFoundException fe) {
|
||||
LOG.info("Unable to find '" + resourceFile
|
||||
+ "'. Falling back to memory and vcores as resources.");
|
||||
LOG.debug("Unable to find '" + resourceFile + "'.");
|
||||
}
|
||||
initializeResourcesMap(conf);
|
||||
|
||||
initializeResourcesMap(conf);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,72 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.resource;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
@Private
|
||||
@Evolving
|
||||
public class ResourceWeights {
|
||||
public static final ResourceWeights NEUTRAL = new ResourceWeights(1.0f);
|
||||
|
||||
private final float[] weights = new float[ResourceType.values().length];
|
||||
|
||||
public ResourceWeights(float memoryWeight, float cpuWeight) {
|
||||
weights[ResourceType.MEMORY.ordinal()] = memoryWeight;
|
||||
weights[ResourceType.CPU.ordinal()] = cpuWeight;
|
||||
}
|
||||
|
||||
public ResourceWeights(float weight) {
|
||||
setWeight(weight);
|
||||
}
|
||||
|
||||
public ResourceWeights() { }
|
||||
|
||||
public final void setWeight(float weight) {
|
||||
for (int i = 0; i < weights.length; i++) {
|
||||
weights[i] = weight;
|
||||
}
|
||||
}
|
||||
|
||||
public void setWeight(ResourceType resourceType, float weight) {
|
||||
weights[resourceType.ordinal()] = weight;
|
||||
}
|
||||
|
||||
public float getWeight(ResourceType resourceType) {
|
||||
return weights[resourceType.ordinal()];
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("<");
|
||||
for (int i = 0; i < ResourceType.values().length; i++) {
|
||||
if (i != 0) {
|
||||
sb.append(", ");
|
||||
}
|
||||
ResourceType resourceType = ResourceType.values()[i];
|
||||
sb.append(StringUtils.toLowerCase(resourceType.name()));
|
||||
sb.append(StringUtils.format(" weight=%.1f", getWeight(resourceType)));
|
||||
}
|
||||
sb.append(">");
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
|
@ -30,7 +30,6 @@ import org.apache.hadoop.yarn.api.records.ReservationACL;
|
|||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.security.AccessType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||
|
@ -51,7 +50,7 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
|
|||
// Maximum amount of resources for each queue's ad hoc children
|
||||
private final Map<String, Resource> maxChildQueueResources;
|
||||
// Sharing weights for each queue
|
||||
private final Map<String, ResourceWeights> queueWeights;
|
||||
private final Map<String, Float> queueWeights;
|
||||
|
||||
// Max concurrent running applications for each queue and for each user; in addition,
|
||||
// for users that have no max specified, we use the userMaxJobsDefault.
|
||||
|
@ -112,10 +111,12 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
|
|||
public AllocationConfiguration(Map<String, Resource> minQueueResources,
|
||||
Map<String, Resource> maxQueueResources,
|
||||
Map<String, Resource> maxChildQueueResources,
|
||||
Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps,
|
||||
Map<String, ResourceWeights> queueWeights,
|
||||
Map<String, Integer> queueMaxApps,
|
||||
Map<String, Integer> userMaxApps,
|
||||
Map<String, Float> queueWeights,
|
||||
Map<String, Float> queueMaxAMShares, int userMaxAppsDefault,
|
||||
int queueMaxAppsDefault, Resource queueMaxResourcesDefault,
|
||||
int queueMaxAppsDefault,
|
||||
Resource queueMaxResourcesDefault,
|
||||
float queueMaxAMShareDefault,
|
||||
Map<String, SchedulingPolicy> schedulingPolicies,
|
||||
SchedulingPolicy defaultSchedulingPolicy,
|
||||
|
@ -253,9 +254,9 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
|
|||
return !nonPreemptableQueues.contains(queueName);
|
||||
}
|
||||
|
||||
private ResourceWeights getQueueWeight(String queue) {
|
||||
ResourceWeights weight = queueWeights.get(queue);
|
||||
return (weight == null) ? ResourceWeights.NEUTRAL : weight;
|
||||
private float getQueueWeight(String queue) {
|
||||
Float weight = queueWeights.get(queue);
|
||||
return (weight == null) ? 1.0f : weight;
|
||||
}
|
||||
|
||||
public int getUserMaxApps(String user) {
|
||||
|
|
|
@ -45,7 +45,6 @@ import org.apache.hadoop.yarn.security.AccessType;
|
|||
import org.apache.hadoop.yarn.security.Permission;
|
||||
import org.apache.hadoop.yarn.security.PrivilegedEntity;
|
||||
import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||
import org.apache.hadoop.yarn.util.Clock;
|
||||
|
@ -232,7 +231,7 @@ public class AllocationFileLoaderService extends AbstractService {
|
|||
Map<String, Integer> queueMaxApps = new HashMap<>();
|
||||
Map<String, Integer> userMaxApps = new HashMap<>();
|
||||
Map<String, Float> queueMaxAMShares = new HashMap<>();
|
||||
Map<String, ResourceWeights> queueWeights = new HashMap<>();
|
||||
Map<String, Float> queueWeights = new HashMap<>();
|
||||
Map<String, SchedulingPolicy> queuePolicies = new HashMap<>();
|
||||
Map<String, Long> minSharePreemptionTimeouts = new HashMap<>();
|
||||
Map<String, Long> fairSharePreemptionTimeouts = new HashMap<>();
|
||||
|
@ -454,7 +453,7 @@ public class AllocationFileLoaderService extends AbstractService {
|
|||
Map<String, Integer> queueMaxApps,
|
||||
Map<String, Integer> userMaxApps,
|
||||
Map<String, Float> queueMaxAMShares,
|
||||
Map<String, ResourceWeights> queueWeights,
|
||||
Map<String, Float> queueWeights,
|
||||
Map<String, SchedulingPolicy> queuePolicies,
|
||||
Map<String, Long> minSharePreemptionTimeouts,
|
||||
Map<String, Long> fairSharePreemptionTimeouts,
|
||||
|
@ -522,7 +521,7 @@ public class AllocationFileLoaderService extends AbstractService {
|
|||
} else if ("weight".equals(field.getTagName())) {
|
||||
String text = ((Text)field.getFirstChild()).getData().trim();
|
||||
double val = Double.parseDouble(text);
|
||||
queueWeights.put(queueName, new ResourceWeights((float)val));
|
||||
queueWeights.put(queueName, (float)val);
|
||||
} else if ("minSharePreemptionTimeout".equals(field.getTagName())) {
|
||||
String text = ((Text)field.getFirstChild()).getData().trim();
|
||||
long val = Long.parseLong(text) * 1000L;
|
||||
|
|
|
@ -43,7 +43,6 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
|
||||
|
@ -75,7 +74,6 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
|||
|
||||
private final long startTime;
|
||||
private final Priority appPriority;
|
||||
private final ResourceWeights resourceWeights;
|
||||
private Resource demand = Resources.createResource(0);
|
||||
private final FairScheduler scheduler;
|
||||
private Resource fairShare = Resources.createResource(0, 0);
|
||||
|
@ -120,11 +118,6 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
|||
this.startTime = scheduler.getClock().getTime();
|
||||
this.lastTimeAtFairShare = this.startTime;
|
||||
this.appPriority = Priority.newInstance(1);
|
||||
this.resourceWeights = new ResourceWeights();
|
||||
}
|
||||
|
||||
ResourceWeights getResourceWeights() {
|
||||
return resourceWeights;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1281,7 +1274,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
|||
}
|
||||
|
||||
@Override
|
||||
public ResourceWeights getWeights() {
|
||||
public float getWeight() {
|
||||
return scheduler.getAppWeight(this);
|
||||
}
|
||||
|
||||
|
|
|
@ -37,7 +37,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
|
||||
|
@ -553,7 +552,7 @@ public class FSLeafQueue extends FSQueue {
|
|||
* @param weight queue weight
|
||||
*/
|
||||
public void setWeights(float weight) {
|
||||
this.weights = new ResourceWeights(weight);
|
||||
this.weights = weight;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -42,7 +42,6 @@ import org.apache.hadoop.yarn.security.AccessRequest;
|
|||
import org.apache.hadoop.yarn.security.PrivilegedEntity;
|
||||
import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType;
|
||||
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
@ -70,7 +69,7 @@ public abstract class FSQueue implements Queue, Schedulable {
|
|||
|
||||
protected SchedulingPolicy policy = SchedulingPolicy.DEFAULT_POLICY;
|
||||
|
||||
protected ResourceWeights weights;
|
||||
protected float weights;
|
||||
protected Resource minShare;
|
||||
protected Resource maxShare;
|
||||
protected int maxRunningApps;
|
||||
|
@ -140,12 +139,12 @@ public abstract class FSQueue implements Queue, Schedulable {
|
|||
this.policy = policy;
|
||||
}
|
||||
|
||||
public void setWeights(ResourceWeights weights){
|
||||
public void setWeights(float weights) {
|
||||
this.weights = weights;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResourceWeights getWeights() {
|
||||
public float getWeight() {
|
||||
return weights;
|
||||
}
|
||||
|
||||
|
@ -439,7 +438,7 @@ public abstract class FSQueue implements Queue, Schedulable {
|
|||
@Override
|
||||
public String toString() {
|
||||
return String.format("[%s, demand=%s, running=%s, share=%s, w=%s]",
|
||||
getName(), getDemand(), getResourceUsage(), fairShare, getWeights());
|
||||
getName(), getDemand(), getResourceUsage(), fairShare, getWeight());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -54,7 +54,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.RMCriticalThreadUncaughtExceptionHandler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
||||
|
@ -369,7 +368,7 @@ public class FairScheduler extends
|
|||
return rmContext.getContainerTokenSecretManager();
|
||||
}
|
||||
|
||||
public ResourceWeights getAppWeight(FSAppAttempt app) {
|
||||
public float getAppWeight(FSAppAttempt app) {
|
||||
try {
|
||||
readLock.lock();
|
||||
double weight = 1.0;
|
||||
|
@ -377,14 +376,10 @@ public class FairScheduler extends
|
|||
// Set weight based on current memory demand
|
||||
weight = Math.log1p(app.getDemand().getMemorySize()) / Math.log(2);
|
||||
}
|
||||
weight *= app.getPriority().getPriority();
|
||||
ResourceWeights resourceWeights = app.getResourceWeights();
|
||||
resourceWeights.setWeight((float) weight);
|
||||
return resourceWeights;
|
||||
return (float)weight * app.getPriority().getPriority();
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public Resource getIncrementResourceCapability() {
|
||||
|
|
|
@ -22,7 +22,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
|
|||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
|
||||
|
||||
/**
|
||||
* A Schedulable represents an entity that can be scheduled such as an
|
||||
|
@ -72,8 +71,15 @@ public interface Schedulable {
|
|||
/** Maximum Resource share assigned to the schedulable. */
|
||||
Resource getMaxShare();
|
||||
|
||||
/** Job/queue weight in fair sharing. */
|
||||
ResourceWeights getWeights();
|
||||
/**
|
||||
* Job/queue weight in fair sharing. Weights are only meaningful when
|
||||
* compared. A weight of 2.0f has twice the weight of a weight of 1.0f,
|
||||
* which has twice the weight of a weight of 0.5f. A weight of 1.0f is
|
||||
* considered unweighted or a neutral weight. A weight of 0 is no weight.
|
||||
*
|
||||
* @return the weight
|
||||
*/
|
||||
float getWeight();
|
||||
|
||||
/** Start time for jobs in FIFO queues; meaningless for QueueSchedulables.*/
|
||||
long getStartTime();
|
||||
|
|
|
@ -21,7 +21,6 @@ import java.util.ArrayList;
|
|||
import java.util.Collection;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
|
||||
|
||||
|
@ -47,7 +46,7 @@ public class ComputeFairShares {
|
|||
*/
|
||||
public static void computeShares(
|
||||
Collection<? extends Schedulable> schedulables, Resource totalResources,
|
||||
ResourceType type) {
|
||||
String type) {
|
||||
computeSharesInternal(schedulables, totalResources, type, false);
|
||||
}
|
||||
|
||||
|
@ -62,7 +61,7 @@ public class ComputeFairShares {
|
|||
*/
|
||||
public static void computeSteadyShares(
|
||||
Collection<? extends FSQueue> queues, Resource totalResources,
|
||||
ResourceType type) {
|
||||
String type) {
|
||||
computeSharesInternal(queues, totalResources, type, true);
|
||||
}
|
||||
|
||||
|
@ -110,9 +109,9 @@ public class ComputeFairShares {
|
|||
*/
|
||||
private static void computeSharesInternal(
|
||||
Collection<? extends Schedulable> allSchedulables,
|
||||
Resource totalResources, ResourceType type, boolean isSteadyShare) {
|
||||
Resource totalResources, String type, boolean isSteadyShare) {
|
||||
|
||||
Collection<Schedulable> schedulables = new ArrayList<Schedulable>();
|
||||
Collection<Schedulable> schedulables = new ArrayList<>();
|
||||
int takenResources = handleFixedFairShares(
|
||||
allSchedulables, schedulables, isSteadyShare, type);
|
||||
|
||||
|
@ -124,7 +123,7 @@ public class ComputeFairShares {
|
|||
// have met all Schedulables' max shares.
|
||||
int totalMaxShare = 0;
|
||||
for (Schedulable sched : schedulables) {
|
||||
long maxShare = getResourceValue(sched.getMaxShare(), type);
|
||||
long maxShare = sched.getMaxShare().getResourceValue(type);
|
||||
totalMaxShare = (int) Math.min(maxShare + (long)totalMaxShare,
|
||||
Integer.MAX_VALUE);
|
||||
if (totalMaxShare == Integer.MAX_VALUE) {
|
||||
|
@ -132,7 +131,7 @@ public class ComputeFairShares {
|
|||
}
|
||||
}
|
||||
|
||||
long totalResource = Math.max((getResourceValue(totalResources, type) -
|
||||
long totalResource = Math.max((totalResources.getResourceValue(type) -
|
||||
takenResources), 0);
|
||||
totalResource = Math.min(totalMaxShare, totalResource);
|
||||
|
||||
|
@ -159,13 +158,15 @@ public class ComputeFairShares {
|
|||
}
|
||||
// Set the fair shares based on the value of R we've converged to
|
||||
for (Schedulable sched : schedulables) {
|
||||
Resource target;
|
||||
|
||||
if (isSteadyShare) {
|
||||
setResourceValue(computeShare(sched, right, type),
|
||||
((FSQueue) sched).getSteadyFairShare(), type);
|
||||
target = ((FSQueue) sched).getSteadyFairShare();
|
||||
} else {
|
||||
setResourceValue(
|
||||
computeShare(sched, right, type), sched.getFairShare(), type);
|
||||
target = sched.getFairShare();
|
||||
}
|
||||
|
||||
target.setResourceValue(type, (long)computeShare(sched, right, type));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -174,7 +175,7 @@ public class ComputeFairShares {
|
|||
* w2rRatio, for use in the computeFairShares algorithm as described in #
|
||||
*/
|
||||
private static int resourceUsedWithWeightToResourceRatio(double w2rRatio,
|
||||
Collection<? extends Schedulable> schedulables, ResourceType type) {
|
||||
Collection<? extends Schedulable> schedulables, String type) {
|
||||
int resourcesTaken = 0;
|
||||
for (Schedulable sched : schedulables) {
|
||||
int share = computeShare(sched, w2rRatio, type);
|
||||
|
@ -188,10 +189,10 @@ public class ComputeFairShares {
|
|||
* weight-to-resource ratio w2rRatio.
|
||||
*/
|
||||
private static int computeShare(Schedulable sched, double w2rRatio,
|
||||
ResourceType type) {
|
||||
double share = sched.getWeights().getWeight(type) * w2rRatio;
|
||||
share = Math.max(share, getResourceValue(sched.getMinShare(), type));
|
||||
share = Math.min(share, getResourceValue(sched.getMaxShare(), type));
|
||||
String type) {
|
||||
double share = sched.getWeight() * w2rRatio;
|
||||
share = Math.max(share, sched.getMinShare().getResourceValue(type));
|
||||
share = Math.min(share, sched.getMaxShare().getResourceValue(type));
|
||||
return (int) share;
|
||||
}
|
||||
|
||||
|
@ -203,7 +204,7 @@ public class ComputeFairShares {
|
|||
private static int handleFixedFairShares(
|
||||
Collection<? extends Schedulable> schedulables,
|
||||
Collection<Schedulable> nonFixedSchedulables,
|
||||
boolean isSteadyShare, ResourceType type) {
|
||||
boolean isSteadyShare, String type) {
|
||||
int totalResource = 0;
|
||||
|
||||
for (Schedulable sched : schedulables) {
|
||||
|
@ -211,11 +212,15 @@ public class ComputeFairShares {
|
|||
if (fixedShare < 0) {
|
||||
nonFixedSchedulables.add(sched);
|
||||
} else {
|
||||
setResourceValue(fixedShare,
|
||||
isSteadyShare
|
||||
? ((FSQueue)sched).getSteadyFairShare()
|
||||
: sched.getFairShare(),
|
||||
type);
|
||||
Resource target;
|
||||
|
||||
if (isSteadyShare) {
|
||||
target = ((FSQueue)sched).getSteadyFairShare();
|
||||
} else {
|
||||
target = sched.getFairShare();
|
||||
}
|
||||
|
||||
target.setResourceValue(type, fixedShare);
|
||||
totalResource = (int) Math.min((long)totalResource + (long)fixedShare,
|
||||
Integer.MAX_VALUE);
|
||||
}
|
||||
|
@ -230,10 +235,10 @@ public class ComputeFairShares {
|
|||
* or the Schedulable is not active for instantaneous fairshare.
|
||||
*/
|
||||
private static long getFairShareIfFixed(Schedulable sched,
|
||||
boolean isSteadyShare, ResourceType type) {
|
||||
boolean isSteadyShare, String type) {
|
||||
|
||||
// Check if maxShare is 0
|
||||
if (getResourceValue(sched.getMaxShare(), type) <= 0) {
|
||||
if (sched.getMaxShare().getResourceValue(type) <= 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -244,35 +249,11 @@ public class ComputeFairShares {
|
|||
}
|
||||
|
||||
// Check if weight is 0
|
||||
if (sched.getWeights().getWeight(type) <= 0) {
|
||||
long minShare = getResourceValue(sched.getMinShare(), type);
|
||||
if (sched.getWeight() <= 0) {
|
||||
long minShare = sched.getMinShare().getResourceValue(type);
|
||||
return (minShare <= 0) ? 0 : minShare;
|
||||
}
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
private static long getResourceValue(Resource resource, ResourceType type) {
|
||||
switch (type) {
|
||||
case MEMORY:
|
||||
return resource.getMemorySize();
|
||||
case CPU:
|
||||
return resource.getVirtualCores();
|
||||
default:
|
||||
throw new IllegalArgumentException("Invalid resource");
|
||||
}
|
||||
}
|
||||
|
||||
private static void setResourceValue(long val, Resource resource, ResourceType type) {
|
||||
switch (type) {
|
||||
case MEMORY:
|
||||
resource.setMemorySize(val);
|
||||
break;
|
||||
case CPU:
|
||||
resource.setVirtualCores((int)val);
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("Invalid resource");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Comparator;
|
||||
|
||||
|
@ -25,18 +26,15 @@ import com.google.common.annotations.VisibleForTesting;
|
|||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy;
|
||||
|
||||
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType.*;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||
|
||||
/**
|
||||
* Makes scheduling decisions by trying to equalize dominant resource usage.
|
||||
|
@ -72,16 +70,18 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy {
|
|||
@Override
|
||||
public void computeShares(Collection<? extends Schedulable> schedulables,
|
||||
Resource totalResources) {
|
||||
for (ResourceType type : ResourceType.values()) {
|
||||
ComputeFairShares.computeShares(schedulables, totalResources, type);
|
||||
for (ResourceInformation info: ResourceUtils.getResourceTypesArray()) {
|
||||
ComputeFairShares.computeShares(schedulables, totalResources,
|
||||
info.getName());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void computeSteadyShares(Collection<? extends FSQueue> queues,
|
||||
Resource totalResources) {
|
||||
for (ResourceType type : ResourceType.values()) {
|
||||
ComputeFairShares.computeSteadyShares(queues, totalResources, type);
|
||||
for (ResourceInformation info: ResourceUtils.getResourceTypesArray()) {
|
||||
ComputeFairShares.computeSteadyShares(queues, totalResources,
|
||||
info.getName());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -110,9 +110,13 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy {
|
|||
COMPARATOR.setFSContext(fsContext);
|
||||
}
|
||||
|
||||
public static class DominantResourceFairnessComparator implements Comparator<Schedulable> {
|
||||
private static final int NUM_RESOURCES = ResourceType.values().length;
|
||||
|
||||
/**
|
||||
* This class compares two {@link Schedulable} instances according to the
|
||||
* DRF policy. If neither instance is below min share, approximate fair share
|
||||
* ratios are compared.
|
||||
*/
|
||||
public static class DominantResourceFairnessComparator
|
||||
implements Comparator<Schedulable> {
|
||||
private FSContext fsContext;
|
||||
|
||||
public void setFSContext(FSContext fsContext) {
|
||||
|
@ -121,89 +125,199 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy {
|
|||
|
||||
@Override
|
||||
public int compare(Schedulable s1, Schedulable s2) {
|
||||
ResourceWeights sharesOfCluster1 = new ResourceWeights();
|
||||
ResourceWeights sharesOfCluster2 = new ResourceWeights();
|
||||
ResourceWeights sharesOfMinShare1 = new ResourceWeights();
|
||||
ResourceWeights sharesOfMinShare2 = new ResourceWeights();
|
||||
ResourceType[] resourceOrder1 = new ResourceType[NUM_RESOURCES];
|
||||
ResourceType[] resourceOrder2 = new ResourceType[NUM_RESOURCES];
|
||||
ResourceInformation[] info = ResourceUtils.getResourceTypesArray();
|
||||
Resource usage1 = s1.getResourceUsage();
|
||||
Resource usage2 = s2.getResourceUsage();
|
||||
Resource minShare1 = s1.getMinShare();
|
||||
Resource minShare2 = s2.getMinShare();
|
||||
Resource clusterCapacity = fsContext.getClusterResource();
|
||||
|
||||
// Calculate shares of the cluster for each resource both schedulables.
|
||||
calculateShares(s1.getResourceUsage(),
|
||||
clusterCapacity, sharesOfCluster1, resourceOrder1, s1.getWeights());
|
||||
calculateShares(s1.getResourceUsage(),
|
||||
s1.getMinShare(), sharesOfMinShare1, null, ResourceWeights.NEUTRAL);
|
||||
calculateShares(s2.getResourceUsage(),
|
||||
clusterCapacity, sharesOfCluster2, resourceOrder2, s2.getWeights());
|
||||
calculateShares(s2.getResourceUsage(),
|
||||
s2.getMinShare(), sharesOfMinShare2, null, ResourceWeights.NEUTRAL);
|
||||
|
||||
// These arrays hold the usage, fair, and min share ratios for each
|
||||
// resource type. ratios[0][x] are the usage ratios, ratios[1][x] are
|
||||
// the fair share ratios, and ratios[2][x] are the min share ratios.
|
||||
float[][] ratios1 = new float[info.length][3];
|
||||
float[][] ratios2 = new float[info.length][3];
|
||||
|
||||
// Calculate cluster shares and approximate fair shares for each
|
||||
// resource type of both schedulables.
|
||||
int dominant1 = calculateClusterAndFairRatios(usage1, clusterCapacity,
|
||||
ratios1, s1.getWeight());
|
||||
int dominant2 = calculateClusterAndFairRatios(usage2, clusterCapacity,
|
||||
ratios2, s2.getWeight());
|
||||
|
||||
// A queue is needy for its min share if its dominant resource
|
||||
// (with respect to the cluster capacity) is below its configured min share
|
||||
// for that resource
|
||||
boolean s1Needy = sharesOfMinShare1.getWeight(resourceOrder1[0]) < 1.0f;
|
||||
boolean s2Needy = sharesOfMinShare2.getWeight(resourceOrder2[0]) < 1.0f;
|
||||
// (with respect to the cluster capacity) is below its configured min
|
||||
// share for that resource
|
||||
boolean s1Needy =
|
||||
usage1.getResources()[dominant1].getValue() <
|
||||
minShare1.getResources()[dominant1].getValue();
|
||||
boolean s2Needy =
|
||||
usage2.getResources()[dominant2].getValue() <
|
||||
minShare2.getResources()[dominant2].getValue();
|
||||
|
||||
int res = 0;
|
||||
|
||||
if (!s2Needy && !s1Needy) {
|
||||
res = compareShares(sharesOfCluster1, sharesOfCluster2,
|
||||
resourceOrder1, resourceOrder2);
|
||||
// Sort shares by usage ratio and compare them by approximate fair share
|
||||
// ratio
|
||||
sortRatios(ratios1, ratios2);
|
||||
res = compareRatios(ratios1, ratios2, 1);
|
||||
} else if (s1Needy && !s2Needy) {
|
||||
res = -1;
|
||||
} else if (s2Needy && !s1Needy) {
|
||||
res = 1;
|
||||
} else { // both are needy below min share
|
||||
res = compareShares(sharesOfMinShare1, sharesOfMinShare2,
|
||||
resourceOrder1, resourceOrder2);
|
||||
// Calculate the min share ratios, then sort by usage ratio, and compare
|
||||
// by min share ratio
|
||||
calculateMinShareRatios(usage1, minShare1, ratios1);
|
||||
calculateMinShareRatios(usage2, minShare2, ratios2);
|
||||
sortRatios(ratios1, ratios2);
|
||||
res = compareRatios(ratios1, ratios2, 2);
|
||||
}
|
||||
|
||||
if (res == 0) {
|
||||
// Apps are tied in fairness ratio. Break the tie by submit time and job
|
||||
// name to get a deterministic ordering, which is useful for unit tests.
|
||||
res = (int) Math.signum(s1.getStartTime() - s2.getStartTime());
|
||||
|
||||
if (res == 0) {
|
||||
res = s1.getName().compareTo(s2.getName());
|
||||
}
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sort both ratios arrays according to the usage ratios (the
|
||||
* first index of the inner arrays, e.g. {@code ratios1[x][0]}).
|
||||
*
|
||||
* @param ratios1 the first ratios array
|
||||
* @param ratios2 the second ratios array
|
||||
*/
|
||||
@VisibleForTesting
|
||||
void sortRatios(float[][] ratios1, float[][]ratios2) {
|
||||
// sort order descending by resource share
|
||||
Arrays.sort(ratios1, (float[] o1, float[] o2) ->
|
||||
(int) Math.signum(o2[0] - o1[0]));
|
||||
Arrays.sort(ratios2, (float[] o1, float[] o2) ->
|
||||
(int) Math.signum(o2[0] - o1[0]));
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculate a resource's usage ratio and approximate fair share ratio.
|
||||
* The {@code shares} array will be populated with both the usage ratio
|
||||
* and the approximate fair share ratio for each resource type. The usage
|
||||
* ratio is calculated as {@code resource} divided by {@code cluster}.
|
||||
* The approximate fair share ratio is calculated as the usage ratio
|
||||
* divided by {@code weight}. If the cluster's resources are 100MB and
|
||||
* 10 vcores, and the usage ({@code resource}) is 10 MB and 5 CPU, the
|
||||
* usage ratios will be 0.1 and 0.5. If the weights are 2, the fair
|
||||
* share ratios will be 0.05 and 0.25.
|
||||
*
|
||||
* The approximate fair share ratio is the usage divided by the
|
||||
* approximate fair share, i.e. the cluster resources times the weight.
|
||||
* The approximate fair share is an acceptable proxy for the fair share
|
||||
* because when comparing resources, the resource with the higher weight
|
||||
* will be assigned by the scheduler a proportionally higher fair share.
|
||||
*
|
||||
* The {@code shares} array must be at least <i>n</i> x 2, where <i>n</i>
|
||||
* is the number of resource types. Only the first and second indices of
|
||||
* the inner arrays in the {@code shares} array will be used, e.g.
|
||||
* {@code shares[x][0]} and {@code shares[x][1]}.
|
||||
*
|
||||
* The return value will be the index of the dominant resource type in the
|
||||
* {@code shares} array. The dominant resource is the resource type for
|
||||
* which {@code resource} has the largest usage ratio.
|
||||
*
|
||||
* @param resource the resource for which to calculate ratios
|
||||
* @param cluster the total cluster resources
|
||||
* @param ratios the shares array to populate
|
||||
* @param weight the resource weight
|
||||
* @return the index of the resource type with the largest cluster share
|
||||
*/
|
||||
@VisibleForTesting
|
||||
int calculateClusterAndFairRatios(Resource resource, Resource cluster,
|
||||
float[][] ratios, float weight) {
|
||||
ResourceInformation[] resourceInfo = resource.getResources();
|
||||
ResourceInformation[] clusterInfo = cluster.getResources();
|
||||
int max = 0;
|
||||
|
||||
for (int i = 0; i < clusterInfo.length; i++) {
|
||||
// First calculate the cluster share
|
||||
ratios[i][0] =
|
||||
resourceInfo[i].getValue() / (float) clusterInfo[i].getValue();
|
||||
|
||||
// Use the cluster share to find the dominant resource
|
||||
if (ratios[i][0] > ratios[max][0]) {
|
||||
max = i;
|
||||
}
|
||||
|
||||
// Now divide by the weight to get the approximate fair share.
|
||||
// It's OK if the weight is zero, because the floating point division
|
||||
// will yield Infinity, i.e. this Schedulable will lose out to any
|
||||
// other Schedulable with non-zero weight.
|
||||
ratios[i][1] = ratios[i][0] / weight;
|
||||
}
|
||||
|
||||
return max;
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculates and orders a resource's share of a pool in terms of two vectors.
|
||||
* The shares vector contains, for each resource, the fraction of the pool that
|
||||
* it takes up. The resourceOrder vector contains an ordering of resources
|
||||
* by largest share. So if resource=<10 MB, 5 CPU>, and pool=<100 MB, 10 CPU>,
|
||||
* shares will be [.1, .5] and resourceOrder will be [CPU, MEMORY].
|
||||
* Calculate a resource's min share ratios. The {@code ratios} array will be
|
||||
* populated with the {@code resource} divided by {@code minShare} for each
|
||||
* resource type. If the min shares are 5 MB and 10 vcores, and the usage
|
||||
* ({@code resource}) is 10 MB and 5 CPU, the ratios will be 2 and 0.5.
|
||||
*
|
||||
* The {@code ratios} array must be <i>n</i> x 3, where <i>n</i> is the
|
||||
* number of resource types. Only the third index of the inner arrays in
|
||||
* the {@code ratios} array will be used, e.g. {@code ratios[x][2]}.
|
||||
*
|
||||
* @param resource the resource for which to calculate min shares
|
||||
* @param minShare the min share
|
||||
* @param ratios the shares array to populate
|
||||
*/
|
||||
@VisibleForTesting
|
||||
void calculateShares(Resource resource, Resource pool,
|
||||
ResourceWeights shares, ResourceType[] resourceOrder, ResourceWeights weights) {
|
||||
shares.setWeight(MEMORY, (float)resource.getMemorySize() /
|
||||
(pool.getMemorySize() * weights.getWeight(MEMORY)));
|
||||
shares.setWeight(CPU, (float)resource.getVirtualCores() /
|
||||
(pool.getVirtualCores() * weights.getWeight(CPU)));
|
||||
// sort order vector by resource share
|
||||
if (resourceOrder != null) {
|
||||
if (shares.getWeight(MEMORY) > shares.getWeight(CPU)) {
|
||||
resourceOrder[0] = MEMORY;
|
||||
resourceOrder[1] = CPU;
|
||||
} else {
|
||||
resourceOrder[0] = CPU;
|
||||
resourceOrder[1] = MEMORY;
|
||||
}
|
||||
void calculateMinShareRatios(Resource resource, Resource minShare,
|
||||
float[][] ratios) {
|
||||
ResourceInformation[] resourceInfo = resource.getResources();
|
||||
ResourceInformation[] minShareInfo = minShare.getResources();
|
||||
|
||||
for (int i = 0; i < minShareInfo.length; i++) {
|
||||
ratios[i][2] =
|
||||
resourceInfo[i].getValue() / (float) minShareInfo[i].getValue();
|
||||
}
|
||||
}
|
||||
|
||||
private int compareShares(ResourceWeights shares1, ResourceWeights shares2,
|
||||
ResourceType[] resourceOrder1, ResourceType[] resourceOrder2) {
|
||||
for (int i = 0; i < resourceOrder1.length; i++) {
|
||||
int ret = (int)Math.signum(shares1.getWeight(resourceOrder1[i])
|
||||
- shares2.getWeight(resourceOrder2[i]));
|
||||
|
||||
/**
|
||||
* Compare the two ratios arrays and return -1, 0, or 1 if the first array
|
||||
* is less than, equal to, or greater than the second array, respectively.
|
||||
* The {@code index} parameter determines which index of the inner arrays
|
||||
* will be used for the comparisons. 0 is for usage ratios, 1 is for
|
||||
* fair share ratios, and 2 is for the min share ratios. The ratios arrays
|
||||
* are assumed to be sorted in descending order by usage ratio.
|
||||
*
|
||||
* @param ratios1 the first shares array
|
||||
* @param ratios2 the second shares array
|
||||
* @param index the outer index of the ratios arrays to compare. 0 is for
|
||||
* usage ratio, 1 is for approximate fair share ratios, and 1 is for min
|
||||
* share ratios
|
||||
* @return -1, 0, or 1 if the first array is less than, equal to, or
|
||||
* greater than the second array, respectively
|
||||
*/
|
||||
@VisibleForTesting
|
||||
int compareRatios(float[][] ratios1, float[][] ratios2, int index) {
|
||||
int ret = 0;
|
||||
|
||||
for (int i = 0; i < ratios1.length; i++) {
|
||||
ret = (int) Math.signum(ratios1[i][index] - ratios2[i][index]);
|
||||
|
||||
if (ret != 0) {
|
||||
return ret;
|
||||
break;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,7 +26,7 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy;
|
||||
|
@ -42,9 +42,10 @@ import com.google.common.annotations.VisibleForTesting;
|
|||
@Private
|
||||
@Unstable
|
||||
public class FairSharePolicy extends SchedulingPolicy {
|
||||
private static final Log LOG = LogFactory.getLog(FairSharePolicy.class);
|
||||
@VisibleForTesting
|
||||
public static final String NAME = "fair";
|
||||
private static final Log LOG = LogFactory.getLog(FairSharePolicy.class);
|
||||
private static final String MEMORY = ResourceInformation.MEMORY_MB.getName();
|
||||
private static final DefaultResourceCalculator RESOURCE_CALCULATOR =
|
||||
new DefaultResourceCalculator();
|
||||
private static final FairShareComparator COMPARATOR =
|
||||
|
@ -164,10 +165,11 @@ public class FairSharePolicy extends SchedulingPolicy {
|
|||
*/
|
||||
private int compareFairShareUsage(Schedulable s1, Schedulable s2,
|
||||
Resource resourceUsage1, Resource resourceUsage2) {
|
||||
double weight1 = s1.getWeights().getWeight(ResourceType.MEMORY);
|
||||
double weight2 = s2.getWeights().getWeight(ResourceType.MEMORY);
|
||||
double weight1 = s1.getWeight();
|
||||
double weight2 = s2.getWeight();
|
||||
double useToWeightRatio1;
|
||||
double useToWeightRatio2;
|
||||
|
||||
if (weight1 > 0.0 && weight2 > 0.0) {
|
||||
useToWeightRatio1 = resourceUsage1.getMemorySize() / weight1;
|
||||
useToWeightRatio2 = resourceUsage2.getMemorySize() / weight2;
|
||||
|
@ -213,14 +215,13 @@ public class FairSharePolicy extends SchedulingPolicy {
|
|||
@Override
|
||||
public void computeShares(Collection<? extends Schedulable> schedulables,
|
||||
Resource totalResources) {
|
||||
ComputeFairShares.computeShares(schedulables, totalResources, ResourceType.MEMORY);
|
||||
ComputeFairShares.computeShares(schedulables, totalResources, MEMORY);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void computeSteadyShares(Collection<? extends FSQueue> queues,
|
||||
Resource totalResources) {
|
||||
ComputeFairShares.computeSteadyShares(queues, totalResources,
|
||||
ResourceType.MEMORY);
|
||||
ComputeFairShares.computeSteadyShares(queues, totalResources, MEMORY);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -38,7 +38,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
|
@ -137,7 +136,7 @@ public class TestFairSchedulerPlanFollower extends
|
|||
}
|
||||
@Override
|
||||
protected void verifyCapacity(Queue defQ) {
|
||||
assertTrue(((FSQueue) defQ).getWeights().getWeight(ResourceType.MEMORY) > 0.9);
|
||||
assertTrue(((FSQueue) defQ).getWeight() > 0.9);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -173,8 +172,7 @@ public class TestFairSchedulerPlanFollower extends
|
|||
false);
|
||||
assertNotNull(q);
|
||||
// For now we are setting both to same weight
|
||||
Assert.assertEquals(expectedCapacity,
|
||||
q.getWeights().getWeight(ResourceType.MEMORY), 0.01);
|
||||
Assert.assertEquals(expectedCapacity, q.getWeight(), 0.01);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -1,55 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.resource;
|
||||
|
||||
import org.junit.Assert;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestResourceWeights {
|
||||
|
||||
@Test(timeout=3000)
|
||||
public void testWeights() {
|
||||
ResourceWeights rw1 = new ResourceWeights();
|
||||
Assert.assertEquals("Default CPU weight should be 0.0f.", 0.0f,
|
||||
rw1.getWeight(ResourceType.CPU), 0.00001f);
|
||||
Assert.assertEquals("Default memory weight should be 0.0f", 0.0f,
|
||||
rw1.getWeight(ResourceType.MEMORY), 0.00001f);
|
||||
|
||||
ResourceWeights rw2 = new ResourceWeights(2.0f);
|
||||
Assert.assertEquals("The CPU weight should be 2.0f.", 2.0f,
|
||||
rw2.getWeight(ResourceType.CPU), 0.00001f);
|
||||
Assert.assertEquals("The memory weight should be 2.0f", 2.0f,
|
||||
rw2.getWeight(ResourceType.MEMORY), 0.00001f);
|
||||
|
||||
// set each individually
|
||||
ResourceWeights rw3 = new ResourceWeights(1.5f, 2.0f);
|
||||
Assert.assertEquals("The CPU weight should be 2.0f", 2.0f,
|
||||
rw3.getWeight(ResourceType.CPU), 0.00001f);
|
||||
Assert.assertEquals("The memory weight should be 1.5f", 1.5f,
|
||||
rw3.getWeight(ResourceType.MEMORY), 0.00001f);
|
||||
|
||||
// reset weights
|
||||
rw3.setWeight(ResourceType.CPU, 2.5f);
|
||||
Assert.assertEquals("The CPU weight should be set to 2.5f.", 2.5f,
|
||||
rw3.getWeight(ResourceType.CPU), 0.00001f);
|
||||
rw3.setWeight(ResourceType.MEMORY, 4.0f);
|
||||
Assert.assertEquals("The memory weight should be set to 4.0f.", 4.0f,
|
||||
rw3.getWeight(ResourceType.MEMORY), 0.00001f);
|
||||
}
|
||||
}
|
|
@ -20,8 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
|||
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
|
@ -33,7 +31,7 @@ public class FakeSchedulable implements Schedulable {
|
|||
private Resource minShare;
|
||||
private Resource maxShare;
|
||||
private Resource fairShare;
|
||||
private ResourceWeights weights;
|
||||
private float weights;
|
||||
private Priority priority;
|
||||
private long startTime;
|
||||
|
||||
|
@ -49,28 +47,31 @@ public class FakeSchedulable implements Schedulable {
|
|||
this(minShare, maxShare, 1, 0, 0, 0);
|
||||
}
|
||||
|
||||
public FakeSchedulable(int minShare, double memoryWeight) {
|
||||
public FakeSchedulable(int minShare, float memoryWeight) {
|
||||
this(minShare, Integer.MAX_VALUE, memoryWeight, 0, 0, 0);
|
||||
}
|
||||
|
||||
public FakeSchedulable(int minShare, int maxShare, double memoryWeight) {
|
||||
public FakeSchedulable(int minShare, int maxShare, float memoryWeight) {
|
||||
this(minShare, maxShare, memoryWeight, 0, 0, 0);
|
||||
}
|
||||
|
||||
public FakeSchedulable(int minShare, int maxShare, double weight, int fairShare, int usage,
|
||||
long startTime) {
|
||||
this(Resources.createResource(minShare, 0), Resources.createResource(maxShare, 0),
|
||||
new ResourceWeights((float)weight), Resources.createResource(fairShare, 0),
|
||||
public FakeSchedulable(int minShare, int maxShare, float weight,
|
||||
int fairShare, int usage, long startTime) {
|
||||
this(Resources.createResource(minShare, 0),
|
||||
Resources.createResource(maxShare, 0),
|
||||
weight, Resources.createResource(fairShare, 0),
|
||||
Resources.createResource(usage, 0), startTime);
|
||||
}
|
||||
|
||||
public FakeSchedulable(Resource minShare, ResourceWeights weights) {
|
||||
this(minShare, Resources.createResource(Integer.MAX_VALUE, Integer.MAX_VALUE),
|
||||
weights, Resources.createResource(0, 0), Resources.createResource(0, 0), 0);
|
||||
public FakeSchedulable(Resource minShare, float weights) {
|
||||
this(minShare,
|
||||
Resources.createResource(Integer.MAX_VALUE, Integer.MAX_VALUE),
|
||||
weights, Resources.createResource(0, 0),
|
||||
Resources.createResource(0, 0), 0);
|
||||
}
|
||||
|
||||
public FakeSchedulable(Resource minShare, Resource maxShare,
|
||||
ResourceWeights weight, Resource fairShare, Resource usage, long startTime) {
|
||||
float weight, Resource fairShare, Resource usage, long startTime) {
|
||||
this.minShare = minShare;
|
||||
this.maxShare = maxShare;
|
||||
this.weights = weight;
|
||||
|
@ -121,7 +122,7 @@ public class FakeSchedulable implements Schedulable {
|
|||
}
|
||||
|
||||
@Override
|
||||
public ResourceWeights getWeights() {
|
||||
public float getWeight() {
|
||||
return weights;
|
||||
}
|
||||
|
||||
|
|
|
@ -20,12 +20,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
|||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
|
||||
import org.junit.Assert;
|
||||
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.ComputeFairShares;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
@ -52,7 +51,7 @@ public class TestComputeFairShares {
|
|||
scheds.add(new FakeSchedulable());
|
||||
scheds.add(new FakeSchedulable());
|
||||
ComputeFairShares.computeShares(scheds,
|
||||
Resources.createResource(40), ResourceType.MEMORY);
|
||||
Resources.createResource(40), ResourceInformation.MEMORY_MB.getName());
|
||||
verifyMemoryShares(10, 10, 10, 10);
|
||||
}
|
||||
|
||||
|
@ -70,7 +69,7 @@ public class TestComputeFairShares {
|
|||
scheds.add(new FakeSchedulable(0, 11));
|
||||
scheds.add(new FakeSchedulable(0, 3));
|
||||
ComputeFairShares.computeShares(scheds,
|
||||
Resources.createResource(40), ResourceType.MEMORY);
|
||||
Resources.createResource(40), ResourceInformation.MEMORY_MB.getName());
|
||||
verifyMemoryShares(13, 13, 11, 3);
|
||||
}
|
||||
|
||||
|
@ -90,7 +89,7 @@ public class TestComputeFairShares {
|
|||
scheds.add(new FakeSchedulable(0));
|
||||
scheds.add(new FakeSchedulable(2));
|
||||
ComputeFairShares.computeShares(scheds,
|
||||
Resources.createResource(40), ResourceType.MEMORY);
|
||||
Resources.createResource(40), ResourceInformation.MEMORY_MB.getName());
|
||||
verifyMemoryShares(20, 18, 0, 2);
|
||||
}
|
||||
|
||||
|
@ -100,12 +99,12 @@ public class TestComputeFairShares {
|
|||
*/
|
||||
@Test
|
||||
public void testWeightedSharing() {
|
||||
scheds.add(new FakeSchedulable(0, 2.0));
|
||||
scheds.add(new FakeSchedulable(0, 1.0));
|
||||
scheds.add(new FakeSchedulable(0, 1.0));
|
||||
scheds.add(new FakeSchedulable(0, 0.5));
|
||||
scheds.add(new FakeSchedulable(0, 2.0f));
|
||||
scheds.add(new FakeSchedulable(0, 1.0f));
|
||||
scheds.add(new FakeSchedulable(0, 1.0f));
|
||||
scheds.add(new FakeSchedulable(0, 0.5f));
|
||||
ComputeFairShares.computeShares(scheds,
|
||||
Resources.createResource(45), ResourceType.MEMORY);
|
||||
Resources.createResource(45), ResourceInformation.MEMORY_MB.getName());
|
||||
verifyMemoryShares(20, 10, 10, 5);
|
||||
}
|
||||
|
||||
|
@ -118,12 +117,12 @@ public class TestComputeFairShares {
|
|||
*/
|
||||
@Test
|
||||
public void testWeightedSharingWithMaxShares() {
|
||||
scheds.add(new FakeSchedulable(0, 10, 2.0));
|
||||
scheds.add(new FakeSchedulable(0, 11, 1.0));
|
||||
scheds.add(new FakeSchedulable(0, 30, 1.0));
|
||||
scheds.add(new FakeSchedulable(0, 20, 0.5));
|
||||
scheds.add(new FakeSchedulable(0, 10, 2.0f));
|
||||
scheds.add(new FakeSchedulable(0, 11, 1.0f));
|
||||
scheds.add(new FakeSchedulable(0, 30, 1.0f));
|
||||
scheds.add(new FakeSchedulable(0, 20, 0.5f));
|
||||
ComputeFairShares.computeShares(scheds,
|
||||
Resources.createResource(45), ResourceType.MEMORY);
|
||||
Resources.createResource(45), ResourceInformation.MEMORY_MB.getName());
|
||||
verifyMemoryShares(10, 11, 16, 8);
|
||||
}
|
||||
|
||||
|
@ -137,12 +136,12 @@ public class TestComputeFairShares {
|
|||
*/
|
||||
@Test
|
||||
public void testWeightedSharingWithMinShares() {
|
||||
scheds.add(new FakeSchedulable(20, 2.0));
|
||||
scheds.add(new FakeSchedulable(0, 1.0));
|
||||
scheds.add(new FakeSchedulable(5, 1.0));
|
||||
scheds.add(new FakeSchedulable(15, 0.5));
|
||||
scheds.add(new FakeSchedulable(20, 2.0f));
|
||||
scheds.add(new FakeSchedulable(0, 1.0f));
|
||||
scheds.add(new FakeSchedulable(5, 1.0f));
|
||||
scheds.add(new FakeSchedulable(15, 0.5f));
|
||||
ComputeFairShares.computeShares(scheds,
|
||||
Resources.createResource(45), ResourceType.MEMORY);
|
||||
Resources.createResource(45), ResourceInformation.MEMORY_MB.getName());
|
||||
verifyMemoryShares(20, 5, 5, 15);
|
||||
}
|
||||
|
||||
|
@ -158,7 +157,8 @@ public class TestComputeFairShares {
|
|||
scheds.add(new FakeSchedulable());
|
||||
scheds.add(new FakeSchedulable());
|
||||
ComputeFairShares.computeShares(scheds,
|
||||
Resources.createResource(40 * million), ResourceType.MEMORY);
|
||||
Resources.createResource(40 * million),
|
||||
ResourceInformation.MEMORY_MB.getName());
|
||||
verifyMemoryShares(10 * million, 10 * million, 10 * million, 10 * million);
|
||||
}
|
||||
|
||||
|
@ -168,7 +168,7 @@ public class TestComputeFairShares {
|
|||
@Test
|
||||
public void testEmptyList() {
|
||||
ComputeFairShares.computeShares(scheds,
|
||||
Resources.createResource(40), ResourceType.MEMORY);
|
||||
Resources.createResource(40), ResourceInformation.MEMORY_MB.getName());
|
||||
verifyMemoryShares();
|
||||
}
|
||||
|
||||
|
@ -177,16 +177,12 @@ public class TestComputeFairShares {
|
|||
*/
|
||||
@Test
|
||||
public void testCPU() {
|
||||
scheds.add(new FakeSchedulable(Resources.createResource(0, 20),
|
||||
new ResourceWeights(2.0f)));
|
||||
scheds.add(new FakeSchedulable(Resources.createResource(0, 0),
|
||||
new ResourceWeights(1.0f)));
|
||||
scheds.add(new FakeSchedulable(Resources.createResource(0, 5),
|
||||
new ResourceWeights(1.0f)));
|
||||
scheds.add(new FakeSchedulable(Resources.createResource(0, 15),
|
||||
new ResourceWeights(0.5f)));
|
||||
scheds.add(new FakeSchedulable(Resources.createResource(0, 20), 2.0f));
|
||||
scheds.add(new FakeSchedulable(Resources.createResource(0, 0), 1.0f));
|
||||
scheds.add(new FakeSchedulable(Resources.createResource(0, 5), 1.0f));
|
||||
scheds.add(new FakeSchedulable(Resources.createResource(0, 15), 0.5f));
|
||||
ComputeFairShares.computeShares(scheds,
|
||||
Resources.createResource(0, 45), ResourceType.CPU);
|
||||
Resources.createResource(0, 45), ResourceInformation.VCORES.getName());
|
||||
verifyCPUShares(20, 5, 5, 15);
|
||||
}
|
||||
|
||||
|
|
|
@ -79,7 +79,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
||||
|
@ -1984,7 +1983,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|||
// assert that the steady fair share is 1/4th node1's capacity
|
||||
assertEquals(capacity / 4, leaf.getSteadyFairShare().getMemorySize());
|
||||
// assert weights are equal for both the user queues
|
||||
assertEquals(1.0, leaf.getWeights().getWeight(ResourceType.MEMORY), 0);
|
||||
assertEquals(1.0, leaf.getWeight(), 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -5275,7 +5274,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|||
child1.updateDemand();
|
||||
|
||||
String childQueueString = "{Name: root.parent.child1,"
|
||||
+ " Weight: <memory weight=1.0, cpu weight=1.0>,"
|
||||
+ " Weight: 1.0,"
|
||||
+ " Policy: fair,"
|
||||
+ " FairShare: <memory:0, vCores:0>,"
|
||||
+ " SteadyFairShare: <memory:0, vCores:0>,"
|
||||
|
@ -5292,14 +5291,15 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|||
+ " LastTimeAtMinShare: " + clock.getTime()
|
||||
+ "}";
|
||||
|
||||
assertTrue(child1.dumpState().equals(childQueueString));
|
||||
assertEquals("Unexpected state dump string",
|
||||
childQueueString, child1.dumpState());
|
||||
FSParentQueue parent =
|
||||
scheduler.getQueueManager().getParentQueue("parent", false);
|
||||
parent.setMaxShare(resource);
|
||||
parent.updateDemand();
|
||||
|
||||
String parentQueueString = "{Name: root.parent,"
|
||||
+ " Weight: <memory weight=1.0, cpu weight=1.0>,"
|
||||
+ " Weight: 1.0,"
|
||||
+ " Policy: fair,"
|
||||
+ " FairShare: <memory:0, vCores:0>,"
|
||||
+ " SteadyFairShare: <memory:0, vCores:0>,"
|
||||
|
@ -5310,7 +5310,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|||
+ " MaxAMShare: 0.5,"
|
||||
+ " Runnable: 0}";
|
||||
|
||||
assertTrue(parent.dumpState().equals(
|
||||
parentQueueString + ", " + childQueueString));
|
||||
assertEquals("Unexpected state dump string",
|
||||
parentQueueString + ", " + childQueueString, parent.dumpState());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,7 +30,6 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
|
||||
|
@ -134,11 +133,7 @@ public class TestSchedulingPolicy {
|
|||
Resource.newInstance(0, 1), Resource.newInstance(2, 1),
|
||||
Resource.newInstance(4, 1) };
|
||||
|
||||
private ResourceWeights[] weightsCollection = {
|
||||
new ResourceWeights(0.0f), new ResourceWeights(1.0f),
|
||||
new ResourceWeights(2.0f) };
|
||||
|
||||
|
||||
private float[] weightsCollection = {0.0f, 1.0f, 2.0f};
|
||||
|
||||
public FairShareComparatorTester(
|
||||
Comparator<Schedulable> fairShareComparator) {
|
||||
|
@ -225,10 +220,10 @@ public class TestSchedulingPolicy {
|
|||
private String name;
|
||||
private long startTime;
|
||||
private Resource usage;
|
||||
private ResourceWeights weights;
|
||||
private float weights;
|
||||
|
||||
public MockSchedulable(Resource minShare, Resource demand, String name,
|
||||
long startTime, Resource usage, ResourceWeights weights) {
|
||||
long startTime, Resource usage, float weights) {
|
||||
this.minShare = minShare;
|
||||
this.demand = demand;
|
||||
this.name = name;
|
||||
|
@ -258,7 +253,7 @@ public class TestSchedulingPolicy {
|
|||
}
|
||||
|
||||
@Override
|
||||
public ResourceWeights getWeights() {
|
||||
public float getWeight() {
|
||||
return weights;
|
||||
}
|
||||
|
||||
|
|
|
@ -23,15 +23,22 @@ import static org.mockito.Mockito.mock;
|
|||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.util.Comparator;
|
||||
import java.util.Map;
|
||||
import org.apache.curator.shaded.com.google.common.base.Joiner;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FakeSchedulable;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy.DominantResourceFairnessComparator;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
|
@ -39,10 +46,15 @@ import org.junit.Test;
|
|||
* container before sched2
|
||||
*/
|
||||
public class TestDominantResourceFairnessPolicy {
|
||||
@BeforeClass
|
||||
public static void setup() {
|
||||
addResources("test");
|
||||
}
|
||||
|
||||
private Comparator<Schedulable> createComparator(int clusterMem,
|
||||
int clusterCpu) {
|
||||
DominantResourceFairnessPolicy policy = new DominantResourceFairnessPolicy();
|
||||
DominantResourceFairnessPolicy policy =
|
||||
new DominantResourceFairnessPolicy();
|
||||
FSContext fsContext = mock(FSContext.class);
|
||||
when(fsContext.getClusterResource()).
|
||||
thenReturn(Resources.createResource(clusterMem, clusterCpu));
|
||||
|
@ -51,23 +63,23 @@ public class TestDominantResourceFairnessPolicy {
|
|||
}
|
||||
|
||||
private Schedulable createSchedulable(int memUsage, int cpuUsage) {
|
||||
return createSchedulable(memUsage, cpuUsage, ResourceWeights.NEUTRAL, 0, 0);
|
||||
return createSchedulable(memUsage, cpuUsage, 1.0f, 0, 0);
|
||||
}
|
||||
|
||||
private Schedulable createSchedulable(int memUsage, int cpuUsage,
|
||||
int minMemShare, int minCpuShare) {
|
||||
return createSchedulable(memUsage, cpuUsage, ResourceWeights.NEUTRAL,
|
||||
return createSchedulable(memUsage, cpuUsage, 1.0f,
|
||||
minMemShare, minCpuShare);
|
||||
}
|
||||
|
||||
private Schedulable createSchedulable(int memUsage, int cpuUsage,
|
||||
ResourceWeights weights) {
|
||||
float weights) {
|
||||
return createSchedulable(memUsage, cpuUsage, weights, 0, 0);
|
||||
}
|
||||
|
||||
|
||||
private Schedulable createSchedulable(int memUsage, int cpuUsage,
|
||||
ResourceWeights weights, int minMemShare, int minCpuShare) {
|
||||
float weights, int minMemShare, int minCpuShare) {
|
||||
Resource usage = BuilderUtils.newResource(memUsage, cpuUsage);
|
||||
Resource minShare = BuilderUtils.newResource(minMemShare, minCpuShare);
|
||||
return new FakeSchedulable(minShare,
|
||||
|
@ -77,94 +89,260 @@ public class TestDominantResourceFairnessPolicy {
|
|||
|
||||
@Test
|
||||
public void testSameDominantResource() {
|
||||
assertTrue(createComparator(8000, 4).compare(
|
||||
createSchedulable(1000, 1),
|
||||
createSchedulable(2000, 1)) < 0);
|
||||
Comparator c = createComparator(8000, 4);
|
||||
Schedulable s1 = createSchedulable(1000, 1);
|
||||
Schedulable s2 = createSchedulable(2000, 1);
|
||||
|
||||
assertTrue("Comparison didn't return a value less than 0",
|
||||
c.compare(s1, s2) < 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDifferentDominantResource() {
|
||||
assertTrue(createComparator(8000, 8).compare(
|
||||
createSchedulable(4000, 3),
|
||||
createSchedulable(2000, 5)) < 0);
|
||||
Comparator c = createComparator(8000, 8);
|
||||
Schedulable s1 = createSchedulable(4000, 3);
|
||||
Schedulable s2 = createSchedulable(2000, 5);
|
||||
|
||||
assertTrue("Comparison didn't return a value less than 0",
|
||||
c.compare(s1, s2) < 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOneIsNeedy() {
|
||||
assertTrue(createComparator(8000, 8).compare(
|
||||
createSchedulable(2000, 5, 0, 6),
|
||||
createSchedulable(4000, 3, 0, 0)) < 0);
|
||||
Comparator c = createComparator(8000, 8);
|
||||
Schedulable s1 = createSchedulable(2000, 5, 0, 6);
|
||||
Schedulable s2 = createSchedulable(4000, 3, 0, 0);
|
||||
|
||||
assertTrue("Comparison didn't return a value less than 0",
|
||||
c.compare(s1, s2) < 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBothAreNeedy() {
|
||||
assertTrue(createComparator(8000, 100).compare(
|
||||
// dominant share is 2000/8000
|
||||
createSchedulable(2000, 5),
|
||||
// dominant share is 4000/8000
|
||||
createSchedulable(4000, 3)) < 0);
|
||||
assertTrue(createComparator(8000, 100).compare(
|
||||
// dominant min share is 2/3
|
||||
createSchedulable(2000, 5, 3000, 6),
|
||||
// dominant min share is 4/5
|
||||
createSchedulable(4000, 3, 5000, 4)) < 0);
|
||||
Comparator c = createComparator(8000, 100);
|
||||
// dominant share is 2000/8000
|
||||
Schedulable s1 = createSchedulable(2000, 5);
|
||||
// dominant share is 4000/8000
|
||||
Schedulable s2 = createSchedulable(4000, 3);
|
||||
|
||||
assertTrue("Comparison didn't return a value less than 0",
|
||||
c.compare(s1, s2) < 0);
|
||||
|
||||
// dominant min share is 2/3
|
||||
s1 = createSchedulable(2000, 5, 3000, 6);
|
||||
// dominant min share is 4/5
|
||||
s2 = createSchedulable(4000, 3, 5000, 4);
|
||||
|
||||
assertTrue("Comparison didn't return a value less than 0",
|
||||
c.compare(s1, s2) < 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEvenWeightsSameDominantResource() {
|
||||
assertTrue(createComparator(8000, 8).compare(
|
||||
createSchedulable(3000, 1, new ResourceWeights(2.0f)),
|
||||
createSchedulable(3000, 1, 2.0f),
|
||||
createSchedulable(2000, 1)) < 0);
|
||||
assertTrue(createComparator(8000, 8).compare(
|
||||
createSchedulable(1000, 3, new ResourceWeights(2.0f)),
|
||||
createSchedulable(1000, 3, 2.0f),
|
||||
createSchedulable(1000, 2)) < 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEvenWeightsDifferentDominantResource() {
|
||||
assertTrue(createComparator(8000, 8).compare(
|
||||
createSchedulable(1000, 3, new ResourceWeights(2.0f)),
|
||||
createSchedulable(1000, 3, 2.0f),
|
||||
createSchedulable(2000, 1)) < 0);
|
||||
assertTrue(createComparator(8000, 8).compare(
|
||||
createSchedulable(3000, 1, new ResourceWeights(2.0f)),
|
||||
createSchedulable(3000, 1, 2.0f),
|
||||
createSchedulable(1000, 2)) < 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUnevenWeightsSameDominantResource() {
|
||||
assertTrue(createComparator(8000, 8).compare(
|
||||
createSchedulable(3000, 1, new ResourceWeights(2.0f, 1.0f)),
|
||||
createSchedulable(2000, 1)) < 0);
|
||||
assertTrue(createComparator(8000, 8).compare(
|
||||
createSchedulable(1000, 3, new ResourceWeights(1.0f, 2.0f)),
|
||||
createSchedulable(1000, 2)) < 0);
|
||||
public void testSortShares() {
|
||||
float[][] ratios1 = {{0.3f, 2.0f}, {0.2f, 1.0f}, {0.4f, 0.1f}};
|
||||
float[][] ratios2 = {{0.2f, 9.0f}, {0.3f, 2.0f}, {0.25f, 0.1f}};
|
||||
float[][] expected1 = {{0.4f, 0.1f}, {0.3f, 2.0f}, {0.2f, 1.0f}};
|
||||
float[][] expected2 = {{0.3f, 2.0f}, {0.25f, 0.1f}, {0.2f, 9.0f}};
|
||||
DominantResourceFairnessComparator comparator =
|
||||
new DominantResourceFairnessComparator();
|
||||
|
||||
comparator.sortRatios(ratios1, ratios2);
|
||||
|
||||
for (int i = 0; i < ratios1.length; i++) {
|
||||
Assert.assertArrayEquals("The shares array was not sorted into the "
|
||||
+ "expected order: incorrect inner array encountered",
|
||||
expected1[i], ratios1[i], 0.00001f);
|
||||
Assert.assertArrayEquals("The shares array was not sorted into the "
|
||||
+ "expected order: incorrect inner array encountered",
|
||||
expected2[i], ratios2[i], 0.00001f);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testUnevenWeightsDifferentDominantResource() {
|
||||
assertTrue(createComparator(8000, 8).compare(
|
||||
createSchedulable(1000, 3, new ResourceWeights(1.0f, 2.0f)),
|
||||
createSchedulable(2000, 1)) < 0);
|
||||
assertTrue(createComparator(8000, 8).compare(
|
||||
createSchedulable(3000, 1, new ResourceWeights(2.0f, 1.0f)),
|
||||
createSchedulable(1000, 2)) < 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCalculateShares() {
|
||||
public void testCalculateClusterAndFairRatios() {
|
||||
Map<String, Integer> index = ResourceUtils.getResourceTypeIndex();
|
||||
Resource used = Resources.createResource(10, 5);
|
||||
Resource capacity = Resources.createResource(100, 10);
|
||||
ResourceType[] resourceOrder = new ResourceType[2];
|
||||
ResourceWeights shares = new ResourceWeights();
|
||||
DominantResourceFairnessPolicy.DominantResourceFairnessComparator comparator =
|
||||
new DominantResourceFairnessPolicy.DominantResourceFairnessComparator();
|
||||
comparator.calculateShares(used, capacity, shares, resourceOrder,
|
||||
ResourceWeights.NEUTRAL);
|
||||
|
||||
assertEquals(.1, shares.getWeight(ResourceType.MEMORY), .00001);
|
||||
assertEquals(.5, shares.getWeight(ResourceType.CPU), .00001);
|
||||
assertEquals(ResourceType.CPU, resourceOrder[0]);
|
||||
assertEquals(ResourceType.MEMORY, resourceOrder[1]);
|
||||
float[][] shares = new float[3][2];
|
||||
DominantResourceFairnessComparator comparator =
|
||||
new DominantResourceFairnessComparator();
|
||||
|
||||
used.setResourceValue("test", 2L);
|
||||
capacity.setResourceValue("test", 5L);
|
||||
|
||||
int dominant = comparator.calculateClusterAndFairRatios(used, capacity,
|
||||
shares, 1.0f);
|
||||
|
||||
assertEquals("Calculated usage ratio for memory (10MB out of 100MB) is "
|
||||
+ "incorrect", 0.1,
|
||||
shares[index.get(ResourceInformation.MEMORY_MB.getName())][0], .00001);
|
||||
assertEquals("Calculated usage ratio for vcores (5 out of 10) is "
|
||||
+ "incorrect", 0.5,
|
||||
shares[index.get(ResourceInformation.VCORES.getName())][0], .00001);
|
||||
assertEquals("Calculated usage ratio for test resource (2 out of 5) is "
|
||||
+ "incorrect", 0.4, shares[index.get("test")][0], .00001);
|
||||
assertEquals("The wrong dominant resource index was returned",
|
||||
index.get(ResourceInformation.VCORES.getName()).intValue(),
|
||||
dominant);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCalculateMinShareRatios() {
|
||||
Map<String, Integer> index = ResourceUtils.getResourceTypeIndex();
|
||||
Resource used = Resources.createResource(10, 5);
|
||||
Resource minShares = Resources.createResource(5, 10);
|
||||
float[][] ratios = new float[3][3];
|
||||
DominantResourceFairnessComparator comparator =
|
||||
new DominantResourceFairnessComparator();
|
||||
|
||||
used.setResourceValue("test", 2L);
|
||||
minShares.setResourceValue("test", 0L);
|
||||
|
||||
comparator.calculateMinShareRatios(used, minShares, ratios);
|
||||
|
||||
assertEquals("Calculated min share ratio for memory (10MB out of 5MB) is "
|
||||
+ "incorrect", 2.0,
|
||||
ratios[index.get(ResourceInformation.MEMORY_MB.getName())][2], .00001f);
|
||||
assertEquals("Calculated min share ratio for vcores (5 out of 10) is "
|
||||
+ "incorrect", 0.5,
|
||||
ratios[index.get(ResourceInformation.VCORES.getName())][2], .00001f);
|
||||
assertEquals("Calculated min share ratio for test resource (0 out of 5) is "
|
||||
+ "incorrect", Float.POSITIVE_INFINITY, ratios[index.get("test")][2],
|
||||
0.00001f);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCompareShares() {
|
||||
float[][] ratios1 = {
|
||||
{0.4f, 0.1f, 2.0f},
|
||||
{0.3f, 2.0f, 0.1f},
|
||||
{0.2f, 1.0f, 9.0f}
|
||||
};
|
||||
float[][] ratios2 = {
|
||||
{0.3f, 2.0f, 1.0f},
|
||||
{0.2f, 0.1f, 0.5f},
|
||||
{0.2f, 1.0f, 2.0f}
|
||||
};
|
||||
float[][] ratios3 = {
|
||||
{0.3f, 2.0f, 1.0f},
|
||||
{0.2f, 0.1f, 2.0f},
|
||||
{0.1f, 2.0f, 1.0f}
|
||||
};
|
||||
DominantResourceFairnessComparator comparator =
|
||||
new DominantResourceFairnessComparator();
|
||||
|
||||
int ret = comparator.compareRatios(ratios1, ratios2, 0);
|
||||
|
||||
assertEquals("Expected the first array to be larger because the first "
|
||||
+ "usage ratio element is larger", 1, ret);
|
||||
|
||||
ret = comparator.compareRatios(ratios2, ratios1, 0);
|
||||
|
||||
assertEquals("Expected the first array to be smaller because the first "
|
||||
+ "usage ratio element is smaller", -1, ret);
|
||||
|
||||
ret = comparator.compareRatios(ratios1, ratios1, 0);
|
||||
|
||||
assertEquals("Expected the arrays to be equal, since they're the same "
|
||||
+ "array", 0, ret);
|
||||
|
||||
ret = comparator.compareRatios(ratios2, ratios2, 0);
|
||||
|
||||
assertEquals("Expected the arrays to be equal, since they're the same "
|
||||
+ "array", 0, ret);
|
||||
|
||||
ret = comparator.compareRatios(ratios3, ratios3, 0);
|
||||
|
||||
assertEquals("Expected the arrays to be equal, since they're the same "
|
||||
+ "array", 0, ret);
|
||||
|
||||
ret = comparator.compareRatios(ratios2, ratios3, 0);
|
||||
|
||||
assertEquals("Expected the first array to be larger because the last "
|
||||
+ "usage ratio element is larger, and all other elements are equal",
|
||||
1, ret);
|
||||
|
||||
ret = comparator.compareRatios(ratios1, ratios2, 1);
|
||||
|
||||
assertEquals("Expected the first array to be smaller because the first "
|
||||
+ "fair share ratio element is smaller", -1, ret);
|
||||
|
||||
ret = comparator.compareRatios(ratios2, ratios1, 1);
|
||||
|
||||
assertEquals("Expected the first array to be larger because the first "
|
||||
+ "fair share ratio element is larger", 1, ret);
|
||||
|
||||
ret = comparator.compareRatios(ratios1, ratios1, 1);
|
||||
|
||||
assertEquals("Expected the arrays to be equal, since they're the same "
|
||||
+ "array", 0, ret);
|
||||
|
||||
ret = comparator.compareRatios(ratios2, ratios2, 1);
|
||||
|
||||
assertEquals("Expected the arrays to be equal, since they're the same "
|
||||
+ "array", 0, ret);
|
||||
|
||||
ret = comparator.compareRatios(ratios3, ratios3, 1);
|
||||
|
||||
assertEquals("Expected the arrays to be equal, since they're the same "
|
||||
+ "array", 0, ret);
|
||||
|
||||
ret = comparator.compareRatios(ratios2, ratios3, 1);
|
||||
|
||||
assertEquals("Expected the first array to be smaller because the last "
|
||||
+ "usage ratio element is smaller, and all other elements are equal",
|
||||
-1, ret);
|
||||
|
||||
ret = comparator.compareRatios(ratios1, ratios2, 2);
|
||||
|
||||
assertEquals("Expected the first array to be larger because the first "
|
||||
+ "min share ratio element is larger", 1, ret);
|
||||
|
||||
ret = comparator.compareRatios(ratios2, ratios1, 2);
|
||||
|
||||
assertEquals("Expected the first array to be smaller because the first "
|
||||
+ "min share ratio element is smaller", -1, ret);
|
||||
|
||||
ret = comparator.compareRatios(ratios1, ratios1, 2);
|
||||
|
||||
assertEquals("Expected the arrays to be equal, since they're the same "
|
||||
+ "array", 0, ret);
|
||||
|
||||
ret = comparator.compareRatios(ratios2, ratios2, 2);
|
||||
|
||||
assertEquals("Expected the arrays to be equal, since they're the same "
|
||||
+ "array", 0, ret);
|
||||
|
||||
ret = comparator.compareRatios(ratios3, ratios3, 2);
|
||||
|
||||
assertEquals("Expected the arrays to be equal, since they're the same "
|
||||
+ "array", 0, ret);
|
||||
|
||||
ret = comparator.compareRatios(ratios2, ratios3, 2);
|
||||
|
||||
assertEquals("Expected the first array to be smaller because the second "
|
||||
+ "min share ratio element is smaller, and all the first elements are "
|
||||
+ "equal", -1, ret);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -183,4 +361,12 @@ public class TestDominantResourceFairnessPolicy {
|
|||
assertTrue(createComparator(8000, 6)
|
||||
.compare(schedulable1, schedulable2) < 0);
|
||||
}
|
||||
|
||||
private static void addResources(String... resources) {
|
||||
Configuration conf = new Configuration();
|
||||
|
||||
// Add a third resource to the allowed set
|
||||
conf.set(YarnConfiguration.RESOURCE_TYPES, Joiner.on(',').join(resources));
|
||||
ResourceUtils.resetResourceTypes(conf);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue