YARN-736. Add a multi-resource fair sharing metric. (sandyr via tucu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1496153 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Alejandro Abdelnur 2013-06-24 18:33:45 +00:00
parent ca35235b04
commit e60fbbcc2e
15 changed files with 401 additions and 226 deletions

View File

@ -39,6 +39,8 @@ Release 2.2.0 - UNRELEASED
YARN-866. Add test for class ResourceWeights. (ywskycn via tucu)
YARN-736. Add a multi-resource fair sharing metric. (sandyr via tucu)
OPTIMIZATIONS
BUG FIXES

View File

@ -60,6 +60,39 @@ public int compareTo(Resource o) {
}
};
private static final Resource UNBOUNDED = new Resource() {
@Override
public int getMemory() {
return Integer.MAX_VALUE;
}
@Override
public void setMemory(int memory) {
throw new RuntimeException("NONE cannot be modified!");
}
@Override
public int getVirtualCores() {
return Integer.MAX_VALUE;
}
@Override
public void setVirtualCores(int cores) {
throw new RuntimeException("NONE cannot be modified!");
}
@Override
public int compareTo(Resource o) {
int diff = 0 - o.getMemory();
if (diff == 0) {
diff = 0 - o.getVirtualCores();
}
return diff;
}
};
public static Resource createResource(int memory) {
return createResource(memory, (memory > 0) ? 1 : 0);
@ -75,6 +108,10 @@ public static Resource createResource(int memory, int cores) {
public static Resource none() {
return NONE;
}
public static Resource unbounded() {
return UNBOUNDED;
}
public static Resource clone(Resource res) {
return createResource(res.getMemory(), res.getVirtualCores());

View File

@ -109,7 +109,12 @@ public Resource getResourceUsage() {
@Override
public Resource getMinShare() {
return Resources.createResource(0);
return Resources.none();
}
@Override
public Resource getMaxShare() {
return Resources.unbounded();
}
/**

View File

@ -93,6 +93,11 @@ public ResourceWeights getWeights() {
public Resource getMinShare() {
return queueMgr.getMinResources(getName());
}
@Override
public Resource getMaxShare() {
return queueMgr.getMaxResources(getName());
}
@Override
public long getStartTime() {

View File

@ -311,7 +311,7 @@ boolean isStarvedForMinShare(FSLeafQueue sched) {
* defined as being below half its fair share.
*/
boolean isStarvedForFairShare(FSLeafQueue sched) {
Resource desiredFairShare = Resources.max(RESOURCE_CALCULATOR, clusterCapacity,
Resource desiredFairShare = Resources.min(RESOURCE_CALCULATOR, clusterCapacity,
Resources.multiply(sched.getFairShare(), .5), sched.getDemand());
return Resources.lessThan(RESOURCE_CALCULATOR, clusterCapacity,
sched.getResourceUsage(), desiredFairShare);

View File

@ -79,6 +79,8 @@ public abstract class Schedulable {
/** Minimum Resource share assigned to the schedulable. */
public abstract Resource getMinShare();
/** Maximum Resource share assigned to the schedulable. */
public abstract Resource getMaxShare();
/** Job/queue weight in fair sharing. */
public abstract ResourceWeights getWeights();

View File

@ -0,0 +1,172 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies;
import java.util.Collection;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
/**
* Contains logic for computing the fair shares. A {@link Schedulable}'s fair
* share is {@link Resource} it is entitled to, independent of the current
* demands and allocations on the cluster. A {@link Schedulable} whose resource
* consumption lies at or below its fair share will never have its containers
* preempted.
*/
public class ComputeFairShares {
private static final int COMPUTE_FAIR_SHARES_ITERATIONS = 25;
/**
* Given a set of Schedulables and a number of slots, compute their weighted
* fair shares. The min and max shares and of the Schedulables are assumed to
* be set beforehand. We compute the fairest possible allocation of shares to
* the Schedulables that respects their min and max shares.
*
* To understand what this method does, we must first define what weighted
* fair sharing means in the presence of min and max shares. If there
* were no minimum or maximum shares, then weighted fair sharing would be
* achieved if the ratio of slotsAssigned / weight was equal for each
* Schedulable and all slots were assigned. Minimum and maximum shares add a
* further twist - Some Schedulables may have a min share higher than their
* assigned share or a max share lower than their assigned share.
*
* To deal with these possibilities, we define an assignment of slots as being
* fair if there exists a ratio R such that: Schedulables S where S.minShare
* > R * S.weight are given share S.minShare - Schedulables S where S.maxShare
* < R * S.weight are given S.maxShare - All other Schedulables S are
* assigned share R * S.weight - The sum of all the shares is totalSlots.
*
* We call R the weight-to-slots ratio because it converts a Schedulable's
* weight to the number of slots it is assigned.
*
* We compute a fair allocation by finding a suitable weight-to-slot ratio R.
* To do this, we use binary search. Given a ratio R, we compute the number of
* slots that would be used in total with this ratio (the sum of the shares
* computed using the conditions above). If this number of slots is less than
* totalSlots, then R is too small and more slots could be assigned. If the
* number of slots is more than totalSlots, then R is too large.
*
* We begin the binary search with a lower bound on R of 0 (which means that
* all Schedulables are only given their minShare) and an upper bound computed
* to be large enough that too many slots are given (by doubling R until we
* use more than totalResources resources). The helper method
* resourceUsedWithWeightToResourceRatio computes the total resources used with a
* given value of R.
*
* The running time of this algorithm is linear in the number of Schedulables,
* because resourceUsedWithWeightToResourceRatio is linear-time and the number of
* iterations of binary search is a constant (dependent on desired precision).
*/
public static void computeShares(
Collection<? extends Schedulable> schedulables, Resource totalResources,
ResourceType type) {
if (schedulables.isEmpty()) {
return;
}
// Find an upper bound on R that we can use in our binary search. We start
// at R = 1 and double it until we have either used all the resources or we
// have met all Schedulables' max shares.
int totalMaxShare = 0;
for (Schedulable sched : schedulables) {
int maxShare = getResourceValue(sched.getMaxShare(), type);
if (maxShare == Integer.MAX_VALUE) {
totalMaxShare = Integer.MAX_VALUE;
break;
} else {
totalMaxShare += maxShare;
}
}
int totalResource = Math.min(totalMaxShare,
getResourceValue(totalResources, type));
double rMax = 1.0;
while (resourceUsedWithWeightToResourceRatio(rMax, schedulables, type)
< totalResource) {
rMax *= 2.0;
}
// Perform the binary search for up to COMPUTE_FAIR_SHARES_ITERATIONS steps
double left = 0;
double right = rMax;
for (int i = 0; i < COMPUTE_FAIR_SHARES_ITERATIONS; i++) {
double mid = (left + right) / 2.0;
if (resourceUsedWithWeightToResourceRatio(mid, schedulables, type) <
totalResource) {
left = mid;
} else {
right = mid;
}
}
// Set the fair shares based on the value of R we've converged to
for (Schedulable sched : schedulables) {
setResourceValue(computeShare(sched, right, type), sched.getFairShare(), type);
}
}
/**
* Compute the resources that would be used given a weight-to-resource ratio
* w2rRatio, for use in the computeFairShares algorithm as described in #
*/
private static int resourceUsedWithWeightToResourceRatio(double w2rRatio,
Collection<? extends Schedulable> schedulables, ResourceType type) {
int resourcesTaken = 0;
for (Schedulable sched : schedulables) {
int share = computeShare(sched, w2rRatio, type);
resourcesTaken += share;
}
return resourcesTaken;
}
/**
* Compute the resources assigned to a Schedulable given a particular
* weight-to-resource ratio w2rRatio.
*/
private static int computeShare(Schedulable sched, double w2rRatio,
ResourceType type) {
double share = sched.getWeights().getWeight(type) * w2rRatio;
share = Math.max(share, getResourceValue(sched.getMinShare(), type));
share = Math.min(share, getResourceValue(sched.getMaxShare(), type));
return (int) share;
}
private static int getResourceValue(Resource resource, ResourceType type) {
switch (type) {
case MEMORY:
return resource.getMemory();
case CPU:
return resource.getVirtualCores();
default:
throw new IllegalArgumentException("Invalid resource");
}
}
private static void setResourceValue(int val, Resource resource, ResourceType type) {
switch (type) {
case MEMORY:
resource.setMemory(val);
break;
case CPU:
resource.setVirtualCores(val);
break;
default:
throw new IllegalArgumentException("Invalid resource");
}
}
}

View File

@ -64,13 +64,8 @@ public Comparator<Schedulable> getComparator() {
@Override
public void computeShares(Collection<? extends Schedulable> schedulables,
Resource totalResources) {
// TODO: For now, set all fair shares to 0, because, in the context of DRF,
// it doesn't make sense to set a value for each resource. YARN-736 should
// add in a sensible replacement.
for (Schedulable schedulable : schedulables) {
schedulable.setFairShare(Resources.none());
for (ResourceType type : ResourceType.values()) {
ComputeFairShares.computeShares(schedulables, totalResources, type);
}
}

View File

@ -116,120 +116,7 @@ public Comparator<Schedulable> getComparator() {
@Override
public void computeShares(Collection<? extends Schedulable> schedulables,
Resource totalResources) {
computeFairShares(schedulables, totalResources);
}
/**
* Number of iterations for the binary search in computeFairShares. This is
* equivalent to the number of bits of precision in the output. 25 iterations
* gives precision better than 0.1 slots in clusters with one million slots.
*/
private static final int COMPUTE_FAIR_SHARES_ITERATIONS = 25;
/**
* Given a set of Schedulables and a number of slots, compute their weighted
* fair shares. The min shares and demands of the Schedulables are assumed to
* be set beforehand. We compute the fairest possible allocation of shares to
* the Schedulables that respects their min shares and demands.
*
* To understand what this method does, we must first define what weighted
* fair sharing means in the presence of minimum shares and demands. If there
* were no minimum shares and every Schedulable had an infinite demand (i.e.
* could launch infinitely many tasks), then weighted fair sharing would be
* achieved if the ratio of slotsAssigned / weight was equal for each
* Schedulable and all slots were assigned. Minimum shares and demands add two
* further twists: - Some Schedulables may not have enough tasks to fill all
* their share. - Some Schedulables may have a min share higher than their
* assigned share.
*
* To deal with these possibilities, we define an assignment of slots as being
* fair if there exists a ratio R such that: - Schedulables S where S.demand <
* R * S.weight are assigned share S.demand - Schedulables S where S.minShare
* > R * S.weight are given share S.minShare - All other Schedulables S are
* assigned share R * S.weight - The sum of all the shares is totalSlots.
*
* We call R the weight-to-slots ratio because it converts a Schedulable's
* weight to the number of slots it is assigned.
*
* We compute a fair allocation by finding a suitable weight-to-slot ratio R.
* To do this, we use binary search. Given a ratio R, we compute the number of
* slots that would be used in total with this ratio (the sum of the shares
* computed using the conditions above). If this number of slots is less than
* totalSlots, then R is too small and more slots could be assigned. If the
* number of slots is more than totalSlots, then R is too large.
*
* We begin the binary search with a lower bound on R of 0 (which means that
* all Schedulables are only given their minShare) and an upper bound computed
* to be large enough that too many slots are given (by doubling R until we
* either use more than totalSlots slots or we fulfill all jobs' demands). The
* helper method slotsUsedWithWeightToSlotRatio computes the total number of
* slots used with a given value of R.
*
* The running time of this algorithm is linear in the number of Schedulables,
* because slotsUsedWithWeightToSlotRatio is linear-time and the number of
* iterations of binary search is a constant (dependent on desired precision).
*/
public static void computeFairShares(
Collection<? extends Schedulable> schedulables, Resource totalResources) {
// Find an upper bound on R that we can use in our binary search. We start
// at R = 1 and double it until we have either used totalSlots slots or we
// have met all Schedulables' demands (if total demand < totalSlots).
Resource totalDemand = Resources.createResource(0);
for (Schedulable sched : schedulables) {
Resources.addTo(totalDemand, sched.getDemand());
}
Resource cap = Resources.min(RESOURCE_CALCULATOR, null, totalDemand,
totalResources);
double rMax = 1.0;
while (Resources.lessThan(RESOURCE_CALCULATOR, null,
resUsedWithWeightToResRatio(rMax, schedulables),
cap)) {
rMax *= 2.0;
}
// Perform the binary search for up to COMPUTE_FAIR_SHARES_ITERATIONS steps
double left = 0;
double right = rMax;
for (int i = 0; i < COMPUTE_FAIR_SHARES_ITERATIONS; i++) {
double mid = (left + right) / 2.0;
if (Resources.lessThan(RESOURCE_CALCULATOR, null,
resUsedWithWeightToResRatio(mid, schedulables),
cap)) {
left = mid;
} else {
right = mid;
}
}
// Set the fair shares based on the value of R we've converged to
for (Schedulable sched : schedulables) {
sched.setFairShare(computeShare(sched, right));
}
}
/**
* Compute the number of slots that would be used given a weight-to-slot ratio
* w2sRatio, for use in the computeFairShares algorithm as described in #
* {@link SchedulingAlgorithms#computeFairShares(Collection, double)}.
*/
private static Resource resUsedWithWeightToResRatio(double w2sRatio,
Collection<? extends Schedulable> schedulables) {
Resource slotsTaken = Resources.createResource(0);
for (Schedulable sched : schedulables) {
Resource share = computeShare(sched, w2sRatio);
Resources.addTo(slotsTaken, share);
}
return slotsTaken;
}
/**
* Compute the resources assigned to a Schedulable given a particular
* res-to-slot ratio r2sRatio, for use in computeFairShares as described in #
* {@link SchedulingAlgorithms#computeFairShares(Collection, double)}.
*/
private static Resource computeShare(Schedulable sched, double r2sRatio) {
double share = sched.getWeights().getWeight(ResourceType.MEMORY) * r2sRatio;
share = Math.max(share, sched.getMinShare().getMemory());
share = Math.min(share, sched.getDemand().getMemory());
return Resources.createResource((int) share);
ComputeFairShares.computeShares(schedulables, totalResources, ResourceType.MEMORY);
}
@Override

View File

@ -73,9 +73,14 @@ public Comparator<Schedulable> getComparator() {
@Override
public void computeShares(Collection<? extends Schedulable> schedulables,
Resource totalResources) {
for (Schedulable sched : schedulables) {
sched.setFairShare(Resources.createResource(0));
Schedulable earliest = null;
for (Schedulable schedulable : schedulables) {
if (earliest == null ||
schedulable.getStartTime() < earliest.getStartTime()) {
earliest = schedulable;
}
}
earliest.setFairShare(Resources.clone(totalResources));
}
@Override

View File

@ -28,40 +28,49 @@
* Dummy implementation of Schedulable for unit testing.
*/
public class FakeSchedulable extends Schedulable {
private Resource demand;
private Resource usage;
private Resource minShare;
private Resource maxShare;
private ResourceWeights weights;
private Priority priority;
private long startTime;
public FakeSchedulable() {
this(0, 0, 1, 0, 0, 0);
this(0, Integer.MAX_VALUE, 1, 0, 0, 0);
}
public FakeSchedulable(int demand) {
this(demand, 0, 1, 0, 0, 0);
public FakeSchedulable(int minShare) {
this(minShare, Integer.MAX_VALUE, 1, 0, 0, 0);
}
public FakeSchedulable(int demand, int minShare) {
this(demand, minShare, 1, 0, 0, 0);
public FakeSchedulable(int minShare, int maxShare) {
this(minShare, maxShare, 1, 0, 0, 0);
}
public FakeSchedulable(int demand, int minShare, double memoryWeight) {
this(demand, minShare, memoryWeight, 0, 0, 0);
public FakeSchedulable(int minShare, double memoryWeight) {
this(minShare, Integer.MAX_VALUE, memoryWeight, 0, 0, 0);
}
public FakeSchedulable(int demand, int minShare, double weight, int fairShare, int usage,
public FakeSchedulable(int minShare, int maxShare, double memoryWeight) {
this(minShare, maxShare, memoryWeight, 0, 0, 0);
}
public FakeSchedulable(int minShare, int maxShare, double weight, int fairShare, int usage,
long startTime) {
this(Resources.createResource(demand), Resources.createResource(minShare),
new ResourceWeights((float)weight), Resources.createResource(fairShare),
Resources.createResource(usage), startTime);
this(Resources.createResource(minShare, 0), Resources.createResource(maxShare, 0),
new ResourceWeights((float)weight), Resources.createResource(fairShare, 0),
Resources.createResource(usage, 0), startTime);
}
public FakeSchedulable(Resource demand, Resource minShare, ResourceWeights weight,
Resource fairShare, Resource usage, long startTime) {
this.demand = demand;
public FakeSchedulable(Resource minShare, ResourceWeights weights) {
this(minShare, Resources.createResource(Integer.MAX_VALUE, Integer.MAX_VALUE),
weights, Resources.createResource(0, 0), Resources.createResource(0, 0), 0);
}
public FakeSchedulable(Resource minShare, Resource maxShare,
ResourceWeights weight, Resource fairShare, Resource usage, long startTime) {
this.minShare = minShare;
this.maxShare = maxShare;
this.weights = weight;
setFairShare(fairShare);
this.usage = usage;
@ -76,7 +85,7 @@ public Resource assignContainer(FSSchedulerNode node) {
@Override
public Resource getDemand() {
return demand;
return null;
}
@Override
@ -108,6 +117,11 @@ public ResourceWeights getWeights() {
public Resource getMinShare() {
return minShare;
}
@Override
public Resource getMaxShare() {
return maxShare;
}
@Override
public void updateDemand() {}

View File

@ -23,8 +23,10 @@
import junit.framework.Assert;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.ComputeFairShares;
import org.junit.Before;
import org.junit.Test;
@ -33,12 +35,10 @@
*/
public class TestComputeFairShares {
private List<Schedulable> scheds;
private SchedulingPolicy schedulingMode;
@Before
public void setUp() throws Exception {
scheds = new ArrayList<Schedulable>();
schedulingMode = new FairSharePolicy();
}
/**
@ -47,13 +47,13 @@ public void setUp() throws Exception {
*/
@Test
public void testEqualSharing() {
scheds.add(new FakeSchedulable(100));
scheds.add(new FakeSchedulable(50));
scheds.add(new FakeSchedulable(30));
scheds.add(new FakeSchedulable(20));
schedulingMode.computeShares(scheds,
Resources.createResource(40));
verifyShares(10, 10, 10, 10);
scheds.add(new FakeSchedulable());
scheds.add(new FakeSchedulable());
scheds.add(new FakeSchedulable());
scheds.add(new FakeSchedulable());
ComputeFairShares.computeShares(scheds,
Resources.createResource(40), ResourceType.MEMORY);
verifyMemoryShares(10, 10, 10, 10);
}
/**
@ -64,16 +64,17 @@ public void testEqualSharing() {
* so it only gets 11 slots. Pools 1 and 2 split the rest and get 13 each.
*/
@Test
public void testLowDemands() {
scheds.add(new FakeSchedulable(100));
scheds.add(new FakeSchedulable(50));
scheds.add(new FakeSchedulable(11));
scheds.add(new FakeSchedulable(3));
schedulingMode.computeShares(scheds,
Resources.createResource(40));
verifyShares(13, 13, 11, 3);
public void testLowMaxShares() {
scheds.add(new FakeSchedulable(0, 100));
scheds.add(new FakeSchedulable(0, 50));
scheds.add(new FakeSchedulable(0, 11));
scheds.add(new FakeSchedulable(0, 3));
ComputeFairShares.computeShares(scheds,
Resources.createResource(40), ResourceType.MEMORY);
verifyMemoryShares(13, 13, 11, 3);
}
/**
* In this test, some pools have minimum shares set. Pool 1 has a min share
* of 20 so it gets 20 slots. Pool 2 also has a min share of 20, but its
@ -84,13 +85,13 @@ public void testLowDemands() {
*/
@Test
public void testMinShares() {
scheds.add(new FakeSchedulable(100, 20));
scheds.add(new FakeSchedulable(10, 20));
scheds.add(new FakeSchedulable(10, 0));
scheds.add(new FakeSchedulable(3, 2));
schedulingMode.computeShares(scheds,
Resources.createResource(40));
verifyShares(20, 10, 7, 3);
scheds.add(new FakeSchedulable(20));
scheds.add(new FakeSchedulable(18));
scheds.add(new FakeSchedulable(0));
scheds.add(new FakeSchedulable(2));
ComputeFairShares.computeShares(scheds,
Resources.createResource(40), ResourceType.MEMORY);
verifyMemoryShares(20, 18, 0, 2);
}
/**
@ -99,15 +100,15 @@ public void testMinShares() {
*/
@Test
public void testWeightedSharing() {
scheds.add(new FakeSchedulable(100, 0, 2.0));
scheds.add(new FakeSchedulable(50, 0, 1.0));
scheds.add(new FakeSchedulable(30, 0, 1.0));
scheds.add(new FakeSchedulable(20, 0, 0.5));
schedulingMode.computeShares(scheds,
Resources.createResource(45));
verifyShares(20, 10, 10, 5);
scheds.add(new FakeSchedulable(0, 2.0));
scheds.add(new FakeSchedulable(0, 1.0));
scheds.add(new FakeSchedulable(0, 1.0));
scheds.add(new FakeSchedulable(0, 0.5));
ComputeFairShares.computeShares(scheds,
Resources.createResource(45), ResourceType.MEMORY);
verifyMemoryShares(20, 10, 10, 5);
}
/**
* Weighted sharing test where pools 1 and 2 are now given lower demands than
* above. Pool 1 stops at 10 slots, leaving 35. If the remaining pools split
@ -116,16 +117,17 @@ public void testWeightedSharing() {
* the 24 slots left into a 1:0.5 ratio, getting 16 and 8 slots respectively.
*/
@Test
public void testWeightedSharingWithLowDemands() {
scheds.add(new FakeSchedulable(10, 0, 2.0));
scheds.add(new FakeSchedulable(11, 0, 1.0));
scheds.add(new FakeSchedulable(30, 0, 1.0));
scheds.add(new FakeSchedulable(20, 0, 0.5));
schedulingMode.computeShares(scheds,
Resources.createResource(45));
verifyShares(10, 11, 16, 8);
public void testWeightedSharingWithMaxShares() {
scheds.add(new FakeSchedulable(0, 10, 2.0));
scheds.add(new FakeSchedulable(0, 11, 1.0));
scheds.add(new FakeSchedulable(0, 30, 1.0));
scheds.add(new FakeSchedulable(0, 20, 0.5));
ComputeFairShares.computeShares(scheds,
Resources.createResource(45), ResourceType.MEMORY);
verifyMemoryShares(10, 11, 16, 8);
}
/**
* Weighted fair sharing test with min shares. As in the min share test above,
* pool 1 has a min share greater than its demand so it only gets its demand.
@ -135,13 +137,13 @@ public void testWeightedSharingWithLowDemands() {
*/
@Test
public void testWeightedSharingWithMinShares() {
scheds.add(new FakeSchedulable(10, 20, 2.0));
scheds.add(new FakeSchedulable(11, 0, 1.0));
scheds.add(new FakeSchedulable(30, 5, 1.0));
scheds.add(new FakeSchedulable(20, 15, 0.5));
schedulingMode.computeShares(scheds,
Resources.createResource(45));
verifyShares(10, 10, 10, 15);
scheds.add(new FakeSchedulable(20, 2.0));
scheds.add(new FakeSchedulable(0, 1.0));
scheds.add(new FakeSchedulable(5, 1.0));
scheds.add(new FakeSchedulable(15, 0.5));
ComputeFairShares.computeShares(scheds,
Resources.createResource(45), ResourceType.MEMORY);
verifyMemoryShares(20, 5, 5, 15);
}
/**
@ -151,28 +153,13 @@ public void testWeightedSharingWithMinShares() {
@Test
public void testLargeShares() {
int million = 1000 * 1000;
scheds.add(new FakeSchedulable(100 * million));
scheds.add(new FakeSchedulable(50 * million));
scheds.add(new FakeSchedulable(30 * million));
scheds.add(new FakeSchedulable(20 * million));
schedulingMode
.computeShares(scheds,
Resources.createResource(40 * million));
verifyShares(10 * million, 10 * million, 10 * million, 10 * million);
}
/**
* Test that having a pool with 0 demand doesn't confuse the algorithm.
*/
@Test
public void testZeroDemand() {
scheds.add(new FakeSchedulable(100));
scheds.add(new FakeSchedulable(50));
scheds.add(new FakeSchedulable(30));
scheds.add(new FakeSchedulable(0));
schedulingMode.computeShares(scheds,
Resources.createResource(30));
verifyShares(10, 10, 10, 0);
scheds.add(new FakeSchedulable());
scheds.add(new FakeSchedulable());
scheds.add(new FakeSchedulable());
scheds.add(new FakeSchedulable());
ComputeFairShares.computeShares(scheds,
Resources.createResource(40 * million), ResourceType.MEMORY);
verifyMemoryShares(10 * million, 10 * million, 10 * million, 10 * million);
}
/**
@ -180,18 +167,46 @@ public void testZeroDemand() {
*/
@Test
public void testEmptyList() {
schedulingMode.computeShares(scheds,
Resources.createResource(40));
verifyShares();
ComputeFairShares.computeShares(scheds,
Resources.createResource(40), ResourceType.MEMORY);
verifyMemoryShares();
}
/**
* Test that CPU works as well as memory
*/
@Test
public void testCPU() {
scheds.add(new FakeSchedulable(Resources.createResource(0, 20),
new ResourceWeights(2.0f)));
scheds.add(new FakeSchedulable(Resources.createResource(0, 0),
new ResourceWeights(1.0f)));
scheds.add(new FakeSchedulable(Resources.createResource(0, 5),
new ResourceWeights(1.0f)));
scheds.add(new FakeSchedulable(Resources.createResource(0, 15),
new ResourceWeights(0.5f)));
ComputeFairShares.computeShares(scheds,
Resources.createResource(0, 45), ResourceType.CPU);
verifyCPUShares(20, 5, 5, 15);
}
/**
* Check that a given list of shares have been assigned to this.scheds.
*/
private void verifyShares(double... shares) {
private void verifyMemoryShares(int... shares) {
Assert.assertEquals(scheds.size(), shares.length);
for (int i = 0; i < shares.length; i++) {
Assert.assertEquals(shares[i], scheds.get(i).getFairShare().getMemory(), 0.01);
Assert.assertEquals(shares[i], scheds.get(i).getFairShare().getMemory());
}
}
/**
* Check that a given list of shares have been assigned to this.scheds.
*/
private void verifyCPUShares(int... shares) {
Assert.assertEquals(scheds.size(), shares.length);
for (int i = 0; i < shares.length; i++) {
Assert.assertEquals(shares[i], scheds.get(i).getFairShare().getVirtualCores());
}
}
}

View File

@ -374,11 +374,10 @@ public void testSimpleFairShareCalculation() {
Collection<FSLeafQueue> queues = scheduler.getQueueManager().getLeafQueues();
assertEquals(3, queues.size());
// Divided three ways - betwen the two queues and the default queue
for (FSLeafQueue p : queues) {
if (!p.getName().equals("root.default")) {
assertEquals(5120, p.getFairShare().getMemory());
}
assertEquals(3414, p.getFairShare().getMemory());
}
}
@ -393,7 +392,7 @@ public void testSimpleHierarchicalFairShareCalculation() {
scheduler.handle(nodeEvent1);
// Have two queues which want entire cluster capacity
createSchedulingRequest(10 * 1024, "queue1", "user1");
createSchedulingRequest(10 * 1024, "default", "user1");
createSchedulingRequest(10 * 1024, "parent.queue2", "user1");
createSchedulingRequest(10 * 1024, "parent.queue3", "user1");
@ -401,9 +400,9 @@ public void testSimpleHierarchicalFairShareCalculation() {
QueueManager queueManager = scheduler.getQueueManager();
Collection<FSLeafQueue> queues = queueManager.getLeafQueues();
assertEquals(4, queues.size());
assertEquals(3, queues.size());
FSLeafQueue queue1 = queueManager.getLeafQueue("queue1");
FSLeafQueue queue1 = queueManager.getLeafQueue("default");
FSLeafQueue queue2 = queueManager.getLeafQueue("parent.queue2");
FSLeafQueue queue3 = queueManager.getLeafQueue("parent.queue3");
assertEquals(capacity / 2, queue1.getFairShare().getMemory());

View File

@ -64,8 +64,9 @@ private Schedulable createSchedulable(int memUsage, int cpuUsage,
ResourceWeights weights, int minMemShare, int minCpuShare) {
Resource usage = BuilderUtils.newResource(memUsage, cpuUsage);
Resource minShare = BuilderUtils.newResource(minMemShare, minCpuShare);
return new FakeSchedulable(Resources.none(), minShare, weights,
Resources.none(), usage, 0l);
return new FakeSchedulable(minShare,
Resources.createResource(Integer.MAX_VALUE, Integer.MAX_VALUE),
weights, Resources.none(), usage, 0l);
}
@Test

View File

@ -257,3 +257,39 @@ Allocation file format
---
Note that for backwards compatibility with the original FairScheduler, "queue" elements can instead be named as "pool" elements.
* {Administration}
The fair scheduler provides support for administration at runtime through two mechanisms:
* It is possible to modify minimum shares, limits, weights, preemption timeouts
and queue scheduling policies at runtime by editing the allocation file. The
scheduler will reload this file 10-15 seconds after it sees that it was
modified.
* Current applications, queues, and fair shares can be examined through the
ResourceManager's web interface, at
http://<ResourceManager URL>/cluster/scheduler.
The following fields can be seen for each queue on the web interface:
* Used Resources - The sum of resources allocated to containers within the queue.
* Num Active Applications - The number of applications in the queue that have
received at least one container.
* Num Pending Applications - The number of applications in the queue that have
not yet received any containers.
* Min Resources - The configured minimum resources that are guaranteed to the queue.
* Max Resources - The configured maximum resources that are allowed to the queue.
* Fair Share - The queue's fair share of resources. Queues may be allocated
resources beyond their fair share when other queues aren't using them. A
queue whose resource consumption lies at or below its fair share will never
have its containers preempted.
In addition to the information that the ResourceManager normally displays
about each application, the web interface includes the application's fair share.