diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java index 6375c4afd9c..ab6d7f57483 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java @@ -135,14 +135,19 @@ public class DefaultResourceCalculator extends ResourceCalculator { return smaller.getMemorySize() <= bigger.getMemorySize(); } - @Override - public boolean isAnyMajorResourceZero(Resource resource) { - return resource.getMemorySize() == 0f; - } - @Override public Resource normalizeDown(Resource r, Resource stepFactor) { return Resources.createResource( roundDown((r.getMemorySize()), stepFactor.getMemorySize())); } + + @Override + public boolean isAnyMajorResourceZeroOrNegative(Resource resource) { + return resource.getMemorySize() <= 0; + } + + @Override + public boolean isAnyMajorResourceAboveZero(Resource resource) { + return resource.getMemorySize() > 0; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java index 6fed23b4c2a..2e85ebca860 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java @@ -576,19 +576,6 @@ public class DominantResourceCalculator extends ResourceCalculator { return true; } - @Override - public boolean isAnyMajorResourceZero(Resource resource) { - int maxLength = ResourceUtils.getNumberOfKnownResourceTypes(); - for (int i = 0; i < maxLength; i++) { - ResourceInformation resourceInformation = resource - .getResourceInformation(i); - if (resourceInformation.getValue() == 0L) { - return true; - } - } - return false; - } - @Override public Resource normalizeDown(Resource r, Resource stepFactor) { Resource ret = Resource.newInstance(r); @@ -613,4 +600,30 @@ public class DominantResourceCalculator extends ResourceCalculator { } return ret; } + + @Override + public boolean isAnyMajorResourceZeroOrNegative(Resource resource) { + int maxLength = ResourceUtils.getNumberOfKnownResourceTypes(); + for (int i = 0; i < maxLength; i++) { + ResourceInformation resourceInformation = resource.getResourceInformation( + i); + if (resourceInformation.getValue() <= 0L) { + return true; + } + } + return false; + } + + @Override + public boolean isAnyMajorResourceAboveZero(Resource resource) { + int maxLength = ResourceUtils.getNumberOfKnownResourceTypes(); + for (int i = 0; i < maxLength; i++) { + ResourceInformation resourceInformation = resource.getResourceInformation( + i); + if (resourceInformation.getValue() > 0) { + return true; + } + } + return false; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java index 1c42126019e..51078cd1b0e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java @@ -239,12 +239,12 @@ public abstract class ResourceCalculator { /** * Check if resource has any major resource types (which are all NodeManagers - * included) a zero value. + * included) a zero value or negative value. * * @param resource resource * @return returns true if any resource is zero. */ - public abstract boolean isAnyMajorResourceZero(Resource resource); + public abstract boolean isAnyMajorResourceZeroOrNegative(Resource resource); /** * Get resource rand normalize down using step-factor @@ -257,4 +257,13 @@ public abstract class ResourceCalculator { * @return resulting normalized resource */ public abstract Resource normalizeDown(Resource r, Resource stepFactor); + + /** + * Check if resource has any major resource types (which are all NodeManagers + * included) has a >0 value. + * + * @param resource resource + * @return returns true if any resource is >0 + */ + public abstract boolean isAnyMajorResourceAboveZero(Resource resource); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java index 1c08844cf33..7826f51cd4d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java @@ -547,11 +547,6 @@ public class Resources { return ret; } - public static boolean isAnyMajorResourceZero(ResourceCalculator rc, - Resource resource) { - return rc.isAnyMajorResourceZero(resource); - } - public static Resource normalizeDown(ResourceCalculator calculator, Resource resource, Resource factor) { return calculator.normalizeDown(resource, factor); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java index 258997083db..64b36151d85 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java @@ -18,12 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Comparator; -import java.util.Iterator; -import java.util.PriorityQueue; - import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.PriorityUtilizationQueueOrderingPolicy; @@ -32,6 +26,12 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.Resources; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.Iterator; +import java.util.PriorityQueue; + /** * Calculate how much resources need to be preempted for each queue, * will be used by {@link PreemptionCandidatesSelector}. @@ -40,7 +40,8 @@ public class AbstractPreemptableResourceCalculator { protected final CapacitySchedulerPreemptionContext context; protected final ResourceCalculator rc; - private boolean isReservedPreemptionCandidatesSelector; + protected boolean isReservedPreemptionCandidatesSelector; + private Resource stepFactor; static class TQComparator implements Comparator { private ResourceCalculator rc; @@ -90,6 +91,11 @@ public class AbstractPreemptableResourceCalculator { rc = preemptionContext.getResourceCalculator(); this.isReservedPreemptionCandidatesSelector = isReservedPreemptionCandidatesSelector; + + stepFactor = Resource.newInstance(0, 0); + for (ResourceInformation ri : stepFactor.getResources()) { + ri.setValue(1); + } } /** @@ -122,23 +128,24 @@ public class AbstractPreemptableResourceCalculator { TQComparator tqComparator = new TQComparator(rc, totGuarant); PriorityQueue orderedByNeed = new PriorityQueue<>(10, tqComparator); - for (Iterator i = qAlloc.iterator(); i.hasNext();) { + for (Iterator i = qAlloc.iterator(); i.hasNext(); ) { TempQueuePerPartition q = i.next(); Resource used = q.getUsed(); Resource initIdealAssigned; if (Resources.greaterThan(rc, totGuarant, used, q.getGuaranteed())) { - initIdealAssigned = - Resources.add(q.getGuaranteed(), q.untouchableExtra); - } else { + initIdealAssigned = Resources.add( + Resources.componentwiseMin(q.getGuaranteed(), q.getUsed()), + q.untouchableExtra); + } else{ initIdealAssigned = Resources.clone(used); } // perform initial assignment initIdealAssignment(totGuarant, q, initIdealAssigned); - Resources.subtractFrom(unassigned, q.idealAssigned); + // If idealAssigned < (allocated + used + pending), q needs more // resources, so // add it to the list of underserved queues, ordered by need. @@ -152,7 +159,6 @@ public class AbstractPreemptableResourceCalculator { // left while (!orderedByNeed.isEmpty() && Resources.greaterThan(rc, totGuarant, unassigned, Resources.none())) { - Resource wQassigned = Resource.newInstance(0, 0); // we compute normalizedGuarantees capacity based on currently active // queues resetCapacity(unassigned, orderedByNeed, ignoreGuarantee); @@ -166,11 +172,26 @@ public class AbstractPreemptableResourceCalculator { Collection underserved = getMostUnderservedQueues( orderedByNeed, tqComparator); + // This value will be used in every round to calculate ideal allocation. + // So make a copy to avoid it changed during calculation. + Resource dupUnassignedForTheRound = Resources.clone(unassigned); + for (Iterator i = underserved.iterator(); i .hasNext();) { + if (!rc.isAnyMajorResourceAboveZero(unassigned)) { + break; + } + TempQueuePerPartition sub = i.next(); - Resource wQavail = Resources.multiplyAndNormalizeUp(rc, unassigned, - sub.normalizedGuarantee, Resource.newInstance(1, 1)); + + // How much resource we offer to the queue (to increase its ideal_alloc + Resource wQavail = Resources.multiplyAndNormalizeUp(rc, + dupUnassignedForTheRound, + sub.normalizedGuarantee, this.stepFactor); + + // Make sure it is not beyond unassigned + wQavail = Resources.componentwiseMin(wQavail, unassigned); + Resource wQidle = sub.offer(wQavail, rc, totGuarant, isReservedPreemptionCandidatesSelector); Resource wQdone = Resources.subtract(wQavail, wQidle); @@ -180,9 +201,12 @@ public class AbstractPreemptableResourceCalculator { // queue, recalculating its order based on need. orderedByNeed.add(sub); } - Resources.addTo(wQassigned, wQdone); + + Resources.subtractFrom(unassigned, wQdone); + + // Make sure unassigned is always larger than 0 + unassigned = Resources.componentwiseMax(unassigned, Resources.none()); } - Resources.subtractFrom(unassigned, wQassigned); } // Sometimes its possible that, all queues are properly served. So intra diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java index f097e9c6291..5396d61a47d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -132,6 +133,16 @@ public class CapacitySchedulerPreemptionUtils { * map to hold preempted containers * @param totalPreemptionAllowed * total preemption allowed per round + * @param conservativeDRF + * should we do conservativeDRF preemption or not. + * When true: + * stop preempt container when any major resource type <= 0 for to- + * preempt. + * This is default preemption behavior of intra-queue preemption + * When false: + * stop preempt container when: all major resource type <= 0 for + * to-preempt. + * This is default preemption behavior of inter-queue preemption * @return should we preempt rmContainer. If we should, deduct from * resourceToObtainByPartition */ @@ -140,7 +151,7 @@ public class CapacitySchedulerPreemptionUtils { Map resourceToObtainByPartitions, RMContainer rmContainer, Resource clusterResource, Map> preemptMap, - Resource totalPreemptionAllowed) { + Resource totalPreemptionAllowed, boolean conservativeDRF) { ApplicationAttemptId attemptId = rmContainer.getApplicationAttemptId(); // We will not account resource of a container twice or more @@ -152,13 +163,49 @@ public class CapacitySchedulerPreemptionUtils { rmContainer.getAllocatedNode()); Resource toObtainByPartition = resourceToObtainByPartitions .get(nodePartition); + if (null == toObtainByPartition) { + return false; + } + + // If a toObtain resource type == 0, set it to -1 to avoid 0 resource + // type affect following doPreemption check: isAnyMajorResourceZero + for (ResourceInformation ri : toObtainByPartition.getResources()) { + if (ri.getValue() == 0) { + ri.setValue(-1); + } + } + + if (rc.isAnyMajorResourceAboveZero(toObtainByPartition) && Resources.fitsIn( + rc, rmContainer.getAllocatedResource(), totalPreemptionAllowed)) { + boolean doPreempt; + + // How much resource left after preemption happen. + Resource toObtainAfterPreemption = Resources.subtract(toObtainByPartition, + rmContainer.getAllocatedResource()); + + if (conservativeDRF) { + doPreempt = !rc.isAnyMajorResourceZeroOrNegative(toObtainByPartition); + } else { + // When we want to do more aggressive preemption, we will do preemption + // only if: + // - The preempt of the container makes positive contribution to the + // to-obtain resource. Positive contribution means any positive + // resource type decreases. + // + // This is example of positive contribution: + // * before: <30, 10, 5>, after <20, 10, -10> + // But this not positive contribution: + // * before: <30, 10, 0>, after <30, 10, -15> + doPreempt = Resources.lessThan(rc, clusterResource, + Resources + .componentwiseMax(toObtainAfterPreemption, Resources.none()), + Resources.componentwiseMax(toObtainByPartition, Resources.none())); + } + + if (!doPreempt) { + return false; + } - if (null != toObtainByPartition - && Resources.greaterThan(rc, clusterResource, toObtainByPartition, - Resources.none()) - && Resources.fitsIn(rc, rmContainer.getAllocatedResource(), - totalPreemptionAllowed) - && !Resources.isAnyMajorResourceZero(rc, toObtainByPartition)) { Resources.subtractFrom(toObtainByPartition, rmContainer.getAllocatedResource()); Resources.subtractFrom(totalPreemptionAllowed, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java index 748548a761e..3b2fcbb90d7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java @@ -111,7 +111,7 @@ public class FifoCandidatesSelector .tryPreemptContainerAndDeductResToObtain(rc, preemptionContext, resToObtainByPartition, c, clusterResource, selectedCandidates, - totalPreemptionAllowed); + totalPreemptionAllowed, false); if (!preempted) { continue; } @@ -187,7 +187,7 @@ public class FifoCandidatesSelector boolean preempted = CapacitySchedulerPreemptionUtils .tryPreemptContainerAndDeductResToObtain(rc, preemptionContext, resToObtainByPartition, c, clusterResource, preemptMap, - totalPreemptionAllowed); + totalPreemptionAllowed, false); if (preempted) { Resources.subtractFrom(skippedAMSize, c.getAllocatedResource()); } @@ -221,7 +221,7 @@ public class FifoCandidatesSelector // Try to preempt this container CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain( rc, preemptionContext, resToObtainByPartition, c, clusterResource, - selectedContainers, totalPreemptionAllowed); + selectedContainers, totalPreemptionAllowed, false); if (!preemptionContext.isObserveOnly()) { preemptionContext.getRMContext().getDispatcher().getEventHandler() @@ -264,7 +264,7 @@ public class FifoCandidatesSelector // Try to preempt this container CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain( rc, preemptionContext, resToObtainByPartition, c, clusterResource, - selectedContainers, totalPreemptionAllowed); + selectedContainers, totalPreemptionAllowed, false); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java index 1776bd4d946..40f333fb0cc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java @@ -278,8 +278,8 @@ public class FifoIntraQueuePreemptionPlugin // Once unallocated resource is 0, we can stop assigning ideal per app. if (Resources.lessThanOrEqual(rc, clusterResource, - queueReassignableResource, Resources.none()) - || Resources.isAnyMajorResourceZero(rc, queueReassignableResource)) { + queueReassignableResource, Resources.none()) || rc + .isAnyMajorResourceZeroOrNegative(queueReassignableResource)) { continue; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java index 5b6932e6403..a91fac7bd3b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java @@ -230,7 +230,7 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector { boolean ret = CapacitySchedulerPreemptionUtils .tryPreemptContainerAndDeductResToObtain(rc, preemptionContext, resToObtainByPartition, c, clusterResource, selectedCandidates, - totalPreemptedResourceAllowed); + totalPreemptedResourceAllowed, true); // Subtract from respective user's resource usage once a container is // selected for preemption. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java index 676c14fc26a..08d834ea004 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java @@ -41,8 +41,6 @@ public class PreemptableResourceCalculator private static final Log LOG = LogFactory.getLog(PreemptableResourceCalculator.class); - private boolean isReservedPreemptionCandidatesSelector; - /** * PreemptableResourceCalculator constructor * @@ -95,8 +93,8 @@ public class PreemptableResourceCalculator } // first compute the allocation as a fixpoint based on guaranteed capacity - computeFixpointAllocation(tot_guarant, nonZeroGuarQueues, unassigned, - false); + computeFixpointAllocation(tot_guarant, new HashSet<>(nonZeroGuarQueues), + unassigned, false); // if any capacity is left unassigned, distributed among zero-guarantee // queues uniformly (i.e., not based on guaranteed capacity, as this is zero) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java index 9d8297d5590..4214acc5524 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java @@ -151,7 +151,7 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity { // # This is for leaf queue only. // max(guaranteed, used) - assigned} // remain = avail - accepted - Resource accepted = Resources.min(rc, clusterResource, + Resource accepted = Resources.componentwiseMin( absMaxCapIdealAssignedDelta, Resources.min(rc, clusterResource, avail, Resources /* @@ -186,6 +186,12 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity { accepted = acceptedByLocality(rc, accepted); + // accept should never be < 0 + accepted = Resources.componentwiseMax(accepted, Resources.none()); + + // or more than offered + accepted = Resources.componentwiseMin(accepted, avail); + Resource remain = Resources.subtract(avail, accepted); Resources.addTo(idealAssigned, accepted); return remain; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java index a8e269779dc..a9725846f08 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceInformation; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; @@ -59,6 +60,7 @@ import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.mockito.ArgumentMatcher; @@ -104,10 +106,32 @@ public class ProportionalCapacityPreemptionPolicyMockFramework { EventHandler mDisp = null; ProportionalCapacityPreemptionPolicy policy = null; Resource clusterResource = null; + // Initialize resource map + Map riMap = new HashMap<>(); + + private void resetResourceInformationMap() { + // Initialize mandatory resources + ResourceInformation memory = ResourceInformation.newInstance( + ResourceInformation.MEMORY_MB.getName(), + ResourceInformation.MEMORY_MB.getUnits(), + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB); + ResourceInformation vcores = ResourceInformation.newInstance( + ResourceInformation.VCORES.getName(), + ResourceInformation.VCORES.getUnits(), + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); + riMap.put(ResourceInformation.MEMORY_URI, memory); + riMap.put(ResourceInformation.VCORES_URI, vcores); + + ResourceUtils.initializeResourcesFromResourceInformationMap(riMap); + } @SuppressWarnings("unchecked") @Before public void setup() { + resetResourceInformationMap(); + org.apache.log4j.Logger.getRootLogger().setLevel( org.apache.log4j.Level.DEBUG); @@ -142,6 +166,12 @@ public class ProportionalCapacityPreemptionPolicyMockFramework { partitionToResource = new HashMap<>(); nodeIdToSchedulerNodes = new HashMap<>(); nameToCSQueues = new HashMap<>(); + clusterResource = Resource.newInstance(0, 0); + } + + @After + public void cleanup() { + resetResourceInformationMap(); } public void buildEnv(String labelsConfig, String nodesConfig, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java index e9a8116f1e6..6a953cfe068 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java @@ -20,44 +20,25 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes; import org.apache.hadoop.yarn.api.records.ResourceInformation; -import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.junit.Before; import org.junit.Test; import java.io.IOException; -import java.util.HashMap; -import java.util.Map; import static org.mockito.Matchers.argThat; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class TestPreemptionForQueueWithPriorities extends ProportionalCapacityPreemptionPolicyMockFramework { - // Initialize resource map - private Map riMap = new HashMap<>(); - @Before public void setup() { - - // Initialize mandatory resources - ResourceInformation memory = ResourceInformation.newInstance( - ResourceInformation.MEMORY_MB.getName(), - ResourceInformation.MEMORY_MB.getUnits(), - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB); - ResourceInformation vcores = ResourceInformation.newInstance( - ResourceInformation.VCORES.getName(), - ResourceInformation.VCORES.getUnits(), - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); - riMap.put(ResourceInformation.MEMORY_URI, memory); - riMap.put(ResourceInformation.VCORES_URI, vcores); - - ResourceUtils.initializeResourcesFromResourceInformationMap(riMap); - + rc = new DefaultResourceCalculator(); super.setup(); policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock); } @@ -340,8 +321,8 @@ public class TestPreemptionForQueueWithPriorities * - a2 (capacity=60), p=1 * - b (capacity=30), p=1 * - b1 (capacity=50), p=1 - * - b1 (capacity=50), p=2 - * - c (capacity=40), p=2 + * - b2 (capacity=50), p=2 + * - c (capacity=40), p=1 * */ String labelsConfig = "=100,true"; // default partition @@ -349,11 +330,11 @@ public class TestPreemptionForQueueWithPriorities String queuesConfig = // guaranteed,max,used,pending "root(=[100 100 100 100]);" + //root - "-a(=[30 100 40 50]){priority=1};" + // a + "-a(=[29 100 40 50]){priority=1};" + // a "--a1(=[12 100 20 50]){priority=1};" + // a1 - "--a2(=[18 100 20 50]){priority=1};" + // a2 - "-b(=[30 100 59 50]){priority=1};" + // b - "--b1(=[15 100 30 50]){priority=1};" + // b1 + "--a2(=[17 100 20 50]){priority=1};" + // a2 + "-b(=[31 100 59 50]){priority=1};" + // b + "--b1(=[16 100 30 50]){priority=1};" + // b1 "--b2(=[15 100 29 50]){priority=2};" + // b2 "-c(=[40 100 1 30]){priority=1}"; // c String appsConfig = @@ -362,7 +343,7 @@ public class TestPreemptionForQueueWithPriorities "a2\t(1,1,n1,,20,false);" + // app2 in a2 "b1\t(1,1,n1,,30,false);" + // app3 in b1 "b2\t(1,1,n1,,29,false);" + // app4 in b2 - "c\t(1,1,n1,,29,false)"; // app5 in c + "c\t(1,1,n1,,1,false)"; // app5 in c buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); @@ -370,16 +351,16 @@ public class TestPreemptionForQueueWithPriorities // Preemption should first divide capacities between a / b, and b2 should // get less preemption than b1 (because b2 has higher priority) - verify(mDisp, times(5)).handle(argThat( + verify(mDisp, times(6)).handle(argThat( new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( getAppAttemptId(1)))); - verify(mDisp, never()).handle(argThat( + verify(mDisp, times(1)).handle(argThat( new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( getAppAttemptId(2)))); - verify(mDisp, times(15)).handle(argThat( + verify(mDisp, times(13)).handle(argThat( new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( getAppAttemptId(3)))); - verify(mDisp, times(9)).handle(argThat( + verify(mDisp, times(10)).handle(argThat( new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( getAppAttemptId(4)))); } @@ -426,7 +407,7 @@ public class TestPreemptionForQueueWithPriorities // Preemption should first divide capacities between a / b, and b1 should // get less preemption than b2 (because b1 has higher priority) - verify(mDisp, never()).handle(argThat( + verify(mDisp, times(3)).handle(argThat( new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( getAppAttemptId(1)))); verify(mDisp, never()).handle(argThat( @@ -505,4 +486,56 @@ public class TestPreemptionForQueueWithPriorities getAppAttemptId(3)))); } + @Test + public void test3ResourceTypesInterQueuePreemption() throws IOException { + rc = new DominantResourceCalculator(); + when(cs.getResourceCalculator()).thenReturn(rc); + + // Initialize resource map + String RESOURCE_1 = "res1"; + riMap.put(RESOURCE_1, ResourceInformation.newInstance(RESOURCE_1, "", 0, + ResourceTypes.COUNTABLE, 0, Integer.MAX_VALUE)); + + ResourceUtils.initializeResourcesFromResourceInformationMap(riMap); + + /** + * Queue structure is: + * + *
+     *              root
+     *           /  \  \
+     *          a    b  c
+     * 
+ * A / B / C have 33.3 / 33.3 / 33.4 resources + * Total cluster resource have mem=30, cpu=18, GPU=6 + * A uses mem=6, cpu=3, GPU=3 + * B uses mem=6, cpu=3, GPU=3 + * C is asking mem=1,cpu=1,GPU=1 + * + * We expect it can preempt from one of the jobs + */ + String labelsConfig = + "=30:18:6,true;"; + String nodesConfig = + "n1= res=30:18:6;"; // n1 is default partition + String queuesConfig = + // guaranteed,max,used,pending + "root(=[30:18:6 30:18:6 12:12:6 1:1:1]){priority=1};" + //root + "-a(=[10:6:2 10:6:2 6:6:3 0:0:0]){priority=1};" + // a + "-b(=[10:6:2 10:6:2 6:6:3 0:0:0]){priority=1};" + // b + "-c(=[10:6:2 10:6:2 0:0:0 1:1:1]){priority=2}"; // c + String appsConfig= + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "a\t" // app1 in a1 + + "(1,2:2:1,n1,,3,false);" + + "b\t" // app2 in b2 + + "(1,2:2:1,n1,,3,false)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + verify(mDisp, times(1)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyInterQueueWithDRF.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyInterQueueWithDRF.java index c8a1f0f70ce..14a3a9ad7d7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyInterQueueWithDRF.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyInterQueueWithDRF.java @@ -18,11 +18,16 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; +import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes; +import org.apache.hadoop.yarn.api.records.ResourceInformation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.junit.Before; import org.junit.Test; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import java.io.IOException; + import static org.mockito.Matchers.argThat; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -41,8 +46,7 @@ public class TestProportionalCapacityPreemptionPolicyInterQueueWithDRF } @Test - public void testInterQueuePreemptionWithMultipleResource() - throws Exception { + public void testInterQueuePreemptionWithMultipleResource() throws Exception { /** * Queue structure is: * @@ -121,4 +125,52 @@ public class TestProportionalCapacityPreemptionPolicyInterQueueWithDRF new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( getAppAttemptId(1)))); } -} + + @Test + public void test3ResourceTypesInterQueuePreemption() throws IOException { + // Initialize resource map + String RESOURCE_1 = "res1"; + riMap.put(RESOURCE_1, ResourceInformation + .newInstance(RESOURCE_1, "", 0, ResourceTypes.COUNTABLE, 0, + Integer.MAX_VALUE)); + + ResourceUtils.initializeResourcesFromResourceInformationMap(riMap); + + /* + * root + * / \ \ + * a b c + * + * A / B / C have 33.3 / 33.3 / 33.4 resources + * Total cluster resource have mem=30, cpu=18, GPU=6 + * A uses mem=6, cpu=3, GPU=3 + * B uses mem=6, cpu=3, GPU=3 + * C is asking mem=1,cpu=1,GPU=1 + * + * We expect it can preempt from one of the jobs + */ + String labelsConfig = "=30:18:6,true;"; + String nodesConfig = "n1= res=30:18:6;"; // n1 is default partition + String queuesConfig = + // guaranteed,max,used,pending + "root(=[30:18:6 30:18:6 12:12:6 1:1:1]);" + //root + "-a(=[10:7:2 10:6:3 6:6:3 0:0:0]);" + // a + "-b(=[10:6:2 10:6:3 6:6:3 0:0:0]);" + // b + "-c(=[10:5:2 10:6:2 0:0:0 1:1:1])"; // c + String appsConfig = + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "a\t" // app1 in a1 + + "(1,2:2:1,n1,,3,false);" + "b\t" // app2 in b2 + + "(1,2:2:1,n1,,3,false)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(mDisp, times(1)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + } +} \ No newline at end of file