YARN-8292: Fix dominant resource preemption not happening when some components of the resource vector become negative. Contributed by Wangda Tan.
This commit is contained in:
parent
bddfe796f2
commit
8d5509c681
|
@ -135,14 +135,19 @@ public class DefaultResourceCalculator extends ResourceCalculator {
|
||||||
return smaller.getMemorySize() <= bigger.getMemorySize();
|
return smaller.getMemorySize() <= bigger.getMemorySize();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isAnyMajorResourceZero(Resource resource) {
|
|
||||||
return resource.getMemorySize() == 0f;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Resource normalizeDown(Resource r, Resource stepFactor) {
|
public Resource normalizeDown(Resource r, Resource stepFactor) {
|
||||||
return Resources.createResource(
|
return Resources.createResource(
|
||||||
roundDown((r.getMemorySize()), stepFactor.getMemorySize()));
|
roundDown((r.getMemorySize()), stepFactor.getMemorySize()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isAnyMajorResourceZeroOrNegative(Resource resource) {
|
||||||
|
return resource.getMemorySize() <= 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isAnyMajorResourceAboveZero(Resource resource) {
|
||||||
|
return resource.getMemorySize() > 0;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -576,19 +576,6 @@ public class DominantResourceCalculator extends ResourceCalculator {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isAnyMajorResourceZero(Resource resource) {
|
|
||||||
int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
|
|
||||||
for (int i = 0; i < maxLength; i++) {
|
|
||||||
ResourceInformation resourceInformation = resource
|
|
||||||
.getResourceInformation(i);
|
|
||||||
if (resourceInformation.getValue() == 0L) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Resource normalizeDown(Resource r, Resource stepFactor) {
|
public Resource normalizeDown(Resource r, Resource stepFactor) {
|
||||||
Resource ret = Resource.newInstance(r);
|
Resource ret = Resource.newInstance(r);
|
||||||
|
@ -613,4 +600,30 @@ public class DominantResourceCalculator extends ResourceCalculator {
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isAnyMajorResourceZeroOrNegative(Resource resource) {
|
||||||
|
int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
|
||||||
|
for (int i = 0; i < maxLength; i++) {
|
||||||
|
ResourceInformation resourceInformation = resource.getResourceInformation(
|
||||||
|
i);
|
||||||
|
if (resourceInformation.getValue() <= 0L) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isAnyMajorResourceAboveZero(Resource resource) {
|
||||||
|
int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
|
||||||
|
for (int i = 0; i < maxLength; i++) {
|
||||||
|
ResourceInformation resourceInformation = resource.getResourceInformation(
|
||||||
|
i);
|
||||||
|
if (resourceInformation.getValue() > 0) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -239,12 +239,12 @@ public abstract class ResourceCalculator {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check if resource has any major resource types (which are all NodeManagers
|
* Check if resource has any major resource types (which are all NodeManagers
|
||||||
* included) a zero value.
|
* included) with a zero or negative value.
|
||||||
*
|
*
|
||||||
* @param resource resource
|
* @param resource resource
|
||||||
* @return returns true if any resource is zero.
|
* @return returns true if any resource is zero or negative.
|
||||||
*/
|
*/
|
||||||
public abstract boolean isAnyMajorResourceZero(Resource resource);
|
public abstract boolean isAnyMajorResourceZeroOrNegative(Resource resource);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get resource <code>r</code>and normalize down using step-factor
|
* Get resource <code>r</code>and normalize down using step-factor
|
||||||
|
@ -257,4 +257,13 @@ public abstract class ResourceCalculator {
|
||||||
* @return resulting normalized resource
|
* @return resulting normalized resource
|
||||||
*/
|
*/
|
||||||
public abstract Resource normalizeDown(Resource r, Resource stepFactor);
|
public abstract Resource normalizeDown(Resource r, Resource stepFactor);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if resource has any major resource types (which are all NodeManagers
|
||||||
|
* included) with a value greater than 0.
|
||||||
|
*
|
||||||
|
* @param resource resource
|
||||||
|
* @return returns true if any resource is greater than 0
|
||||||
|
*/
|
||||||
|
public abstract boolean isAnyMajorResourceAboveZero(Resource resource);
|
||||||
}
|
}
|
||||||
|
|
|
@ -547,11 +547,6 @@ public class Resources {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static boolean isAnyMajorResourceZero(ResourceCalculator rc,
|
|
||||||
Resource resource) {
|
|
||||||
return rc.isAnyMajorResourceZero(resource);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static Resource normalizeDown(ResourceCalculator calculator,
|
public static Resource normalizeDown(ResourceCalculator calculator,
|
||||||
Resource resource, Resource factor) {
|
Resource resource, Resource factor) {
|
||||||
return calculator.normalizeDown(resource, factor);
|
return calculator.normalizeDown(resource, factor);
|
||||||
|
|
|
@ -18,12 +18,6 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
|
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.Comparator;
|
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.PriorityQueue;
|
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.PriorityUtilizationQueueOrderingPolicy;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.PriorityUtilizationQueueOrderingPolicy;
|
||||||
|
@ -32,6 +26,12 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.PriorityQueue;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Calculate how much resources need to be preempted for each queue,
|
* Calculate how much resources need to be preempted for each queue,
|
||||||
* will be used by {@link PreemptionCandidatesSelector}.
|
* will be used by {@link PreemptionCandidatesSelector}.
|
||||||
|
@ -40,7 +40,8 @@ public class AbstractPreemptableResourceCalculator {
|
||||||
|
|
||||||
protected final CapacitySchedulerPreemptionContext context;
|
protected final CapacitySchedulerPreemptionContext context;
|
||||||
protected final ResourceCalculator rc;
|
protected final ResourceCalculator rc;
|
||||||
private boolean isReservedPreemptionCandidatesSelector;
|
protected boolean isReservedPreemptionCandidatesSelector;
|
||||||
|
private Resource stepFactor;
|
||||||
|
|
||||||
static class TQComparator implements Comparator<TempQueuePerPartition> {
|
static class TQComparator implements Comparator<TempQueuePerPartition> {
|
||||||
private ResourceCalculator rc;
|
private ResourceCalculator rc;
|
||||||
|
@ -90,6 +91,11 @@ public class AbstractPreemptableResourceCalculator {
|
||||||
rc = preemptionContext.getResourceCalculator();
|
rc = preemptionContext.getResourceCalculator();
|
||||||
this.isReservedPreemptionCandidatesSelector =
|
this.isReservedPreemptionCandidatesSelector =
|
||||||
isReservedPreemptionCandidatesSelector;
|
isReservedPreemptionCandidatesSelector;
|
||||||
|
|
||||||
|
stepFactor = Resource.newInstance(0, 0);
|
||||||
|
for (ResourceInformation ri : stepFactor.getResources()) {
|
||||||
|
ri.setValue(1);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -122,23 +128,24 @@ public class AbstractPreemptableResourceCalculator {
|
||||||
TQComparator tqComparator = new TQComparator(rc, totGuarant);
|
TQComparator tqComparator = new TQComparator(rc, totGuarant);
|
||||||
PriorityQueue<TempQueuePerPartition> orderedByNeed = new PriorityQueue<>(10,
|
PriorityQueue<TempQueuePerPartition> orderedByNeed = new PriorityQueue<>(10,
|
||||||
tqComparator);
|
tqComparator);
|
||||||
for (Iterator<TempQueuePerPartition> i = qAlloc.iterator(); i.hasNext();) {
|
for (Iterator<TempQueuePerPartition> i = qAlloc.iterator(); i.hasNext(); ) {
|
||||||
TempQueuePerPartition q = i.next();
|
TempQueuePerPartition q = i.next();
|
||||||
Resource used = q.getUsed();
|
Resource used = q.getUsed();
|
||||||
|
|
||||||
Resource initIdealAssigned;
|
Resource initIdealAssigned;
|
||||||
if (Resources.greaterThan(rc, totGuarant, used, q.getGuaranteed())) {
|
if (Resources.greaterThan(rc, totGuarant, used, q.getGuaranteed())) {
|
||||||
initIdealAssigned =
|
initIdealAssigned = Resources.add(
|
||||||
Resources.add(q.getGuaranteed(), q.untouchableExtra);
|
Resources.componentwiseMin(q.getGuaranteed(), q.getUsed()),
|
||||||
} else {
|
q.untouchableExtra);
|
||||||
|
} else{
|
||||||
initIdealAssigned = Resources.clone(used);
|
initIdealAssigned = Resources.clone(used);
|
||||||
}
|
}
|
||||||
|
|
||||||
// perform initial assignment
|
// perform initial assignment
|
||||||
initIdealAssignment(totGuarant, q, initIdealAssigned);
|
initIdealAssignment(totGuarant, q, initIdealAssigned);
|
||||||
|
|
||||||
|
|
||||||
Resources.subtractFrom(unassigned, q.idealAssigned);
|
Resources.subtractFrom(unassigned, q.idealAssigned);
|
||||||
|
|
||||||
// If idealAssigned < (allocated + used + pending), q needs more
|
// If idealAssigned < (allocated + used + pending), q needs more
|
||||||
// resources, so
|
// resources, so
|
||||||
// add it to the list of underserved queues, ordered by need.
|
// add it to the list of underserved queues, ordered by need.
|
||||||
|
@ -152,7 +159,6 @@ public class AbstractPreemptableResourceCalculator {
|
||||||
// left
|
// left
|
||||||
while (!orderedByNeed.isEmpty() && Resources.greaterThan(rc, totGuarant,
|
while (!orderedByNeed.isEmpty() && Resources.greaterThan(rc, totGuarant,
|
||||||
unassigned, Resources.none())) {
|
unassigned, Resources.none())) {
|
||||||
Resource wQassigned = Resource.newInstance(0, 0);
|
|
||||||
// we compute normalizedGuarantees capacity based on currently active
|
// we compute normalizedGuarantees capacity based on currently active
|
||||||
// queues
|
// queues
|
||||||
resetCapacity(unassigned, orderedByNeed, ignoreGuarantee);
|
resetCapacity(unassigned, orderedByNeed, ignoreGuarantee);
|
||||||
|
@ -166,11 +172,26 @@ public class AbstractPreemptableResourceCalculator {
|
||||||
Collection<TempQueuePerPartition> underserved = getMostUnderservedQueues(
|
Collection<TempQueuePerPartition> underserved = getMostUnderservedQueues(
|
||||||
orderedByNeed, tqComparator);
|
orderedByNeed, tqComparator);
|
||||||
|
|
||||||
|
// This value will be used in every round to calculate ideal allocation.
|
||||||
|
// So make a copy to avoid it being changed during the calculation.
|
||||||
|
Resource dupUnassignedForTheRound = Resources.clone(unassigned);
|
||||||
|
|
||||||
for (Iterator<TempQueuePerPartition> i = underserved.iterator(); i
|
for (Iterator<TempQueuePerPartition> i = underserved.iterator(); i
|
||||||
.hasNext();) {
|
.hasNext();) {
|
||||||
|
if (!rc.isAnyMajorResourceAboveZero(unassigned)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
TempQueuePerPartition sub = i.next();
|
TempQueuePerPartition sub = i.next();
|
||||||
Resource wQavail = Resources.multiplyAndNormalizeUp(rc, unassigned,
|
|
||||||
sub.normalizedGuarantee, Resource.newInstance(1, 1));
|
// How much resource we offer to the queue (to increase its ideal_alloc)
|
||||||
|
Resource wQavail = Resources.multiplyAndNormalizeUp(rc,
|
||||||
|
dupUnassignedForTheRound,
|
||||||
|
sub.normalizedGuarantee, this.stepFactor);
|
||||||
|
|
||||||
|
// Make sure it is not beyond unassigned
|
||||||
|
wQavail = Resources.componentwiseMin(wQavail, unassigned);
|
||||||
|
|
||||||
Resource wQidle = sub.offer(wQavail, rc, totGuarant,
|
Resource wQidle = sub.offer(wQavail, rc, totGuarant,
|
||||||
isReservedPreemptionCandidatesSelector);
|
isReservedPreemptionCandidatesSelector);
|
||||||
Resource wQdone = Resources.subtract(wQavail, wQidle);
|
Resource wQdone = Resources.subtract(wQavail, wQidle);
|
||||||
|
@ -180,9 +201,12 @@ public class AbstractPreemptableResourceCalculator {
|
||||||
// queue, recalculating its order based on need.
|
// queue, recalculating its order based on need.
|
||||||
orderedByNeed.add(sub);
|
orderedByNeed.add(sub);
|
||||||
}
|
}
|
||||||
Resources.addTo(wQassigned, wQdone);
|
|
||||||
|
Resources.subtractFrom(unassigned, wQdone);
|
||||||
|
|
||||||
|
// Make sure unassigned never becomes negative (clamp each component at 0)
|
||||||
|
unassigned = Resources.componentwiseMax(unassigned, Resources.none());
|
||||||
}
|
}
|
||||||
Resources.subtractFrom(unassigned, wQassigned);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sometimes its possible that, all queues are properly served. So intra
|
// Sometimes its possible that, all queues are properly served. So intra
|
||||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
||||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||||
|
@ -132,6 +133,16 @@ public class CapacitySchedulerPreemptionUtils {
|
||||||
* map to hold preempted containers
|
* map to hold preempted containers
|
||||||
* @param totalPreemptionAllowed
|
* @param totalPreemptionAllowed
|
||||||
* total preemption allowed per round
|
* total preemption allowed per round
|
||||||
|
* @param conservativeDRF
|
||||||
|
* should we do conservativeDRF preemption or not.
|
||||||
|
* When true:
|
||||||
|
* stop preempting containers when any major resource type is <= 0 for to-
|
||||||
|
* preempt.
|
||||||
|
* This is default preemption behavior of intra-queue preemption
|
||||||
|
* When false:
|
||||||
|
* stop preempting containers when all major resource types are <= 0 for
|
||||||
|
* to-preempt.
|
||||||
|
* This is default preemption behavior of inter-queue preemption
|
||||||
* @return should we preempt rmContainer. If we should, deduct from
|
* @return should we preempt rmContainer. If we should, deduct from
|
||||||
* <code>resourceToObtainByPartition</code>
|
* <code>resourceToObtainByPartition</code>
|
||||||
*/
|
*/
|
||||||
|
@ -140,7 +151,7 @@ public class CapacitySchedulerPreemptionUtils {
|
||||||
Map<String, Resource> resourceToObtainByPartitions,
|
Map<String, Resource> resourceToObtainByPartitions,
|
||||||
RMContainer rmContainer, Resource clusterResource,
|
RMContainer rmContainer, Resource clusterResource,
|
||||||
Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
|
Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
|
||||||
Resource totalPreemptionAllowed) {
|
Resource totalPreemptionAllowed, boolean conservativeDRF) {
|
||||||
ApplicationAttemptId attemptId = rmContainer.getApplicationAttemptId();
|
ApplicationAttemptId attemptId = rmContainer.getApplicationAttemptId();
|
||||||
|
|
||||||
// We will not account resource of a container twice or more
|
// We will not account resource of a container twice or more
|
||||||
|
@ -152,13 +163,49 @@ public class CapacitySchedulerPreemptionUtils {
|
||||||
rmContainer.getAllocatedNode());
|
rmContainer.getAllocatedNode());
|
||||||
Resource toObtainByPartition = resourceToObtainByPartitions
|
Resource toObtainByPartition = resourceToObtainByPartitions
|
||||||
.get(nodePartition);
|
.get(nodePartition);
|
||||||
|
if (null == toObtainByPartition) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// If a toObtain resource type == 0, set it to -1 to avoid 0 resource
|
||||||
|
// type affect following doPreemption check: isAnyMajorResourceZero
|
||||||
|
for (ResourceInformation ri : toObtainByPartition.getResources()) {
|
||||||
|
if (ri.getValue() == 0) {
|
||||||
|
ri.setValue(-1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (rc.isAnyMajorResourceAboveZero(toObtainByPartition) && Resources.fitsIn(
|
||||||
|
rc, rmContainer.getAllocatedResource(), totalPreemptionAllowed)) {
|
||||||
|
boolean doPreempt;
|
||||||
|
|
||||||
|
// How much resource is left to obtain after the preemption happens.
|
||||||
|
Resource toObtainAfterPreemption = Resources.subtract(toObtainByPartition,
|
||||||
|
rmContainer.getAllocatedResource());
|
||||||
|
|
||||||
|
if (conservativeDRF) {
|
||||||
|
doPreempt = !rc.isAnyMajorResourceZeroOrNegative(toObtainByPartition);
|
||||||
|
} else {
|
||||||
|
// When we want to do more aggressive preemption, we will do preemption
|
||||||
|
// only if:
|
||||||
|
// - The preempt of the container makes positive contribution to the
|
||||||
|
// to-obtain resource. Positive contribution means any positive
|
||||||
|
// resource type decreases.
|
||||||
|
//
|
||||||
|
// This is example of positive contribution:
|
||||||
|
// * before: <30, 10, 5>, after <20, 10, -10>
|
||||||
|
// But this is not a positive contribution:
|
||||||
|
// * before: <30, 10, 0>, after <30, 10, -15>
|
||||||
|
doPreempt = Resources.lessThan(rc, clusterResource,
|
||||||
|
Resources
|
||||||
|
.componentwiseMax(toObtainAfterPreemption, Resources.none()),
|
||||||
|
Resources.componentwiseMax(toObtainByPartition, Resources.none()));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!doPreempt) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
if (null != toObtainByPartition
|
|
||||||
&& Resources.greaterThan(rc, clusterResource, toObtainByPartition,
|
|
||||||
Resources.none())
|
|
||||||
&& Resources.fitsIn(rc, rmContainer.getAllocatedResource(),
|
|
||||||
totalPreemptionAllowed)
|
|
||||||
&& !Resources.isAnyMajorResourceZero(rc, toObtainByPartition)) {
|
|
||||||
Resources.subtractFrom(toObtainByPartition,
|
Resources.subtractFrom(toObtainByPartition,
|
||||||
rmContainer.getAllocatedResource());
|
rmContainer.getAllocatedResource());
|
||||||
Resources.subtractFrom(totalPreemptionAllowed,
|
Resources.subtractFrom(totalPreemptionAllowed,
|
||||||
|
|
|
@ -111,7 +111,7 @@ public class FifoCandidatesSelector
|
||||||
.tryPreemptContainerAndDeductResToObtain(rc,
|
.tryPreemptContainerAndDeductResToObtain(rc,
|
||||||
preemptionContext, resToObtainByPartition, c,
|
preemptionContext, resToObtainByPartition, c,
|
||||||
clusterResource, selectedCandidates,
|
clusterResource, selectedCandidates,
|
||||||
totalPreemptionAllowed);
|
totalPreemptionAllowed, false);
|
||||||
if (!preempted) {
|
if (!preempted) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -187,7 +187,7 @@ public class FifoCandidatesSelector
|
||||||
boolean preempted = CapacitySchedulerPreemptionUtils
|
boolean preempted = CapacitySchedulerPreemptionUtils
|
||||||
.tryPreemptContainerAndDeductResToObtain(rc, preemptionContext,
|
.tryPreemptContainerAndDeductResToObtain(rc, preemptionContext,
|
||||||
resToObtainByPartition, c, clusterResource, preemptMap,
|
resToObtainByPartition, c, clusterResource, preemptMap,
|
||||||
totalPreemptionAllowed);
|
totalPreemptionAllowed, false);
|
||||||
if (preempted) {
|
if (preempted) {
|
||||||
Resources.subtractFrom(skippedAMSize, c.getAllocatedResource());
|
Resources.subtractFrom(skippedAMSize, c.getAllocatedResource());
|
||||||
}
|
}
|
||||||
|
@ -221,7 +221,7 @@ public class FifoCandidatesSelector
|
||||||
// Try to preempt this container
|
// Try to preempt this container
|
||||||
CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain(
|
CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain(
|
||||||
rc, preemptionContext, resToObtainByPartition, c, clusterResource,
|
rc, preemptionContext, resToObtainByPartition, c, clusterResource,
|
||||||
selectedContainers, totalPreemptionAllowed);
|
selectedContainers, totalPreemptionAllowed, false);
|
||||||
|
|
||||||
if (!preemptionContext.isObserveOnly()) {
|
if (!preemptionContext.isObserveOnly()) {
|
||||||
preemptionContext.getRMContext().getDispatcher().getEventHandler()
|
preemptionContext.getRMContext().getDispatcher().getEventHandler()
|
||||||
|
@ -264,7 +264,7 @@ public class FifoCandidatesSelector
|
||||||
// Try to preempt this container
|
// Try to preempt this container
|
||||||
CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain(
|
CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain(
|
||||||
rc, preemptionContext, resToObtainByPartition, c, clusterResource,
|
rc, preemptionContext, resToObtainByPartition, c, clusterResource,
|
||||||
selectedContainers, totalPreemptionAllowed);
|
selectedContainers, totalPreemptionAllowed, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -278,8 +278,8 @@ public class FifoIntraQueuePreemptionPlugin
|
||||||
|
|
||||||
// Once unallocated resource is 0, we can stop assigning ideal per app.
|
// Once unallocated resource is 0, we can stop assigning ideal per app.
|
||||||
if (Resources.lessThanOrEqual(rc, clusterResource,
|
if (Resources.lessThanOrEqual(rc, clusterResource,
|
||||||
queueReassignableResource, Resources.none())
|
queueReassignableResource, Resources.none()) || rc
|
||||||
|| Resources.isAnyMajorResourceZero(rc, queueReassignableResource)) {
|
.isAnyMajorResourceZeroOrNegative(queueReassignableResource)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -230,7 +230,7 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
|
||||||
boolean ret = CapacitySchedulerPreemptionUtils
|
boolean ret = CapacitySchedulerPreemptionUtils
|
||||||
.tryPreemptContainerAndDeductResToObtain(rc, preemptionContext,
|
.tryPreemptContainerAndDeductResToObtain(rc, preemptionContext,
|
||||||
resToObtainByPartition, c, clusterResource, selectedCandidates,
|
resToObtainByPartition, c, clusterResource, selectedCandidates,
|
||||||
totalPreemptedResourceAllowed);
|
totalPreemptedResourceAllowed, true);
|
||||||
|
|
||||||
// Subtract from respective user's resource usage once a container is
|
// Subtract from respective user's resource usage once a container is
|
||||||
// selected for preemption.
|
// selected for preemption.
|
||||||
|
|
|
@ -41,8 +41,6 @@ public class PreemptableResourceCalculator
|
||||||
private static final Log LOG =
|
private static final Log LOG =
|
||||||
LogFactory.getLog(PreemptableResourceCalculator.class);
|
LogFactory.getLog(PreemptableResourceCalculator.class);
|
||||||
|
|
||||||
private boolean isReservedPreemptionCandidatesSelector;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* PreemptableResourceCalculator constructor
|
* PreemptableResourceCalculator constructor
|
||||||
*
|
*
|
||||||
|
@ -95,8 +93,8 @@ public class PreemptableResourceCalculator
|
||||||
}
|
}
|
||||||
|
|
||||||
// first compute the allocation as a fixpoint based on guaranteed capacity
|
// first compute the allocation as a fixpoint based on guaranteed capacity
|
||||||
computeFixpointAllocation(tot_guarant, nonZeroGuarQueues, unassigned,
|
computeFixpointAllocation(tot_guarant, new HashSet<>(nonZeroGuarQueues),
|
||||||
false);
|
unassigned, false);
|
||||||
|
|
||||||
// if any capacity is left unassigned, distributed among zero-guarantee
|
// if any capacity is left unassigned, distributed among zero-guarantee
|
||||||
// queues uniformly (i.e., not based on guaranteed capacity, as this is zero)
|
// queues uniformly (i.e., not based on guaranteed capacity, as this is zero)
|
||||||
|
|
|
@ -151,7 +151,7 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
|
||||||
// # This is for leaf queue only.
|
// # This is for leaf queue only.
|
||||||
// max(guaranteed, used) - assigned}
|
// max(guaranteed, used) - assigned}
|
||||||
// remain = avail - accepted
|
// remain = avail - accepted
|
||||||
Resource accepted = Resources.min(rc, clusterResource,
|
Resource accepted = Resources.componentwiseMin(
|
||||||
absMaxCapIdealAssignedDelta,
|
absMaxCapIdealAssignedDelta,
|
||||||
Resources.min(rc, clusterResource, avail, Resources
|
Resources.min(rc, clusterResource, avail, Resources
|
||||||
/*
|
/*
|
||||||
|
@ -186,6 +186,12 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
|
||||||
|
|
||||||
accepted = acceptedByLocality(rc, accepted);
|
accepted = acceptedByLocality(rc, accepted);
|
||||||
|
|
||||||
|
// accepted should never be < 0
|
||||||
|
accepted = Resources.componentwiseMax(accepted, Resources.none());
|
||||||
|
|
||||||
|
// or more than offered
|
||||||
|
accepted = Resources.componentwiseMin(accepted, avail);
|
||||||
|
|
||||||
Resource remain = Resources.subtract(avail, accepted);
|
Resource remain = Resources.subtract(avail, accepted);
|
||||||
Resources.addTo(idealAssigned, accepted);
|
Resources.addTo(idealAssigned, accepted);
|
||||||
return remain;
|
return remain;
|
||||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
|
@ -59,6 +60,7 @@ import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
||||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.mockito.ArgumentMatcher;
|
import org.mockito.ArgumentMatcher;
|
||||||
|
@ -104,10 +106,32 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
|
||||||
EventHandler<Event> mDisp = null;
|
EventHandler<Event> mDisp = null;
|
||||||
ProportionalCapacityPreemptionPolicy policy = null;
|
ProportionalCapacityPreemptionPolicy policy = null;
|
||||||
Resource clusterResource = null;
|
Resource clusterResource = null;
|
||||||
|
// Initialize resource map
|
||||||
|
Map<String, ResourceInformation> riMap = new HashMap<>();
|
||||||
|
|
||||||
|
private void resetResourceInformationMap() {
|
||||||
|
// Initialize mandatory resources
|
||||||
|
ResourceInformation memory = ResourceInformation.newInstance(
|
||||||
|
ResourceInformation.MEMORY_MB.getName(),
|
||||||
|
ResourceInformation.MEMORY_MB.getUnits(),
|
||||||
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
|
||||||
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
|
||||||
|
ResourceInformation vcores = ResourceInformation.newInstance(
|
||||||
|
ResourceInformation.VCORES.getName(),
|
||||||
|
ResourceInformation.VCORES.getUnits(),
|
||||||
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
|
||||||
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
|
||||||
|
riMap.put(ResourceInformation.MEMORY_URI, memory);
|
||||||
|
riMap.put(ResourceInformation.VCORES_URI, vcores);
|
||||||
|
|
||||||
|
ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
|
||||||
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
@Before
|
@Before
|
||||||
public void setup() {
|
public void setup() {
|
||||||
|
resetResourceInformationMap();
|
||||||
|
|
||||||
org.apache.log4j.Logger.getRootLogger().setLevel(
|
org.apache.log4j.Logger.getRootLogger().setLevel(
|
||||||
org.apache.log4j.Level.DEBUG);
|
org.apache.log4j.Level.DEBUG);
|
||||||
|
|
||||||
|
@ -142,6 +166,12 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
|
||||||
partitionToResource = new HashMap<>();
|
partitionToResource = new HashMap<>();
|
||||||
nodeIdToSchedulerNodes = new HashMap<>();
|
nodeIdToSchedulerNodes = new HashMap<>();
|
||||||
nameToCSQueues = new HashMap<>();
|
nameToCSQueues = new HashMap<>();
|
||||||
|
clusterResource = Resource.newInstance(0, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void cleanup() {
|
||||||
|
resetResourceInformationMap();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void buildEnv(String labelsConfig, String nodesConfig,
|
public void buildEnv(String labelsConfig, String nodesConfig,
|
||||||
|
|
|
@ -20,44 +20,25 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
|
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
||||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import static org.mockito.Matchers.argThat;
|
import static org.mockito.Matchers.argThat;
|
||||||
import static org.mockito.Mockito.never;
|
import static org.mockito.Mockito.never;
|
||||||
import static org.mockito.Mockito.times;
|
import static org.mockito.Mockito.times;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
public class TestPreemptionForQueueWithPriorities
|
public class TestPreemptionForQueueWithPriorities
|
||||||
extends ProportionalCapacityPreemptionPolicyMockFramework {
|
extends ProportionalCapacityPreemptionPolicyMockFramework {
|
||||||
// Initialize resource map
|
|
||||||
private Map<String, ResourceInformation> riMap = new HashMap<>();
|
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setup() {
|
public void setup() {
|
||||||
|
rc = new DefaultResourceCalculator();
|
||||||
// Initialize mandatory resources
|
|
||||||
ResourceInformation memory = ResourceInformation.newInstance(
|
|
||||||
ResourceInformation.MEMORY_MB.getName(),
|
|
||||||
ResourceInformation.MEMORY_MB.getUnits(),
|
|
||||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
|
|
||||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
|
|
||||||
ResourceInformation vcores = ResourceInformation.newInstance(
|
|
||||||
ResourceInformation.VCORES.getName(),
|
|
||||||
ResourceInformation.VCORES.getUnits(),
|
|
||||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
|
|
||||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
|
|
||||||
riMap.put(ResourceInformation.MEMORY_URI, memory);
|
|
||||||
riMap.put(ResourceInformation.VCORES_URI, vcores);
|
|
||||||
|
|
||||||
ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
|
|
||||||
|
|
||||||
super.setup();
|
super.setup();
|
||||||
policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock);
|
policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock);
|
||||||
}
|
}
|
||||||
|
@ -340,8 +321,8 @@ public class TestPreemptionForQueueWithPriorities
|
||||||
* - a2 (capacity=60), p=1
|
* - a2 (capacity=60), p=1
|
||||||
* - b (capacity=30), p=1
|
* - b (capacity=30), p=1
|
||||||
* - b1 (capacity=50), p=1
|
* - b1 (capacity=50), p=1
|
||||||
* - b1 (capacity=50), p=2
|
* - b2 (capacity=50), p=2
|
||||||
* - c (capacity=40), p=2
|
* - c (capacity=40), p=1
|
||||||
* </pre>
|
* </pre>
|
||||||
*/
|
*/
|
||||||
String labelsConfig = "=100,true"; // default partition
|
String labelsConfig = "=100,true"; // default partition
|
||||||
|
@ -349,11 +330,11 @@ public class TestPreemptionForQueueWithPriorities
|
||||||
String queuesConfig =
|
String queuesConfig =
|
||||||
// guaranteed,max,used,pending
|
// guaranteed,max,used,pending
|
||||||
"root(=[100 100 100 100]);" + //root
|
"root(=[100 100 100 100]);" + //root
|
||||||
"-a(=[30 100 40 50]){priority=1};" + // a
|
"-a(=[29 100 40 50]){priority=1};" + // a
|
||||||
"--a1(=[12 100 20 50]){priority=1};" + // a1
|
"--a1(=[12 100 20 50]){priority=1};" + // a1
|
||||||
"--a2(=[18 100 20 50]){priority=1};" + // a2
|
"--a2(=[17 100 20 50]){priority=1};" + // a2
|
||||||
"-b(=[30 100 59 50]){priority=1};" + // b
|
"-b(=[31 100 59 50]){priority=1};" + // b
|
||||||
"--b1(=[15 100 30 50]){priority=1};" + // b1
|
"--b1(=[16 100 30 50]){priority=1};" + // b1
|
||||||
"--b2(=[15 100 29 50]){priority=2};" + // b2
|
"--b2(=[15 100 29 50]){priority=2};" + // b2
|
||||||
"-c(=[40 100 1 30]){priority=1}"; // c
|
"-c(=[40 100 1 30]){priority=1}"; // c
|
||||||
String appsConfig =
|
String appsConfig =
|
||||||
|
@ -362,7 +343,7 @@ public class TestPreemptionForQueueWithPriorities
|
||||||
"a2\t(1,1,n1,,20,false);" + // app2 in a2
|
"a2\t(1,1,n1,,20,false);" + // app2 in a2
|
||||||
"b1\t(1,1,n1,,30,false);" + // app3 in b1
|
"b1\t(1,1,n1,,30,false);" + // app3 in b1
|
||||||
"b2\t(1,1,n1,,29,false);" + // app4 in b2
|
"b2\t(1,1,n1,,29,false);" + // app4 in b2
|
||||||
"c\t(1,1,n1,,29,false)"; // app5 in c
|
"c\t(1,1,n1,,1,false)"; // app5 in c
|
||||||
|
|
||||||
|
|
||||||
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
|
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
|
||||||
|
@ -370,16 +351,16 @@ public class TestPreemptionForQueueWithPriorities
|
||||||
|
|
||||||
// Preemption should first divide capacities between a / b, and b2 should
|
// Preemption should first divide capacities between a / b, and b2 should
|
||||||
// get less preemption than b1 (because b2 has higher priority)
|
// get less preemption than b1 (because b2 has higher priority)
|
||||||
verify(mDisp, times(5)).handle(argThat(
|
verify(mDisp, times(6)).handle(argThat(
|
||||||
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
||||||
getAppAttemptId(1))));
|
getAppAttemptId(1))));
|
||||||
verify(mDisp, never()).handle(argThat(
|
verify(mDisp, times(1)).handle(argThat(
|
||||||
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
||||||
getAppAttemptId(2))));
|
getAppAttemptId(2))));
|
||||||
verify(mDisp, times(15)).handle(argThat(
|
verify(mDisp, times(13)).handle(argThat(
|
||||||
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
||||||
getAppAttemptId(3))));
|
getAppAttemptId(3))));
|
||||||
verify(mDisp, times(9)).handle(argThat(
|
verify(mDisp, times(10)).handle(argThat(
|
||||||
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
||||||
getAppAttemptId(4))));
|
getAppAttemptId(4))));
|
||||||
}
|
}
|
||||||
|
@ -426,7 +407,7 @@ public class TestPreemptionForQueueWithPriorities
|
||||||
|
|
||||||
// Preemption should first divide capacities between a / b, and b1 should
|
// Preemption should first divide capacities between a / b, and b1 should
|
||||||
// get less preemption than b2 (because b1 has higher priority)
|
// get less preemption than b2 (because b1 has higher priority)
|
||||||
verify(mDisp, never()).handle(argThat(
|
verify(mDisp, times(3)).handle(argThat(
|
||||||
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
||||||
getAppAttemptId(1))));
|
getAppAttemptId(1))));
|
||||||
verify(mDisp, never()).handle(argThat(
|
verify(mDisp, never()).handle(argThat(
|
||||||
|
@ -505,4 +486,56 @@ public class TestPreemptionForQueueWithPriorities
|
||||||
getAppAttemptId(3))));
|
getAppAttemptId(3))));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test3ResourceTypesInterQueuePreemption() throws IOException {
|
||||||
|
rc = new DominantResourceCalculator();
|
||||||
|
when(cs.getResourceCalculator()).thenReturn(rc);
|
||||||
|
|
||||||
|
// Initialize resource map
|
||||||
|
String RESOURCE_1 = "res1";
|
||||||
|
riMap.put(RESOURCE_1, ResourceInformation.newInstance(RESOURCE_1, "", 0,
|
||||||
|
ResourceTypes.COUNTABLE, 0, Integer.MAX_VALUE));
|
||||||
|
|
||||||
|
ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Queue structure is:
|
||||||
|
*
|
||||||
|
* <pre>
|
||||||
|
* root
|
||||||
|
* / \ \
|
||||||
|
* a b c
|
||||||
|
* </pre>
|
||||||
|
* A / B / C have 33.3 / 33.3 / 33.4 resources
|
||||||
|
* Total cluster resource is mem=30, cpu=18, GPU=6
|
||||||
|
* A uses mem=6, cpu=6, GPU=3
|
||||||
|
* B uses mem=6, cpu=6, GPU=3
|
||||||
|
* C is asking mem=1,cpu=1,GPU=1
|
||||||
|
*
|
||||||
|
* We expect it can preempt from one of the jobs
|
||||||
|
*/
|
||||||
|
String labelsConfig =
|
||||||
|
"=30:18:6,true;";
|
||||||
|
String nodesConfig =
|
||||||
|
"n1= res=30:18:6;"; // n1 is default partition
|
||||||
|
String queuesConfig =
|
||||||
|
// guaranteed,max,used,pending
|
||||||
|
"root(=[30:18:6 30:18:6 12:12:6 1:1:1]){priority=1};" + //root
|
||||||
|
"-a(=[10:6:2 10:6:2 6:6:3 0:0:0]){priority=1};" + // a
|
||||||
|
"-b(=[10:6:2 10:6:2 6:6:3 0:0:0]){priority=1};" + // b
|
||||||
|
"-c(=[10:6:2 10:6:2 0:0:0 1:1:1]){priority=2}"; // c
|
||||||
|
String appsConfig=
|
||||||
|
//queueName\t(priority,resource,host,expression,#repeat,reserved)
|
||||||
|
"a\t" // app1 in a
|
||||||
|
+ "(1,2:2:1,n1,,3,false);" +
|
||||||
|
"b\t" // app2 in b
|
||||||
|
+ "(1,2:2:1,n1,,3,false)";
|
||||||
|
|
||||||
|
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
|
||||||
|
policy.editSchedule();
|
||||||
|
|
||||||
|
verify(mDisp, times(1)).handle(argThat(
|
||||||
|
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
||||||
|
getAppAttemptId(1))));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,11 +18,16 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
|
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||||
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
import java.io.IOException;
|
||||||
|
|
||||||
import static org.mockito.Matchers.argThat;
|
import static org.mockito.Matchers.argThat;
|
||||||
import static org.mockito.Mockito.never;
|
import static org.mockito.Mockito.never;
|
||||||
import static org.mockito.Mockito.times;
|
import static org.mockito.Mockito.times;
|
||||||
|
@ -41,8 +46,7 @@ public class TestProportionalCapacityPreemptionPolicyInterQueueWithDRF
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testInterQueuePreemptionWithMultipleResource()
|
public void testInterQueuePreemptionWithMultipleResource() throws Exception {
|
||||||
throws Exception {
|
|
||||||
/**
|
/**
|
||||||
* Queue structure is:
|
* Queue structure is:
|
||||||
*
|
*
|
||||||
|
@ -121,4 +125,52 @@ public class TestProportionalCapacityPreemptionPolicyInterQueueWithDRF
|
||||||
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
||||||
getAppAttemptId(1))));
|
getAppAttemptId(1))));
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
@Test
|
||||||
|
public void test3ResourceTypesInterQueuePreemption() throws IOException {
|
||||||
|
// Initialize resource map
|
||||||
|
String RESOURCE_1 = "res1";
|
||||||
|
riMap.put(RESOURCE_1, ResourceInformation
|
||||||
|
.newInstance(RESOURCE_1, "", 0, ResourceTypes.COUNTABLE, 0,
|
||||||
|
Integer.MAX_VALUE));
|
||||||
|
|
||||||
|
ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* root
|
||||||
|
* / \ \
|
||||||
|
* a b c
|
||||||
|
*
|
||||||
|
* A / B / C have 33.3 / 33.3 / 33.4 resources
|
||||||
|
* Total cluster resource is mem=30, cpu=18, GPU=6
|
||||||
|
* A uses mem=6, cpu=6, GPU=3
|
||||||
|
* B uses mem=6, cpu=6, GPU=3
|
||||||
|
* C is asking mem=1,cpu=1,GPU=1
|
||||||
|
*
|
||||||
|
* We expect it can preempt from one of the jobs
|
||||||
|
*/
|
||||||
|
String labelsConfig = "=30:18:6,true;";
|
||||||
|
String nodesConfig = "n1= res=30:18:6;"; // n1 is default partition
|
||||||
|
String queuesConfig =
|
||||||
|
// guaranteed,max,used,pending
|
||||||
|
"root(=[30:18:6 30:18:6 12:12:6 1:1:1]);" + //root
|
||||||
|
"-a(=[10:7:2 10:6:3 6:6:3 0:0:0]);" + // a
|
||||||
|
"-b(=[10:6:2 10:6:3 6:6:3 0:0:0]);" + // b
|
||||||
|
"-c(=[10:5:2 10:6:2 0:0:0 1:1:1])"; // c
|
||||||
|
String appsConfig =
|
||||||
|
//queueName\t(priority,resource,host,expression,#repeat,reserved)
|
||||||
|
"a\t" // app1 in a
|
||||||
|
+ "(1,2:2:1,n1,,3,false);" + "b\t" // app2 in b
|
||||||
|
+ "(1,2:2:1,n1,,3,false)";
|
||||||
|
|
||||||
|
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
|
||||||
|
policy.editSchedule();
|
||||||
|
|
||||||
|
verify(mDisp, times(0)).handle(argThat(
|
||||||
|
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
||||||
|
getAppAttemptId(1))));
|
||||||
|
verify(mDisp, times(1)).handle(argThat(
|
||||||
|
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
|
||||||
|
getAppAttemptId(2))));
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue