YARN-6964. Fair scheduler misuses Resources operations. (Daniel Templeton and Szilard Nemeth via Haibo Chen)
This commit is contained in:
parent
5c19ee3994
commit
8a6bb8409c
|
@ -242,7 +242,7 @@ public class Resources {
|
|||
public static boolean isNone(Resource other) {
|
||||
return NONE.equals(other);
|
||||
}
|
||||
|
||||
|
||||
public static Resource unbounded() {
|
||||
return UNBOUNDED;
|
||||
}
|
||||
|
@ -300,8 +300,9 @@ public class Resources {
|
|||
}
|
||||
|
||||
/**
|
||||
* Subtract <code>rhs</code> from <code>lhs</code> and reset any negative
|
||||
* values to zero.
|
||||
* Subtract {@code rhs} from {@code lhs} and reset any negative values to
|
||||
* zero. This call will modify {@code lhs}.
|
||||
*
|
||||
* @param lhs {@link Resource} to subtract from
|
||||
* @param rhs {@link Resource} to subtract
|
||||
* @return the value of lhs after subtraction
|
||||
|
@ -317,6 +318,19 @@ public class Resources {
|
|||
return lhs;
|
||||
}
|
||||
|
||||
/**
|
||||
* Subtract {@code rhs} from {@code lhs} and reset any negative values to
|
||||
* zero. This call will operate on a copy of {@code lhs}, leaving {@code lhs}
|
||||
* unmodified.
|
||||
*
|
||||
* @param lhs {@link Resource} to subtract from
|
||||
* @param rhs {@link Resource} to subtract
|
||||
* @return the value of lhs after subtraction
|
||||
*/
|
||||
public static Resource subtractNonNegative(Resource lhs, Resource rhs) {
|
||||
return subtractFromNonNegative(clone(lhs), rhs);
|
||||
}
|
||||
|
||||
public static Resource negate(Resource resource) {
|
||||
return subtract(NONE, resource);
|
||||
}
|
||||
|
|
|
@ -1280,8 +1280,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
|
|||
}
|
||||
|
||||
@Private
|
||||
public boolean hasPendingResourceRequest(ResourceCalculator rc,
|
||||
String nodePartition, Resource cluster,
|
||||
public boolean hasPendingResourceRequest(String nodePartition,
|
||||
SchedulingMode schedulingMode) {
|
||||
// We need to consider unconfirmed allocations
|
||||
if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
|
||||
|
@ -1294,16 +1293,12 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
|
|||
// To avoid too many allocation-proposals rejected for non-default
|
||||
// partition allocation
|
||||
if (StringUtils.equals(nodePartition, RMNodeLabelsManager.NO_LABEL)) {
|
||||
pending = Resources.subtract(pending, Resources
|
||||
pending = Resources.subtractNonNegative(pending, Resources
|
||||
.createResource(unconfirmedAllocatedMem.get(),
|
||||
unconfirmedAllocatedVcores.get()));
|
||||
}
|
||||
|
||||
if (Resources.greaterThan(rc, cluster, pending, Resources.none())) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
return !Resources.isNone(pending);
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
|
@ -855,8 +855,8 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
|||
|
||||
if (reservedContainer == null) {
|
||||
// Check if application needs more resource, skip if it doesn't need more.
|
||||
if (!application.hasPendingResourceRequest(rc,
|
||||
candidates.getPartition(), clusterResource, schedulingMode)) {
|
||||
if (!application.hasPendingResourceRequest(candidates.getPartition(),
|
||||
schedulingMode)) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Skip app_attempt=" + application.getApplicationAttemptId()
|
||||
+ ", because it doesn't need more resource, schedulingMode="
|
||||
|
|
|
@ -257,7 +257,7 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
|
|||
}
|
||||
|
||||
/**
|
||||
* Get the maximum resource allocation for the given queue. If the max in not
|
||||
* Get the maximum resource allocation for the given queue. If the max is not
|
||||
* set, return the default max.
|
||||
*
|
||||
* @param queue the target queue's name
|
||||
|
|
|
@ -1094,8 +1094,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
|||
(!hasRequestForRack || appSchedulingInfo.canDelayTo(key,
|
||||
node.getRackName()) || (hasRequestForNode)) &&
|
||||
// The requested container must be able to fit on the node:
|
||||
Resources.lessThanOrEqual(RESOURCE_CALCULATOR, null,
|
||||
resource,
|
||||
Resources.fitsIn(resource,
|
||||
node.getRMNode().getTotalCapability()))) {
|
||||
ret = false;
|
||||
} else if (!getQueue().fitsInMaxShare(resource)) {
|
||||
|
|
|
@ -554,13 +554,14 @@ public class FSLeafQueue extends FSQueue {
|
|||
*/
|
||||
private Resource minShareStarvation() {
|
||||
// If demand < minshare, we should use demand to determine starvation
|
||||
Resource desiredShare = Resources.min(policy.getResourceCalculator(),
|
||||
scheduler.getClusterResource(), getMinShare(), getDemand());
|
||||
Resource starvation =
|
||||
Resources.componentwiseMin(getMinShare(), getDemand());
|
||||
|
||||
Resources.subtractFromNonNegative(starvation, getResourceUsage());
|
||||
|
||||
Resource starvation = Resources.subtract(desiredShare, getResourceUsage());
|
||||
boolean starved = !Resources.isNone(starvation);
|
||||
|
||||
long now = scheduler.getClock().getTime();
|
||||
|
||||
if (!starved) {
|
||||
// Record that the queue is not starved
|
||||
setLastTimeAtMinShare(now);
|
||||
|
|
|
@ -83,17 +83,18 @@ public class FairSharePolicy extends SchedulingPolicy {
|
|||
private static class FairShareComparator implements Comparator<Schedulable>,
|
||||
Serializable {
|
||||
private static final long serialVersionUID = 5564969375856699313L;
|
||||
private static final Resource ONE = Resources.createResource(1);
|
||||
|
||||
@Override
|
||||
public int compare(Schedulable s1, Schedulable s2) {
|
||||
int res = compareDemand(s1, s2);
|
||||
|
||||
// Pre-compute resource usages to avoid duplicate calculation
|
||||
Resource resourceUsage1 = s1.getResourceUsage();
|
||||
Resource resourceUsage2 = s2.getResourceUsage();
|
||||
// Share resource usages to avoid duplicate calculation
|
||||
Resource resourceUsage1 = null;
|
||||
Resource resourceUsage2 = null;
|
||||
|
||||
if (res == 0) {
|
||||
resourceUsage1 = s1.getResourceUsage();
|
||||
resourceUsage2 = s2.getResourceUsage();
|
||||
res = compareMinShareUsage(s1, s2, resourceUsage1, resourceUsage2);
|
||||
}
|
||||
|
||||
|
@ -116,41 +117,44 @@ public class FairSharePolicy extends SchedulingPolicy {
|
|||
|
||||
private int compareDemand(Schedulable s1, Schedulable s2) {
|
||||
int res = 0;
|
||||
Resource demand1 = s1.getDemand();
|
||||
Resource demand2 = s2.getDemand();
|
||||
if (demand1.equals(Resources.none()) && Resources.greaterThan(
|
||||
RESOURCE_CALCULATOR, null, demand2, Resources.none())) {
|
||||
long demand1 = s1.getDemand().getMemorySize();
|
||||
long demand2 = s2.getDemand().getMemorySize();
|
||||
|
||||
if ((demand1 == 0) && (demand2 > 0)) {
|
||||
res = 1;
|
||||
} else if (demand2.equals(Resources.none()) && Resources.greaterThan(
|
||||
RESOURCE_CALCULATOR, null, demand1, Resources.none())) {
|
||||
} else if ((demand2 == 0) && (demand1 > 0)) {
|
||||
res = -1;
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
private int compareMinShareUsage(Schedulable s1, Schedulable s2,
|
||||
Resource resourceUsage1, Resource resourceUsage2) {
|
||||
int res;
|
||||
Resource minShare1 = Resources.min(RESOURCE_CALCULATOR, null,
|
||||
s1.getMinShare(), s1.getDemand());
|
||||
Resource minShare2 = Resources.min(RESOURCE_CALCULATOR, null,
|
||||
s2.getMinShare(), s2.getDemand());
|
||||
boolean s1Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
|
||||
resourceUsage1, minShare1);
|
||||
boolean s2Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
|
||||
resourceUsage2, minShare2);
|
||||
long minShare1 = Math.min(s1.getMinShare().getMemorySize(),
|
||||
s1.getDemand().getMemorySize());
|
||||
long minShare2 = Math.min(s2.getMinShare().getMemorySize(),
|
||||
s2.getDemand().getMemorySize());
|
||||
boolean s1Needy = resourceUsage1.getMemorySize() < minShare1;
|
||||
boolean s2Needy = resourceUsage2.getMemorySize() < minShare2;
|
||||
|
||||
if (s1Needy && !s2Needy) {
|
||||
res = -1;
|
||||
} else if (s2Needy && !s1Needy) {
|
||||
res = 1;
|
||||
} else if (s1Needy && s2Needy) {
|
||||
double minShareRatio1 = (double) resourceUsage1.getMemorySize() /
|
||||
Resources.max(RESOURCE_CALCULATOR, null, minShare1, ONE)
|
||||
.getMemorySize();
|
||||
double minShareRatio2 = (double) resourceUsage2.getMemorySize() /
|
||||
Resources.max(RESOURCE_CALCULATOR, null, minShare2, ONE)
|
||||
.getMemorySize();
|
||||
double minShareRatio1 = (double) resourceUsage1.getMemorySize();
|
||||
double minShareRatio2 = (double) resourceUsage2.getMemorySize();
|
||||
|
||||
if (minShare1 > 1) {
|
||||
minShareRatio1 /= minShare1;
|
||||
}
|
||||
|
||||
if (minShare2 > 1) {
|
||||
minShareRatio2 /= minShare2;
|
||||
}
|
||||
|
||||
res = (int) Math.signum(minShareRatio1 - minShareRatio2);
|
||||
} else {
|
||||
res = 0;
|
||||
|
@ -173,18 +177,16 @@ public class FairSharePolicy extends SchedulingPolicy {
|
|||
if (weight1 > 0.0 && weight2 > 0.0) {
|
||||
useToWeightRatio1 = resourceUsage1.getMemorySize() / weight1;
|
||||
useToWeightRatio2 = resourceUsage2.getMemorySize() / weight2;
|
||||
} else { // Either weight1 or weight2 equals to 0
|
||||
if (weight1 == weight2) {
|
||||
// If they have same weight, just compare usage
|
||||
useToWeightRatio1 = resourceUsage1.getMemorySize();
|
||||
useToWeightRatio2 = resourceUsage2.getMemorySize();
|
||||
} else {
|
||||
// By setting useToWeightRatios to negative weights, we give the
|
||||
// zero-weight one less priority, so the non-zero weight one will
|
||||
// be given slots.
|
||||
useToWeightRatio1 = -weight1;
|
||||
useToWeightRatio2 = -weight2;
|
||||
}
|
||||
} else if (weight1 == weight2) { // Either weight1 or weight2 equals to 0
|
||||
// If they have same weight, just compare usage
|
||||
useToWeightRatio1 = resourceUsage1.getMemorySize();
|
||||
useToWeightRatio2 = resourceUsage2.getMemorySize();
|
||||
} else {
|
||||
// By setting useToWeightRatios to negative weights, we give the
|
||||
// zero-weight one less priority, so the non-zero weight one will
|
||||
// be given slots.
|
||||
useToWeightRatio1 = -weight1;
|
||||
useToWeightRatio2 = -weight2;
|
||||
}
|
||||
|
||||
return (int) Math.signum(useToWeightRatio1 - useToWeightRatio2);
|
||||
|
@ -226,7 +228,7 @@ public class FairSharePolicy extends SchedulingPolicy {
|
|||
|
||||
@Override
|
||||
public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) {
|
||||
return Resources.greaterThan(RESOURCE_CALCULATOR, null, usage, fairShare);
|
||||
return usage.getMemorySize() > fairShare.getMemorySize();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils.toSchedulerKey;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
@ -24,6 +25,7 @@ import static org.mockito.Mockito.when;
|
|||
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -41,10 +43,13 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
||||
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
|
||||
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||
import org.junit.After;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestSchedulerApplicationAttempt {
|
||||
|
@ -335,4 +340,57 @@ public class TestSchedulerApplicationAttempt {
|
|||
assertEquals(Integer.MAX_VALUE,
|
||||
app.getSchedulingOpportunities(schedulerKey));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHasPendingResourceRequest() throws Exception {
|
||||
ApplicationAttemptId attemptId = createAppAttemptId(0, 0);
|
||||
Queue queue = createQueue("test", null);
|
||||
RMContext rmContext = mock(RMContext.class);
|
||||
when(rmContext.getEpoch()).thenReturn(3L);
|
||||
SchedulerApplicationAttempt app = new SchedulerApplicationAttempt(
|
||||
attemptId, "user", queue, queue.getAbstractUsersManager(), rmContext);
|
||||
|
||||
Priority priority = Priority.newInstance(1);
|
||||
List<ResourceRequest> requests = new ArrayList<>(2);
|
||||
Resource unit = Resource.newInstance(1L, 1);
|
||||
|
||||
// Add a request for a container with a node label
|
||||
requests.add(ResourceRequest.newInstance(priority, ResourceRequest.ANY,
|
||||
unit, 1, false, "label1"));
|
||||
// Add a request for a container without a node label
|
||||
requests.add(ResourceRequest.newInstance(priority, ResourceRequest.ANY,
|
||||
unit, 1, false, ""));
|
||||
|
||||
// Add unique allocation IDs so that the requests aren't considered
|
||||
// duplicates
|
||||
requests.get(0).setAllocationRequestId(0L);
|
||||
requests.get(1).setAllocationRequestId(1L);
|
||||
app.updateResourceRequests(requests);
|
||||
|
||||
assertTrue("Reported no pending resource requests for no label when "
|
||||
+ "resource requests for no label are pending (exclusive partitions)",
|
||||
app.hasPendingResourceRequest("",
|
||||
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY));
|
||||
assertTrue("Reported no pending resource requests for label with pending "
|
||||
+ "resource requests (exclusive partitions)",
|
||||
app.hasPendingResourceRequest("label1",
|
||||
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY));
|
||||
assertFalse("Reported pending resource requests for label with no pending "
|
||||
+ "resource requests (exclusive partitions)",
|
||||
app.hasPendingResourceRequest("label2",
|
||||
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY));
|
||||
|
||||
assertTrue("Reported no pending resource requests for no label when "
|
||||
+ "resource requests for no label are pending (relaxed partitions)",
|
||||
app.hasPendingResourceRequest("",
|
||||
SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY));
|
||||
assertTrue("Reported no pending resource requests for label with pending "
|
||||
+ "resource requests (relaxed partitions)",
|
||||
app.hasPendingResourceRequest("label1",
|
||||
SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY));
|
||||
assertTrue("Reported no pending resource requests for label with no "
|
||||
+ "pending resource requests (relaxed partitions)",
|
||||
app.hasPendingResourceRequest("label2",
|
||||
SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,11 +28,12 @@ import org.apache.hadoop.yarn.util.resource.Resources;
|
|||
*/
|
||||
public class FakeSchedulable implements Schedulable {
|
||||
private Resource usage;
|
||||
private Resource minShare;
|
||||
private Resource maxShare;
|
||||
private Resource fairShare;
|
||||
private final Resource demand;
|
||||
private final Resource minShare;
|
||||
private final Resource maxShare;
|
||||
private float weights;
|
||||
private Priority priority;
|
||||
private final Priority priority;
|
||||
private Resource fairShare;
|
||||
private long startTime;
|
||||
|
||||
public FakeSchedulable() {
|
||||
|
@ -75,10 +76,11 @@ public class FakeSchedulable implements Schedulable {
|
|||
this.minShare = minShare;
|
||||
this.maxShare = maxShare;
|
||||
this.weights = weight;
|
||||
setFairShare(fairShare);
|
||||
this.usage = usage;
|
||||
this.demand = Resources.multiply(usage, 2.0);
|
||||
this.priority = Records.newRecord(Priority.class);
|
||||
this.startTime = startTime;
|
||||
setFairShare(fairShare);
|
||||
start(startTime);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -92,13 +94,13 @@ public class FakeSchedulable implements Schedulable {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void setFairShare(Resource fairShare) {
|
||||
public final void setFairShare(Resource fairShare) {
|
||||
this.fairShare = fairShare;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Resource getDemand() {
|
||||
return null;
|
||||
return demand;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -147,4 +149,8 @@ public class FakeSchedulable implements Schedulable {
|
|||
public void setResourceUsage(Resource usage) {
|
||||
this.usage = usage;
|
||||
}
|
||||
|
||||
public final void start(long time) {
|
||||
startTime = time;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue