From c4918cb4cb5a267a8cfd6eace28fcfe7ad6174e8 Mon Sep 17 00:00:00 2001 From: carlo curino Date: Tue, 16 Sep 2014 13:20:57 -0700 Subject: [PATCH] YARN-1711. Policy to enforce instantaneous and over-time quotas on user reservation. Contributed by Carlo Curino and Subru Krishnan. --- YARN-1051-CHANGES.txt | 3 + .../reservation/CapacityOverTimePolicy.java | 231 ++++++++++++++++++ .../reservation/NoOverCommitPolicy.java | 74 ++++++ .../reservation/SharingPolicy.java | 49 ++++ .../ContractValidationException.java | 9 +- .../exceptions/MismatchedUserException.java | 28 +++ .../exceptions/PlanningException.java | 9 +- .../exceptions/PlanningQuotaException.java | 28 +++ .../ResourceOverCommitException.java | 28 +++ .../CapacitySchedulerConfiguration.java | 154 ++++++++++++ .../TestCapacityOverTimePolicy.java | 222 +++++++++++++++++ .../reservation/TestNoOverCommitPolicy.java | 144 +++++++++++ 12 files changed, 977 insertions(+), 2 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/MismatchedUserException.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/PlanningQuotaException.java create mode 
100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/ResourceOverCommitException.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java diff --git a/YARN-1051-CHANGES.txt b/YARN-1051-CHANGES.txt index deece7c6d96..e9ec691a6e0 100644 --- a/YARN-1051-CHANGES.txt +++ b/YARN-1051-CHANGES.txt @@ -14,3 +14,6 @@ subru) YARN-1710. Logic to find allocations within a Plan that satisfy user ReservationRequest(s). (Carlo Curino and Subru Krishnan via curino) + +YARN-1711. Policy to enforce instantaneous and over-time quotas +on user reservations. 
(Carlo Curino and Subru Krishnan via curino) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java new file mode 100644 index 00000000000..38c0207ff37 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacityOverTimePolicy.java @@ -0,0 +1,231 @@ +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import java.util.Date; + +import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.util.resource.Resources; + +/** + * This policy enforces a time-extended notion of Capacity. 
In particular it + * guarantees that the allocation received in input when combined with all + * previous allocations for the user does not violate an instantaneous max limit + * on the resources received, and that for every window of time of length + * validWindow, the integral of the allocations for a user (sum of the currently + * submitted allocation and all prior allocations for the user) does not exceed + * validWindow * maxAvg. + * + * This allows flexibility, in the sense that an allocation can instantaneously + * use large portions of the available capacity, but prevents abuses by bounding + * the average use over time. + * + * By controlling maxInst, maxAvg, validWindow the administrator configuring + * this policy can obtain a behavior ranging from instantaneously enforced + * capacity (akin to existing queues), to fully flexible allocations (likely + * reserved to super-users, or trusted systems). + */ +@LimitedPrivate("yarn") +@Unstable +public class CapacityOverTimePolicy implements SharingPolicy { + + private CapacitySchedulerConfiguration conf; + private long validWindow; + private float maxInst; + private float maxAvg; + + // For now this is CapacityScheduler specific, but given a hierarchy in the + // configuration structure of the schedulers (e.g., SchedulerConfiguration) + // it should be easy to remove this limitation + @Override + public void init(String reservationQueuePath, Configuration conf) { + this.conf = (CapacitySchedulerConfiguration) conf; + validWindow = this.conf.getReservationWindow(reservationQueuePath); + maxInst = this.conf.getInstantaneousMaxCapacity(reservationQueuePath) / 100; + maxAvg = this.conf.getAverageCapacity(reservationQueuePath) / 100; + }; + + @Override + public void validate(Plan plan, ReservationAllocation reservation) + throws PlanningException { + + // this entire method is invoked under a write-lock on the plan, no need + // to synchronize accesses to the plan further + + // Try to verify whether there is 
already a reservation with this ID in + // the system (remove its contribution during validation to simulate a + // try-n-swap + // update). + ReservationAllocation oldReservation = + plan.getReservationById(reservation.getReservationId()); + + // sanity check that the update of a reservation is not changing username + if (oldReservation != null + && !oldReservation.getUser().equals(reservation.getUser())) { + throw new MismatchedUserException( + "Updating an existing reservation with mismatched user:" + + oldReservation.getUser() + " != " + reservation.getUser()); + } + + long startTime = reservation.getStartTime(); + long endTime = reservation.getEndTime(); + long step = plan.getStep(); + + Resource planTotalCapacity = plan.getTotalCapacity(); + + Resource maxAvgRes = Resources.multiply(planTotalCapacity, maxAvg); + Resource maxInsRes = Resources.multiply(planTotalCapacity, maxInst); + + // define variable that will store integral of resources (need diff class to + // avoid overflow issues for long/large allocations) + IntegralResource runningTot = new IntegralResource(0L, 0L); + IntegralResource maxAllowed = new IntegralResource(maxAvgRes); + maxAllowed.multiplyBy(validWindow / step); + + // check that the resources offered to the user during any window of length + // "validWindow" overlapping this allocation are within maxAllowed + // also enforce instantaneous and physical constraints during this pass + for (long t = startTime - validWindow; t < endTime + validWindow; t += step) { + + Resource currExistingAllocTot = plan.getTotalCommittedResources(t); + Resource currExistingAllocForUser = + plan.getConsumptionForUser(reservation.getUser(), t); + Resource currNewAlloc = reservation.getResourcesAtTime(t); + Resource currOldAlloc = Resources.none(); + if (oldReservation != null) { + currOldAlloc = oldReservation.getResourcesAtTime(t); + } + + // throw exception if the cluster is overcommitted + // tot_allocated - old + new > capacity + Resource inst = + 
Resources.subtract(Resources.add(currExistingAllocTot, currNewAlloc), + currOldAlloc); + if (Resources.greaterThan(plan.getResourceCalculator(), + planTotalCapacity, inst, planTotalCapacity)) { + throw new ResourceOverCommitException(" Resources at time " + t + + " would be overcommitted (" + inst + " over " + + plan.getTotalCapacity() + ") by accepting reservation: " + + reservation.getReservationId()); + } + + // throw exception if instantaneous limits are violated + // tot_alloc_to_this_user - old + new > inst_limit + if (Resources.greaterThan(plan.getResourceCalculator(), + planTotalCapacity, Resources.subtract( + Resources.add(currExistingAllocForUser, currNewAlloc), + currOldAlloc), maxInsRes)) { + throw new PlanningQuotaException("Instantaneous quota capacity " + + maxInst + " would be passed at time " + t + + " by accepting reservation: " + reservation.getReservationId()); + } + + // throw exception if the running integral of utilization over validWindow + // is violated. We perform a delta check, adding/removing instants at the + // boundary of the window from runningTot. + + // runningTot = previous_runningTot + currExistingAllocForUser + + // currNewAlloc - currOldAlloc - pastNewAlloc - pastOldAlloc; + + // Where: + // 1) currNewAlloc, currExistingAllocForUser represent the contribution of + // the instant in time added in this pass. 
+ // 2) pastNewAlloc, pastOldAlloc are the contributions relative to time + // instants that are being retired from the window + // 3) currOldAlloc is the contribution (if any) of the previous version of + // this reservation (the one we are updating) + + runningTot.add(currExistingAllocForUser); + runningTot.add(currNewAlloc); + runningTot.subtract(currOldAlloc); + + // expire contributions from instants in time before (t - validWindow) + if (t > startTime) { + Resource pastOldAlloc = + plan.getConsumptionForUser(reservation.getUser(), t - validWindow); + Resource pastNewAlloc = reservation.getResourcesAtTime(t - validWindow); + + // runningTot = runningTot - pastOldAlloc - pastNewAlloc; + runningTot.subtract(pastOldAlloc); + runningTot.subtract(pastNewAlloc); + } + + // check integral + // runningTot > maxAvg * validWindow + // NOTE: we need to use comparator of IntegralResource directly, as + // Resource and ResourceCalculator assume "int" amount of resources, + // which is not sufficient when comparing integrals (out-of-bound) + if (maxAllowed.compareTo(runningTot) < 0) { + throw new PlanningQuotaException( + "Integral (avg over time) quota capacity " + maxAvg + + " over a window of " + validWindow / 1000 + " seconds, " + + " would be passed at time " + t + "(" + new Date(t) + + ") by accepting reservation: " + + reservation.getReservationId()); + } + } + } + + @Override + public long getValidWindow() { + return validWindow; + } + + /** + * This class provides support for Resource-like book-keeping, based on + * long(s), as using Resource to store the "integral" of the allocation over + * time leads to integer overflows for large allocations/clusters. (Evolving + * Resource to use long is too disruptive at this point.) + * + * The comparison/multiplication behaviors of IntegralResource are consistent + * with the DefaultResourceCalculator. 
+ */ + public class IntegralResource { + long memory; + long vcores; + + public IntegralResource(Resource resource) { + this.memory = resource.getMemory(); + this.vcores = resource.getVirtualCores(); + } + + public IntegralResource(long mem, long vcores) { + this.memory = mem; + this.vcores = vcores; + } + + public void add(Resource r) { + memory += r.getMemory(); + vcores += r.getVirtualCores(); + } + + public void subtract(Resource r) { + memory -= r.getMemory(); + vcores -= r.getVirtualCores(); + } + + public void multiplyBy(long window) { + memory = memory * window; + vcores = vcores * window; + } + + public long compareTo(IntegralResource other) { + long diff = memory - other.memory; + if (diff == 0) { + diff = vcores - other.vcores; + } + return diff; + } + + @Override + public String toString() { + return "[memory=" + memory + ", vcores=" + vcores + "]"; + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java new file mode 100644 index 00000000000..cbe2b78d2a2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java @@ -0,0 +1,74 @@ +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException; +import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException; +import org.apache.hadoop.yarn.util.resource.Resources; + +/** + * This policy enforces simple physical cluster capacity constraints, by + * validating that the allocation proposed fits in the current plan. This + * validation is compatible with "updates" and in verifying the capacity + * constraints it conceptually removes the prior version of the reservation. + */ +@LimitedPrivate("yarn") +@Unstable +public class NoOverCommitPolicy implements SharingPolicy { + + @Override + public void validate(Plan plan, ReservationAllocation reservation) + throws PlanningException { + + ReservationAllocation oldReservation = + plan.getReservationById(reservation.getReservationId()); + + // check updates are using the same user name + if (oldReservation != null + && !oldReservation.getUser().equals(reservation.getUser())) { + throw new MismatchedUserException( + "Updating an existing reservation with mismatching user:" + + oldReservation.getUser() + " != " + reservation.getUser()); + } + + long startTime = reservation.getStartTime(); + long endTime = reservation.getEndTime(); + long step = plan.getStep(); + + // for every instant in time, check we are respecting cluster capacity + for (long t = startTime; t < endTime; t += step) { + Resource currExistingAllocTot = plan.getTotalCommittedResources(t); + Resource currNewAlloc = reservation.getResourcesAtTime(t); + Resource currOldAlloc = Resource.newInstance(0, 0); + if (oldReservation != null) { + currOldAlloc = oldReservation.getResourcesAtTime(t); + } + // check the cluster is never over committed + // currExistingAllocTot + currNewAlloc - currOldAlloc > + // plan.getTotalCapacity() + if (Resources.greaterThan(plan.getResourceCalculator(), plan + .getTotalCapacity(), Resources.subtract( + Resources.add(currExistingAllocTot, currNewAlloc), currOldAlloc), + 
plan.getTotalCapacity())) { + throw new ResourceOverCommitException("Resources at time " + t + + " would be overcommitted by " + "accepting reservation: " + + reservation.getReservationId()); + } + } + } + + @Override + public long getValidWindow() { + // this policy has no "memory" so the valid window is set to zero + return 0; + } + + @Override + public void init(String inventoryQueuePath, Configuration conf) { + // nothing to do for this policy + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java new file mode 100644 index 00000000000..d9177643edc --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SharingPolicy.java @@ -0,0 +1,49 @@ +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; + +/** + * This is the interface for policy that validate new + * {@link ReservationAllocation}s for allocations being added to a {@link Plan}. + * Individual policies will be enforcing different invariants. 
+ */ +@LimitedPrivate("yarn") +@Unstable +public interface SharingPolicy { + + /** + * Initialize this policy + * + * @param inventoryQueuePath the name of the queue for this plan + * @param conf the system configuration + */ + public void init(String inventoryQueuePath, Configuration conf); + + /** + * This method runs the policy validation logic, and throws a + * {@link PlanningException} if the {@link ReservationAllocation} is not + * acceptable according to this sharing policy. + * + * @param plan the {@link Plan} we validate against + * @param newAllocation the allocation proposed to be added to the + * {@link Plan} + * @throws PlanningException if the policy would be violated by adding this + * {@link ReservationAllocation} to the {@link Plan} + */ + public void validate(Plan plan, ReservationAllocation newAllocation) + throws PlanningException; + + /** + * Returns the time range before and after the current reservation considered + * by this policy. In particular, this informs the archival process for the + * {@link Plan}, i.e., reservations regarding times before (now - validWindow) + * can be deleted. + * + * @return validWindow the window of validity considered by the policy. 
+ */ + public long getValidWindow(); + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/ContractValidationException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/ContractValidationException.java index 7ee5a76852f..cd82a9e3a1d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/ContractValidationException.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/ContractValidationException.java @@ -1,5 +1,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + * This exception is thrown if the request made is not syntactically valid. 
+ */ +@Public +@Unstable public class ContractValidationException extends PlanningException { private static final long serialVersionUID = 1L; @@ -8,5 +16,4 @@ public class ContractValidationException extends PlanningException { super(message); } - } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/MismatchedUserException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/MismatchedUserException.java new file mode 100644 index 00000000000..0a443f3cea5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/MismatchedUserException.java @@ -0,0 +1,28 @@ +package org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + * Exception thrown when an update to an existing reservation is performed + * by a user that is not the reservation owner. 
+ */ +@Public +@Unstable +public class MismatchedUserException extends PlanningException { + + private static final long serialVersionUID = 8313222590561668413L; + + public MismatchedUserException(String message) { + super(message); + } + + public MismatchedUserException(Throwable cause) { + super(cause); + } + + public MismatchedUserException(String message, Throwable cause) { + super(message, cause); + } + +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/PlanningException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/PlanningException.java index aa9e9fbb11f..0699856cebe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/PlanningException.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/PlanningException.java @@ -2,10 +2,17 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions; import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + /** * Exception thrown by the admission control subsystem when there is a problem - * in trying to find an allocation for a user {@link ReservationSubmissionRequest}. + * in trying to find an allocation for a user + * {@link ReservationSubmissionRequest}. 
*/ + +@Public +@Unstable public class PlanningException extends Exception { private static final long serialVersionUID = -684069387367879218L; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/PlanningQuotaException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/PlanningQuotaException.java new file mode 100644 index 00000000000..aad4ee84407 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/PlanningQuotaException.java @@ -0,0 +1,28 @@ +package org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + * This exception is thrown if the user quota is exceeded while accepting or + * updating a reservation. 
+ */ +@Public +@Unstable +public class PlanningQuotaException extends PlanningException { + + private static final long serialVersionUID = 8206629288380246166L; + + public PlanningQuotaException(String message) { + super(message); + } + + public PlanningQuotaException(Throwable cause) { + super(cause); + } + + public PlanningQuotaException(String message, Throwable cause) { + super(message, cause); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/ResourceOverCommitException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/ResourceOverCommitException.java new file mode 100644 index 00000000000..a4c2b07448b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/exceptions/ResourceOverCommitException.java @@ -0,0 +1,28 @@ +package org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + * This exception indicates that the reservation that has been attempted would + * exceed the physical resources available in the {@link Plan} at the moment. 
+ */ +@Public +@Unstable +public class ResourceOverCommitException extends PlanningException { + + private static final long serialVersionUID = 7070699407526521032L; + + public ResourceOverCommitException(String message) { + super(message); + } + + public ResourceOverCommitException(Throwable cause) { + super(cause); + } + + public ResourceOverCommitException(String message, Throwable cause) { + super(message, cause); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index af6bdc301ca..2c915c412a8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -183,6 +183,63 @@ public class CapacitySchedulerConfiguration extends Configuration { } } + @Private + public static final String AVERAGE_CAPACITY = "average-capacity"; + + @Private + public static final String IS_RESERVABLE = "reservable"; + + @Private + public static final String RESERVATION_WINDOW = "reservation-window"; + + @Private + public static final String INSTANTANEOUS_MAX_CAPACITY = + "instantaneous-max-capacity"; + + @Private + public static final long DEFAULT_RESERVATION_WINDOW = 0L; + + @Private + public static final String RESERVATION_ADMISSION_POLICY = + "reservation-policy"; + + @Private + public static final String RESERVATION_AGENT_NAME = "reservation-agent"; + + 
@Private + public static final String RESERVATION_SHOW_RESERVATION_AS_QUEUE = + "show-reservations-as-queues"; + + @Private + public static final String DEFAULT_RESERVATION_ADMISSION_POLICY = + "org.apache.hadoop.yarn.server.resourcemanager.reservation.CapacityOverTimePolicy"; + + @Private + public static final String DEFAULT_RESERVATION_AGENT_NAME = + "org.apache.hadoop.yarn.server.resourcemanager.reservation.GreedyReservationAgent"; + + @Private + public static final String RESERVATION_PLANNER_NAME = "reservation-planner"; + + @Private + public static final String DEFAULT_RESERVATION_PLANNER_NAME = + "org.apache.hadoop.yarn.server.resourcemanager.reservation.SimpleCapacityReplanner"; + + @Private + public static final String RESERVATION_MOVE_ON_EXPIRY = + "reservation-move-on-expiry"; + + @Private + public static final boolean DEFAULT_RESERVATION_MOVE_ON_EXPIRY = true; + + @Private + public static final String RESERVATION_ENFORCEMENT_WINDOW = + "reservation-enforcement-window"; + + // default to 1h lookahead enforcement + @Private + public static final long DEFAULT_RESERVATION_ENFORCEMENT_WINDOW = 3600000; + public CapacitySchedulerConfiguration() { this(new Configuration()); } @@ -493,4 +550,101 @@ public class CapacitySchedulerConfiguration extends Configuration { return mappings; } + + public boolean isReservable(String queue) { + boolean isReservable = + getBoolean(getQueuePrefix(queue) + IS_RESERVABLE, false); + return isReservable; + } + + public void setReservable(String queue, boolean isReservable) { + setBoolean(getQueuePrefix(queue) + IS_RESERVABLE, isReservable); + LOG.debug("here setReservableQueue: queuePrefix=" + getQueuePrefix(queue) + + ", isReservableQueue=" + isReservable(queue)); + } + + public long getReservationWindow(String queue) { + long reservationWindow = + getLong(getQueuePrefix(queue) + RESERVATION_WINDOW, + DEFAULT_RESERVATION_WINDOW); + return reservationWindow; + } + + public float getAverageCapacity(String queue) { + float 
avgCapacity = + getFloat(getQueuePrefix(queue) + AVERAGE_CAPACITY, + MAXIMUM_CAPACITY_VALUE); + return avgCapacity; + } + + public float getInstantaneousMaxCapacity(String queue) { + float instMaxCapacity = + getFloat(getQueuePrefix(queue) + INSTANTANEOUS_MAX_CAPACITY, + MAXIMUM_CAPACITY_VALUE); + return instMaxCapacity; + } + + public void setInstantaneousMaxCapacity(String queue, float instMaxCapacity) { + setFloat(getQueuePrefix(queue) + INSTANTANEOUS_MAX_CAPACITY, + instMaxCapacity); + } + + public void setReservationWindow(String queue, long reservationWindow) { + setLong(getQueuePrefix(queue) + RESERVATION_WINDOW, reservationWindow); + } + + public void setAverageCapacity(String queue, float avgCapacity) { + setFloat(getQueuePrefix(queue) + AVERAGE_CAPACITY, avgCapacity); + } + + public String getReservationAdmissionPolicy(String queue) { + String reservationPolicy = + get(getQueuePrefix(queue) + RESERVATION_ADMISSION_POLICY, + DEFAULT_RESERVATION_ADMISSION_POLICY); + return reservationPolicy; + } + + public void setReservationAdmissionPolicy(String queue, + String reservationPolicy) { + set(getQueuePrefix(queue) + RESERVATION_ADMISSION_POLICY, reservationPolicy); + } + + public String getReservationAgent(String queue) { + String reservationAgent = + get(getQueuePrefix(queue) + RESERVATION_AGENT_NAME, + DEFAULT_RESERVATION_AGENT_NAME); + return reservationAgent; + } + + public void setReservationAgent(String queue, String reservationPolicy) { + set(getQueuePrefix(queue) + RESERVATION_AGENT_NAME, reservationPolicy); + } + + public boolean getShowReservationAsQueues(String queuePath) { + boolean showReservationAsQueues = + getBoolean(getQueuePrefix(queuePath) + + RESERVATION_SHOW_RESERVATION_AS_QUEUE, false); + return showReservationAsQueues; + } + + public String getReplanner(String queue) { + String replanner = + get(getQueuePrefix(queue) + RESERVATION_PLANNER_NAME, + DEFAULT_RESERVATION_PLANNER_NAME); + return replanner; + } + + public boolean 
getMoveOnExpiry(String queue) { + boolean killOnExpiry = + getBoolean(getQueuePrefix(queue) + RESERVATION_MOVE_ON_EXPIRY, + DEFAULT_RESERVATION_MOVE_ON_EXPIRY); + return killOnExpiry; + } + + public long getEnforcementWindow(String queue) { + long enforcementWindow = + getLong(getQueuePrefix(queue) + RESERVATION_ENFORCEMENT_WINDOW, + DEFAULT_RESERVATION_ENFORCEMENT_WINDOW); + return enforcementWindow; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java new file mode 100644 index 00000000000..83d6d3f23d3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java @@ -0,0 +1,222 @@ +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +import java.io.IOException; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import 
org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestCapacityOverTimePolicy { + + long timeWindow; + long step; + float avgConstraint; + float instConstraint; + long initTime; + + InMemoryPlan plan; + ReservationAgent mAgent; + Resource minAlloc; + ResourceCalculator res; + Resource maxAlloc; + + int totCont = 1000000; + + @Before + public void setup() throws Exception { + + // 24h window + timeWindow = 86400000L; + // 1 sec step + step = 1000L; + + // 25% avg cap on capacity + avgConstraint = 25; + + // 70% instantaneous cap on capacity + instConstraint = 70; + + initTime = System.currentTimeMillis(); + minAlloc = Resource.newInstance(1024, 1); + res = new DefaultResourceCalculator(); + maxAlloc = Resource.newInstance(1024 * 8, 8); + + mAgent = mock(ReservationAgent.class); + ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil(); + CapacityScheduler scheduler = testUtil.mockCapacityScheduler(totCont); + String reservationQ = testUtil.getFullReservationQueueName(); + CapacitySchedulerConfiguration capConf = scheduler.getConfiguration(); + capConf.setReservationWindow(reservationQ, timeWindow); + capConf.setInstantaneousMaxCapacity(reservationQ, instConstraint); + capConf.setAverageCapacity(reservationQ, avgConstraint); + CapacityOverTimePolicy policy = new CapacityOverTimePolicy(); + policy.init(reservationQ, capConf); + + plan = + new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, mAgent, + scheduler.getClusterResource(), step, res, minAlloc, maxAlloc, + "dedicated", null, true); + } + + public int[] generateData(int length, int val) { + int[] data = new int[length]; + for (int i = 0; i < length; i++) { + data[i] = val; + } + return data; + } + + @Test + public void testSimplePass() throws IOException, PlanningException { + // generate allocation that simply fit within all 
constraints + int[] f = generateData(3600, (int) Math.ceil(0.2 * totCont)); + assertTrue(plan.toString(), + plan.addReservation(new InMemoryReservationAllocation( + ReservationSystemTestUtil.getNewReservationId(), null, "u1", + "dedicated", initTime, initTime + f.length, + ReservationSystemTestUtil.generateAllocation(initTime, step, f), + res, minAlloc))); + } + + @Test + public void testSimplePass2() throws IOException, PlanningException { + // generate allocation from single tenant that exceeds avg momentarily but + // fits within + // max instantaneous + int[] f = generateData(3600, (int) Math.ceil(0.69 * totCont)); + assertTrue(plan.toString(), + plan.addReservation(new InMemoryReservationAllocation( + ReservationSystemTestUtil.getNewReservationId(), null, "u1", + "dedicated", initTime, initTime + f.length, + ReservationSystemTestUtil.generateAllocation(initTime, step, f), + res, minAlloc))); + } + + @Test + public void testMultiTenantPass() throws IOException, PlanningException { + // generate allocation from multiple tenants that barely fit in tot capacity + int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont)); + for (int i = 0; i < 4; i++) { + assertTrue(plan.toString(), + plan.addReservation(new InMemoryReservationAllocation( + ReservationSystemTestUtil.getNewReservationId(), null, "u" + i, + "dedicated", initTime, initTime + f.length, + ReservationSystemTestUtil.generateAllocation(initTime, step, f), + res, minAlloc))); + } + } + + @Test(expected = ResourceOverCommitException.class) + public void testMultiTenantFail() throws IOException, PlanningException { + // generate allocation from multiple tenants that exceed tot capacity + int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont)); + for (int i = 0; i < 5; i++) { + assertTrue(plan.toString(), + plan.addReservation(new InMemoryReservationAllocation( + ReservationSystemTestUtil.getNewReservationId(), null, "u" + i, + "dedicated", initTime, initTime + f.length, +
ReservationSystemTestUtil.generateAllocation(initTime, step, f), + res, minAlloc))); + } + } + + @Test(expected = PlanningQuotaException.class) + public void testInstFail() throws IOException, PlanningException { + // generate allocation that exceeds the instantaneous cap single-shot + int[] f = generateData(3600, (int) Math.ceil(0.71 * totCont)); + assertTrue(plan.toString(), + plan.addReservation(new InMemoryReservationAllocation( + ReservationSystemTestUtil.getNewReservationId(), null, "u1", + "dedicated", initTime, initTime + f.length, + ReservationSystemTestUtil.generateAllocation(initTime, step, f), + res, minAlloc))); + Assert.fail("should not have accepted this"); + } + + @Test + public void testInstFailBySum() throws IOException, PlanningException { + // generate allocation that exceed the instantaneous cap by sum + int[] f = generateData(3600, (int) Math.ceil(0.3 * totCont)); + + assertTrue(plan.toString(), + plan.addReservation(new InMemoryReservationAllocation( + ReservationSystemTestUtil.getNewReservationId(), null, "u1", + "dedicated", initTime, initTime + f.length, + ReservationSystemTestUtil.generateAllocation(initTime, step, f), + res, minAlloc))); + assertTrue(plan.toString(), + plan.addReservation(new InMemoryReservationAllocation( + ReservationSystemTestUtil.getNewReservationId(), null, "u1", + "dedicated", initTime, initTime + f.length, + ReservationSystemTestUtil.generateAllocation(initTime, step, f), + res, minAlloc))); + try { + assertTrue(plan.toString(), + plan.addReservation(new InMemoryReservationAllocation( + ReservationSystemTestUtil.getNewReservationId(), null, "u1", + "dedicated", initTime, initTime + f.length, + ReservationSystemTestUtil.generateAllocation(initTime, step, f), + res, minAlloc))); + Assert.fail(); + } catch (PlanningQuotaException p) { + // expected + } + } + + @Test(expected = PlanningQuotaException.class) + public void testFailAvg() throws IOException, PlanningException { + // generate an allocation which violates
the 25% average single-shot + Map req = + new TreeMap(); + long win = timeWindow / 2 + 100; + int cont = (int) Math.ceil(0.5 * totCont); + req.put(new ReservationInterval(initTime, initTime + win), + ReservationRequest.newInstance(Resource.newInstance(1024, 1), cont)); + + assertTrue(plan.toString(), + plan.addReservation(new InMemoryReservationAllocation( + ReservationSystemTestUtil.getNewReservationId(), null, "u1", + "dedicated", initTime, initTime + win, req, res, minAlloc))); + } + + @Test + public void testFailAvgBySum() throws IOException, PlanningException { + // generate an allocation which violates the 25% average by sum + Map req = + new TreeMap(); + long win = 86400000 / 4 + 1; + int cont = (int) Math.ceil(0.5 * totCont); + req.put(new ReservationInterval(initTime, initTime + win), + ReservationRequest.newInstance(Resource.newInstance(1024, 1), cont)); + assertTrue(plan.toString(), + plan.addReservation(new InMemoryReservationAllocation( + ReservationSystemTestUtil.getNewReservationId(), null, "u1", + "dedicated", initTime, initTime + win, req, res, minAlloc))); + + try { + assertTrue(plan.toString(), + plan.addReservation(new InMemoryReservationAllocation( + ReservationSystemTestUtil.getNewReservationId(), null, "u1", + "dedicated", initTime, initTime + win, req, res, minAlloc))); + + Assert.fail("should not have accepted this"); + } catch (PlanningQuotaException e) { + // expected + } + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java new file mode 100644 index 00000000000..2ceead34022 --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java @@ -0,0 +1,144 @@ +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +import java.io.IOException; + +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.junit.Before; +import org.junit.Test; + +public class TestNoOverCommitPolicy { + + long step; + long initTime; + + InMemoryPlan plan; + ReservationAgent mAgent; + Resource minAlloc; + ResourceCalculator res; + Resource maxAlloc; + + int totCont = 1000000; + + @Before + public void setup() throws Exception { + + // 1 sec step + step = 1000L; + + initTime = System.currentTimeMillis(); + minAlloc = Resource.newInstance(1024, 1); + res = new DefaultResourceCalculator(); + maxAlloc = Resource.newInstance(1024 * 8, 8); + + mAgent = mock(ReservationAgent.class); + ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil(); + CapacityScheduler scheduler = testUtil.mockCapacityScheduler(totCont); + String reservationQ = testUtil.getFullReservationQueueName(); + CapacitySchedulerConfiguration capConf = scheduler.getConfiguration(); 
+ NoOverCommitPolicy policy = new NoOverCommitPolicy(); + policy.init(reservationQ, capConf); + + plan = + new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, mAgent, + scheduler.getClusterResource(), step, res, minAlloc, maxAlloc, + "dedicated", null, true); + } + + public int[] generateData(int length, int val) { + int[] data = new int[length]; + for (int i = 0; i < length; i++) { + data[i] = val; + } + return data; + } + + @Test + public void testSingleUserEasyFitPass() throws IOException, PlanningException { + // generate allocation that easily fit within resource constraints + int[] f = generateData(3600, (int) Math.ceil(0.2 * totCont)); + assertTrue(plan.toString(), + plan.addReservation(new InMemoryReservationAllocation( + ReservationSystemTestUtil.getNewReservationId(), null, "u1", + "dedicated", initTime, initTime + f.length, + ReservationSystemTestUtil.generateAllocation(initTime, step, f), + res, minAlloc))); + } + + @Test + public void testSingleUserBarelyFitPass() throws IOException, + PlanningException { + // generate allocation from single tenant that barely fit + int[] f = generateData(3600, totCont); + assertTrue(plan.toString(), + plan.addReservation(new InMemoryReservationAllocation( + ReservationSystemTestUtil.getNewReservationId(), null, "u1", + "dedicated", initTime, initTime + f.length, + ReservationSystemTestUtil.generateAllocation(initTime, step, f), + res, minAlloc))); + } + + @Test(expected = ResourceOverCommitException.class) + public void testSingleFail() throws IOException, PlanningException { + // generate allocation from single tenant that exceed capacity + int[] f = generateData(3600, (int) (1.1 * totCont)); + plan.addReservation(new InMemoryReservationAllocation( + ReservationSystemTestUtil.getNewReservationId(), null, "u1", + "dedicated", initTime, initTime + f.length, ReservationSystemTestUtil + .generateAllocation(initTime, step, f), res, minAlloc)); + } + + @Test(expected = MismatchedUserException.class) + public void 
testUserMismatch() throws IOException, PlanningException { + // generate allocation from single tenant that fits within capacity + int[] f = generateData(3600, (int) (0.5 * totCont)); + + ReservationId rid = ReservationSystemTestUtil.getNewReservationId(); + plan.addReservation(new InMemoryReservationAllocation(rid, null, "u1", + "dedicated", initTime, initTime + f.length, ReservationSystemTestUtil + .generateAllocation(initTime, step, f), res, minAlloc)); + + // trying to update a reservation with a mismatching user + plan.updateReservation(new InMemoryReservationAllocation(rid, null, "u2", + "dedicated", initTime, initTime + f.length, ReservationSystemTestUtil + .generateAllocation(initTime, step, f), res, minAlloc)); + } + + @Test + public void testMultiTenantPass() throws IOException, PlanningException { + // generate allocation from multiple tenants that barely fit in tot capacity + int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont)); + for (int i = 0; i < 4; i++) { + assertTrue(plan.toString(), + plan.addReservation(new InMemoryReservationAllocation( + ReservationSystemTestUtil.getNewReservationId(), null, "u" + i, + "dedicated", initTime, initTime + f.length, + ReservationSystemTestUtil.generateAllocation(initTime, step, f), + res, minAlloc))); + } + } + + @Test(expected = ResourceOverCommitException.class) + public void testMultiTenantFail() throws IOException, PlanningException { + // generate allocation from multiple tenants that exceed tot capacity + int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont)); + for (int i = 0; i < 5; i++) { + assertTrue(plan.toString(), + plan.addReservation(new InMemoryReservationAllocation( + ReservationSystemTestUtil.getNewReservationId(), null, "u" + i, + "dedicated", initTime, initTime + f.length, + ReservationSystemTestUtil.generateAllocation(initTime, step, f), + res, minAlloc))); + } + } +}