YARN-1711. Policy to enforce instantaneous and over-time quotas on user reservation. Contributed by Carlo Curino and Subru Krishnan.
This commit is contained in:
parent
aef7928899
commit
c4918cb4cb
|
@ -14,3 +14,6 @@ subru)
|
||||||
YARN-1710. Logic to find allocations within a Plan that satisfy
|
YARN-1710. Logic to find allocations within a Plan that satisfy
|
||||||
user ReservationRequest(s). (Carlo Curino and Subru Krishnan via
|
user ReservationRequest(s). (Carlo Curino and Subru Krishnan via
|
||||||
curino)
|
curino)
|
||||||
|
|
||||||
|
YARN-1711. Policy to enforce instantaneous and over-time quotas
|
||||||
|
on user reservations. (Carlo Curino and Subru Krishnan via curino)
|
||||||
|
|
|
@ -0,0 +1,231 @@
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
|
||||||
|
|
||||||
|
import java.util.Date;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This policy enforces a time-extended notion of Capacity. In particular it
|
||||||
|
* guarantees that the allocation received in input when combined with all
|
||||||
|
* previous allocation for the user does not violate an instantaneous max limit
|
||||||
|
* on the resources received, and that for every window of time of length
|
||||||
|
* validWindow, the integral of the allocations for a user (sum of the currently
|
||||||
|
* submitted allocation and all prior allocations for the user) does not exceed
|
||||||
|
* validWindow * maxAvg.
|
||||||
|
*
|
||||||
|
* This allows flexibility, in the sense that an allocation can instantaneously
|
||||||
|
* use large portions of the available capacity, but prevents abuses by bounding
|
||||||
|
* the average use over time.
|
||||||
|
*
|
||||||
|
* By controlling maxInst, maxAvg, validWindow the administrator configuring
|
||||||
|
* this policy can obtain a behavior ranging from instantaneously enforced
|
||||||
|
* capacity (akin to existing queues), or fully flexible allocations (likely
|
||||||
|
* reserved to super-users, or trusted systems).
|
||||||
|
*/
|
||||||
|
@LimitedPrivate("yarn")
|
||||||
|
@Unstable
|
||||||
|
public class CapacityOverTimePolicy implements SharingPolicy {
|
||||||
|
|
||||||
|
private CapacitySchedulerConfiguration conf;
|
||||||
|
private long validWindow;
|
||||||
|
private float maxInst;
|
||||||
|
private float maxAvg;
|
||||||
|
|
||||||
|
// For now this is CapacityScheduler specific, but given a hierarchy in the
|
||||||
|
// configuration structure of the schedulers (e.g., SchedulerConfiguration)
|
||||||
|
// it should be easy to remove this limitation
|
||||||
|
@Override
|
||||||
|
public void init(String reservationQueuePath, Configuration conf) {
|
||||||
|
this.conf = (CapacitySchedulerConfiguration) conf;
|
||||||
|
validWindow = this.conf.getReservationWindow(reservationQueuePath);
|
||||||
|
maxInst = this.conf.getInstantaneousMaxCapacity(reservationQueuePath) / 100;
|
||||||
|
maxAvg = this.conf.getAverageCapacity(reservationQueuePath) / 100;
|
||||||
|
};
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void validate(Plan plan, ReservationAllocation reservation)
|
||||||
|
throws PlanningException {
|
||||||
|
|
||||||
|
// this is entire method invoked under a write-lock on the plan, no need
|
||||||
|
// to synchronize accesses to the plan further
|
||||||
|
|
||||||
|
// Try to verify whether there is already a reservation with this ID in
|
||||||
|
// the system (remove its contribution during validation to simulate a
|
||||||
|
// try-n-swap
|
||||||
|
// update).
|
||||||
|
ReservationAllocation oldReservation =
|
||||||
|
plan.getReservationById(reservation.getReservationId());
|
||||||
|
|
||||||
|
// sanity check that the update of a reservation is not changing username
|
||||||
|
if (oldReservation != null
|
||||||
|
&& !oldReservation.getUser().equals(reservation.getUser())) {
|
||||||
|
throw new MismatchedUserException(
|
||||||
|
"Updating an existing reservation with mismatched user:"
|
||||||
|
+ oldReservation.getUser() + " != " + reservation.getUser());
|
||||||
|
}
|
||||||
|
|
||||||
|
long startTime = reservation.getStartTime();
|
||||||
|
long endTime = reservation.getEndTime();
|
||||||
|
long step = plan.getStep();
|
||||||
|
|
||||||
|
Resource planTotalCapacity = plan.getTotalCapacity();
|
||||||
|
|
||||||
|
Resource maxAvgRes = Resources.multiply(planTotalCapacity, maxAvg);
|
||||||
|
Resource maxInsRes = Resources.multiply(planTotalCapacity, maxInst);
|
||||||
|
|
||||||
|
// define variable that will store integral of resources (need diff class to
|
||||||
|
// avoid overflow issues for long/large allocations)
|
||||||
|
IntegralResource runningTot = new IntegralResource(0L, 0L);
|
||||||
|
IntegralResource maxAllowed = new IntegralResource(maxAvgRes);
|
||||||
|
maxAllowed.multiplyBy(validWindow / step);
|
||||||
|
|
||||||
|
// check that the resources offered to the user during any window of length
|
||||||
|
// "validWindow" overlapping this allocation are within maxAllowed
|
||||||
|
// also enforce instantaneous and physical constraints during this pass
|
||||||
|
for (long t = startTime - validWindow; t < endTime + validWindow; t += step) {
|
||||||
|
|
||||||
|
Resource currExistingAllocTot = plan.getTotalCommittedResources(t);
|
||||||
|
Resource currExistingAllocForUser =
|
||||||
|
plan.getConsumptionForUser(reservation.getUser(), t);
|
||||||
|
Resource currNewAlloc = reservation.getResourcesAtTime(t);
|
||||||
|
Resource currOldAlloc = Resources.none();
|
||||||
|
if (oldReservation != null) {
|
||||||
|
currOldAlloc = oldReservation.getResourcesAtTime(t);
|
||||||
|
}
|
||||||
|
|
||||||
|
// throw exception if the cluster is overcommitted
|
||||||
|
// tot_allocated - old + new > capacity
|
||||||
|
Resource inst =
|
||||||
|
Resources.subtract(Resources.add(currExistingAllocTot, currNewAlloc),
|
||||||
|
currOldAlloc);
|
||||||
|
if (Resources.greaterThan(plan.getResourceCalculator(),
|
||||||
|
planTotalCapacity, inst, planTotalCapacity)) {
|
||||||
|
throw new ResourceOverCommitException(" Resources at time " + t
|
||||||
|
+ " would be overcommitted (" + inst + " over "
|
||||||
|
+ plan.getTotalCapacity() + ") by accepting reservation: "
|
||||||
|
+ reservation.getReservationId());
|
||||||
|
}
|
||||||
|
|
||||||
|
// throw exception if instantaneous limits are violated
|
||||||
|
// tot_alloc_to_this_user - old + new > inst_limit
|
||||||
|
if (Resources.greaterThan(plan.getResourceCalculator(),
|
||||||
|
planTotalCapacity, Resources.subtract(
|
||||||
|
Resources.add(currExistingAllocForUser, currNewAlloc),
|
||||||
|
currOldAlloc), maxInsRes)) {
|
||||||
|
throw new PlanningQuotaException("Instantaneous quota capacity "
|
||||||
|
+ maxInst + " would be passed at time " + t
|
||||||
|
+ " by accepting reservation: " + reservation.getReservationId());
|
||||||
|
}
|
||||||
|
|
||||||
|
// throw exception if the running integral of utilization over validWindow
|
||||||
|
// is violated. We perform a delta check, adding/removing instants at the
|
||||||
|
// boundary of the window from runningTot.
|
||||||
|
|
||||||
|
// runningTot = previous_runningTot + currExistingAllocForUser +
|
||||||
|
// currNewAlloc - currOldAlloc - pastNewAlloc - pastOldAlloc;
|
||||||
|
|
||||||
|
// Where:
|
||||||
|
// 1) currNewAlloc, currExistingAllocForUser represent the contribution of
|
||||||
|
// the instant in time added in this pass.
|
||||||
|
// 2) pastNewAlloc, pastOldAlloc are the contributions relative to time
|
||||||
|
// instants that are being retired from the the window
|
||||||
|
// 3) currOldAlloc is the contribution (if any) of the previous version of
|
||||||
|
// this reservation (the one we are updating)
|
||||||
|
|
||||||
|
runningTot.add(currExistingAllocForUser);
|
||||||
|
runningTot.add(currNewAlloc);
|
||||||
|
runningTot.subtract(currOldAlloc);
|
||||||
|
|
||||||
|
// expire contributions from instant in time before (t - validWindow)
|
||||||
|
if (t > startTime) {
|
||||||
|
Resource pastOldAlloc =
|
||||||
|
plan.getConsumptionForUser(reservation.getUser(), t - validWindow);
|
||||||
|
Resource pastNewAlloc = reservation.getResourcesAtTime(t - validWindow);
|
||||||
|
|
||||||
|
// runningTot = runningTot - pastExistingAlloc - pastNewAlloc;
|
||||||
|
runningTot.subtract(pastOldAlloc);
|
||||||
|
runningTot.subtract(pastNewAlloc);
|
||||||
|
}
|
||||||
|
|
||||||
|
// check integral
|
||||||
|
// runningTot > maxAvg * validWindow
|
||||||
|
// NOTE: we need to use comparator of IntegralResource directly, as
|
||||||
|
// Resource and ResourceCalculator assume "int" amount of resources,
|
||||||
|
// which is not sufficient when comparing integrals (out-of-bound)
|
||||||
|
if (maxAllowed.compareTo(runningTot) < 0) {
|
||||||
|
throw new PlanningQuotaException(
|
||||||
|
"Integral (avg over time) quota capacity " + maxAvg
|
||||||
|
+ " over a window of " + validWindow / 1000 + " seconds, "
|
||||||
|
+ " would be passed at time " + t + "(" + new Date(t)
|
||||||
|
+ ") by accepting reservation: "
|
||||||
|
+ reservation.getReservationId());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getValidWindow() {
|
||||||
|
return validWindow;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class provides support for Resource-like book-keeping, based on
|
||||||
|
* long(s), as using Resource to store the "integral" of the allocation over
|
||||||
|
* time leads to integer overflows for large allocations/clusters. (Evolving
|
||||||
|
* Resource to use long is too disruptive at this point.)
|
||||||
|
*
|
||||||
|
* The comparison/multiplication behaviors of IntegralResource are consistent
|
||||||
|
* with the DefaultResourceCalculator.
|
||||||
|
*/
|
||||||
|
public class IntegralResource {
|
||||||
|
long memory;
|
||||||
|
long vcores;
|
||||||
|
|
||||||
|
public IntegralResource(Resource resource) {
|
||||||
|
this.memory = resource.getMemory();
|
||||||
|
this.vcores = resource.getVirtualCores();
|
||||||
|
}
|
||||||
|
|
||||||
|
public IntegralResource(long mem, long vcores) {
|
||||||
|
this.memory = mem;
|
||||||
|
this.vcores = vcores;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void add(Resource r) {
|
||||||
|
memory += r.getMemory();
|
||||||
|
vcores += r.getVirtualCores();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void subtract(Resource r) {
|
||||||
|
memory -= r.getMemory();
|
||||||
|
vcores -= r.getVirtualCores();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void multiplyBy(long window) {
|
||||||
|
memory = memory * window;
|
||||||
|
vcores = vcores * window;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long compareTo(IntegralResource other) {
|
||||||
|
long diff = memory - other.memory;
|
||||||
|
if (diff == 0) {
|
||||||
|
diff = vcores - other.vcores;
|
||||||
|
}
|
||||||
|
return diff;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "<memory:" + memory + ", vCores:" + vcores + ">";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,74 @@
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This policy enforce a simple physical cluster capacity constraints, by
|
||||||
|
* validating that the allocation proposed fits in the current plan. This
|
||||||
|
* validation is compatible with "updates" and in verifying the capacity
|
||||||
|
* constraints it conceptually remove the prior version of the reservation.
|
||||||
|
*/
|
||||||
|
@LimitedPrivate("yarn")
|
||||||
|
@Unstable
|
||||||
|
public class NoOverCommitPolicy implements SharingPolicy {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void validate(Plan plan, ReservationAllocation reservation)
|
||||||
|
throws PlanningException {
|
||||||
|
|
||||||
|
ReservationAllocation oldReservation =
|
||||||
|
plan.getReservationById(reservation.getReservationId());
|
||||||
|
|
||||||
|
// check updates are using same name
|
||||||
|
if (oldReservation != null
|
||||||
|
&& !oldReservation.getUser().equals(reservation.getUser())) {
|
||||||
|
throw new MismatchedUserException(
|
||||||
|
"Updating an existing reservation with mismatching user:"
|
||||||
|
+ oldReservation.getUser() + " != " + reservation.getUser());
|
||||||
|
}
|
||||||
|
|
||||||
|
long startTime = reservation.getStartTime();
|
||||||
|
long endTime = reservation.getEndTime();
|
||||||
|
long step = plan.getStep();
|
||||||
|
|
||||||
|
// for every instant in time, check we are respecting cluster capacity
|
||||||
|
for (long t = startTime; t < endTime; t += step) {
|
||||||
|
Resource currExistingAllocTot = plan.getTotalCommittedResources(t);
|
||||||
|
Resource currNewAlloc = reservation.getResourcesAtTime(t);
|
||||||
|
Resource currOldAlloc = Resource.newInstance(0, 0);
|
||||||
|
if (oldReservation != null) {
|
||||||
|
oldReservation.getResourcesAtTime(t);
|
||||||
|
}
|
||||||
|
// check the cluster is never over committed
|
||||||
|
// currExistingAllocTot + currNewAlloc - currOldAlloc >
|
||||||
|
// capPlan.getTotalCapacity()
|
||||||
|
if (Resources.greaterThan(plan.getResourceCalculator(), plan
|
||||||
|
.getTotalCapacity(), Resources.subtract(
|
||||||
|
Resources.add(currExistingAllocTot, currNewAlloc), currOldAlloc),
|
||||||
|
plan.getTotalCapacity())) {
|
||||||
|
throw new ResourceOverCommitException("Resources at time " + t
|
||||||
|
+ " would be overcommitted by " + "accepting reservation: "
|
||||||
|
+ reservation.getReservationId());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getValidWindow() {
|
||||||
|
// this policy has no "memory" so the valid window is set to zero
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init(String inventoryQueuePath, Configuration conf) {
|
||||||
|
// nothing to do for this policy
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,49 @@
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is the interface for policy that validate new
|
||||||
|
* {@link ReservationAllocation}s for allocations being added to a {@link Plan}.
|
||||||
|
* Individual policies will be enforcing different invariants.
|
||||||
|
*/
|
||||||
|
@LimitedPrivate("yarn")
|
||||||
|
@Unstable
|
||||||
|
public interface SharingPolicy {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initialize this policy
|
||||||
|
*
|
||||||
|
* @param inventoryQueuePath the name of the queue for this plan
|
||||||
|
* @param conf the system configuration
|
||||||
|
*/
|
||||||
|
public void init(String inventoryQueuePath, Configuration conf);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method runs the policy validation logic, and return true/false on
|
||||||
|
* whether the {@link ReservationAllocation} is acceptable according to this
|
||||||
|
* sharing policy.
|
||||||
|
*
|
||||||
|
* @param plan the {@link Plan} we validate against
|
||||||
|
* @param newAllocation the allocation proposed to be added to the
|
||||||
|
* {@link Plan}
|
||||||
|
* @throws PlanningException if the policy is respected if we add this
|
||||||
|
* {@link ReservationAllocation} to the {@link Plan}
|
||||||
|
*/
|
||||||
|
public void validate(Plan plan, ReservationAllocation newAllocation)
|
||||||
|
throws PlanningException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the time range before and after the current reservation considered
|
||||||
|
* by this policy. In particular, this informs the archival process for the
|
||||||
|
* {@link Plan}, i.e., reservations regarding times before (now - validWindow)
|
||||||
|
* can be deleted.
|
||||||
|
*
|
||||||
|
* @return validWindow the window of validity considered by the policy.
|
||||||
|
*/
|
||||||
|
public long getValidWindow();
|
||||||
|
|
||||||
|
}
|
|
@ -1,5 +1,13 @@
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions;
|
package org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This exception is thrown if the request made is not syntactically valid.
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Unstable
|
||||||
public class ContractValidationException extends PlanningException {
|
public class ContractValidationException extends PlanningException {
|
||||||
|
|
||||||
private static final long serialVersionUID = 1L;
|
private static final long serialVersionUID = 1L;
|
||||||
|
@ -8,5 +16,4 @@ public class ContractValidationException extends PlanningException {
|
||||||
super(message);
|
super(message);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,28 @@
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Exception thrown when an update to an existing reservation is performed
|
||||||
|
* by a user that is not the reservation owner.
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Unstable
|
||||||
|
public class MismatchedUserException extends PlanningException {
|
||||||
|
|
||||||
|
private static final long serialVersionUID = 8313222590561668413L;
|
||||||
|
|
||||||
|
public MismatchedUserException(String message) {
|
||||||
|
super(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
public MismatchedUserException(Throwable cause) {
|
||||||
|
super(cause);
|
||||||
|
}
|
||||||
|
|
||||||
|
public MismatchedUserException(String message, Throwable cause) {
|
||||||
|
super(message, cause);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -2,10 +2,17 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Exception thrown by the admission control subsystem when there is a problem
|
* Exception thrown by the admission control subsystem when there is a problem
|
||||||
* in trying to find an allocation for a user {@link ReservationSubmissionRequest}.
|
* in trying to find an allocation for a user
|
||||||
|
* {@link ReservationSubmissionRequest}.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
@Public
|
||||||
|
@Unstable
|
||||||
public class PlanningException extends Exception {
|
public class PlanningException extends Exception {
|
||||||
|
|
||||||
private static final long serialVersionUID = -684069387367879218L;
|
private static final long serialVersionUID = -684069387367879218L;
|
||||||
|
|
|
@ -0,0 +1,28 @@
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This exception is thrown if the user quota is exceed while accepting or
|
||||||
|
* updating a reservation.
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Unstable
|
||||||
|
public class PlanningQuotaException extends PlanningException {
|
||||||
|
|
||||||
|
private static final long serialVersionUID = 8206629288380246166L;
|
||||||
|
|
||||||
|
public PlanningQuotaException(String message) {
|
||||||
|
super(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
public PlanningQuotaException(Throwable cause) {
|
||||||
|
super(cause);
|
||||||
|
}
|
||||||
|
|
||||||
|
public PlanningQuotaException(String message, Throwable cause) {
|
||||||
|
super(message, cause);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,28 @@
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This exception indicate that the reservation that has been attempted, would
|
||||||
|
* exceed the physical resources available in the {@link Plan} at the moment.
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Unstable
|
||||||
|
public class ResourceOverCommitException extends PlanningException {
|
||||||
|
|
||||||
|
private static final long serialVersionUID = 7070699407526521032L;
|
||||||
|
|
||||||
|
public ResourceOverCommitException(String message) {
|
||||||
|
super(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
public ResourceOverCommitException(Throwable cause) {
|
||||||
|
super(cause);
|
||||||
|
}
|
||||||
|
|
||||||
|
public ResourceOverCommitException(String message, Throwable cause) {
|
||||||
|
super(message, cause);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -183,6 +183,63 @@ public class CapacitySchedulerConfiguration extends Configuration {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Private
|
||||||
|
public static final String AVERAGE_CAPACITY = "average-capacity";
|
||||||
|
|
||||||
|
@Private
|
||||||
|
public static final String IS_RESERVABLE = "reservable";
|
||||||
|
|
||||||
|
@Private
|
||||||
|
public static final String RESERVATION_WINDOW = "reservation-window";
|
||||||
|
|
||||||
|
@Private
|
||||||
|
public static final String INSTANTANEOUS_MAX_CAPACITY =
|
||||||
|
"instantaneous-max-capacity";
|
||||||
|
|
||||||
|
@Private
|
||||||
|
public static final long DEFAULT_RESERVATION_WINDOW = 0L;
|
||||||
|
|
||||||
|
@Private
|
||||||
|
public static final String RESERVATION_ADMISSION_POLICY =
|
||||||
|
"reservation-policy";
|
||||||
|
|
||||||
|
@Private
|
||||||
|
public static final String RESERVATION_AGENT_NAME = "reservation-agent";
|
||||||
|
|
||||||
|
@Private
|
||||||
|
public static final String RESERVATION_SHOW_RESERVATION_AS_QUEUE =
|
||||||
|
"show-reservations-as-queues";
|
||||||
|
|
||||||
|
@Private
|
||||||
|
public static final String DEFAULT_RESERVATION_ADMISSION_POLICY =
|
||||||
|
"org.apache.hadoop.yarn.server.resourcemanager.reservation.CapacityOverTimePolicy";
|
||||||
|
|
||||||
|
@Private
|
||||||
|
public static final String DEFAULT_RESERVATION_AGENT_NAME =
|
||||||
|
"org.apache.hadoop.yarn.server.resourcemanager.reservation.GreedyReservationAgent";
|
||||||
|
|
||||||
|
@Private
|
||||||
|
public static final String RESERVATION_PLANNER_NAME = "reservation-planner";
|
||||||
|
|
||||||
|
@Private
|
||||||
|
public static final String DEFAULT_RESERVATION_PLANNER_NAME =
|
||||||
|
"org.apache.hadoop.yarn.server.resourcemanager.reservation.SimpleCapacityReplanner";
|
||||||
|
|
||||||
|
@Private
|
||||||
|
public static final String RESERVATION_MOVE_ON_EXPIRY =
|
||||||
|
"reservation-move-on-expiry";
|
||||||
|
|
||||||
|
@Private
|
||||||
|
public static final boolean DEFAULT_RESERVATION_MOVE_ON_EXPIRY = true;
|
||||||
|
|
||||||
|
@Private
|
||||||
|
public static final String RESERVATION_ENFORCEMENT_WINDOW =
|
||||||
|
"reservation-enforcement-window";
|
||||||
|
|
||||||
|
// default to 1h lookahead enforcement
|
||||||
|
@Private
|
||||||
|
public static final long DEFAULT_RESERVATION_ENFORCEMENT_WINDOW = 3600000;
|
||||||
|
|
||||||
public CapacitySchedulerConfiguration() {
|
public CapacitySchedulerConfiguration() {
|
||||||
this(new Configuration());
|
this(new Configuration());
|
||||||
}
|
}
|
||||||
|
@ -493,4 +550,101 @@ public class CapacitySchedulerConfiguration extends Configuration {
|
||||||
|
|
||||||
return mappings;
|
return mappings;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isReservable(String queue) {
|
||||||
|
boolean isReservable =
|
||||||
|
getBoolean(getQueuePrefix(queue) + IS_RESERVABLE, false);
|
||||||
|
return isReservable;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setReservable(String queue, boolean isReservable) {
|
||||||
|
setBoolean(getQueuePrefix(queue) + IS_RESERVABLE, isReservable);
|
||||||
|
LOG.debug("here setReservableQueue: queuePrefix=" + getQueuePrefix(queue)
|
||||||
|
+ ", isReservableQueue=" + isReservable(queue));
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getReservationWindow(String queue) {
|
||||||
|
long reservationWindow =
|
||||||
|
getLong(getQueuePrefix(queue) + RESERVATION_WINDOW,
|
||||||
|
DEFAULT_RESERVATION_WINDOW);
|
||||||
|
return reservationWindow;
|
||||||
|
}
|
||||||
|
|
||||||
|
public float getAverageCapacity(String queue) {
|
||||||
|
float avgCapacity =
|
||||||
|
getFloat(getQueuePrefix(queue) + AVERAGE_CAPACITY,
|
||||||
|
MAXIMUM_CAPACITY_VALUE);
|
||||||
|
return avgCapacity;
|
||||||
|
}
|
||||||
|
|
||||||
|
public float getInstantaneousMaxCapacity(String queue) {
|
||||||
|
float instMaxCapacity =
|
||||||
|
getFloat(getQueuePrefix(queue) + INSTANTANEOUS_MAX_CAPACITY,
|
||||||
|
MAXIMUM_CAPACITY_VALUE);
|
||||||
|
return instMaxCapacity;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setInstantaneousMaxCapacity(String queue, float instMaxCapacity) {
|
||||||
|
setFloat(getQueuePrefix(queue) + INSTANTANEOUS_MAX_CAPACITY,
|
||||||
|
instMaxCapacity);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setReservationWindow(String queue, long reservationWindow) {
|
||||||
|
setLong(getQueuePrefix(queue) + RESERVATION_WINDOW, reservationWindow);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setAverageCapacity(String queue, float avgCapacity) {
|
||||||
|
setFloat(getQueuePrefix(queue) + AVERAGE_CAPACITY, avgCapacity);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getReservationAdmissionPolicy(String queue) {
|
||||||
|
String reservationPolicy =
|
||||||
|
get(getQueuePrefix(queue) + RESERVATION_ADMISSION_POLICY,
|
||||||
|
DEFAULT_RESERVATION_ADMISSION_POLICY);
|
||||||
|
return reservationPolicy;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setReservationAdmissionPolicy(String queue,
|
||||||
|
String reservationPolicy) {
|
||||||
|
set(getQueuePrefix(queue) + RESERVATION_ADMISSION_POLICY, reservationPolicy);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getReservationAgent(String queue) {
|
||||||
|
String reservationAgent =
|
||||||
|
get(getQueuePrefix(queue) + RESERVATION_AGENT_NAME,
|
||||||
|
DEFAULT_RESERVATION_AGENT_NAME);
|
||||||
|
return reservationAgent;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setReservationAgent(String queue, String reservationPolicy) {
|
||||||
|
set(getQueuePrefix(queue) + RESERVATION_AGENT_NAME, reservationPolicy);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean getShowReservationAsQueues(String queuePath) {
|
||||||
|
boolean showReservationAsQueues =
|
||||||
|
getBoolean(getQueuePrefix(queuePath)
|
||||||
|
+ RESERVATION_SHOW_RESERVATION_AS_QUEUE, false);
|
||||||
|
return showReservationAsQueues;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getReplanner(String queue) {
|
||||||
|
String replanner =
|
||||||
|
get(getQueuePrefix(queue) + RESERVATION_PLANNER_NAME,
|
||||||
|
DEFAULT_RESERVATION_PLANNER_NAME);
|
||||||
|
return replanner;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean getMoveOnExpiry(String queue) {
|
||||||
|
boolean killOnExpiry =
|
||||||
|
getBoolean(getQueuePrefix(queue) + RESERVATION_MOVE_ON_EXPIRY,
|
||||||
|
DEFAULT_RESERVATION_MOVE_ON_EXPIRY);
|
||||||
|
return killOnExpiry;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getEnforcementWindow(String queue) {
|
||||||
|
long enforcementWindow =
|
||||||
|
getLong(getQueuePrefix(queue) + RESERVATION_ENFORCEMENT_WINDOW,
|
||||||
|
DEFAULT_RESERVATION_ENFORCEMENT_WINDOW);
|
||||||
|
return enforcementWindow;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,222 @@
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.TreeMap;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ReservationRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestCapacityOverTimePolicy {
|
||||||
|
|
||||||
|
long timeWindow;
|
||||||
|
long step;
|
||||||
|
float avgConstraint;
|
||||||
|
float instConstraint;
|
||||||
|
long initTime;
|
||||||
|
|
||||||
|
InMemoryPlan plan;
|
||||||
|
ReservationAgent mAgent;
|
||||||
|
Resource minAlloc;
|
||||||
|
ResourceCalculator res;
|
||||||
|
Resource maxAlloc;
|
||||||
|
|
||||||
|
int totCont = 1000000;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() throws Exception {
|
||||||
|
|
||||||
|
// 24h window
|
||||||
|
timeWindow = 86400000L;
|
||||||
|
// 1 sec step
|
||||||
|
step = 1000L;
|
||||||
|
|
||||||
|
// 25% avg cap on capacity
|
||||||
|
avgConstraint = 25;
|
||||||
|
|
||||||
|
// 70% instantaneous cap on capacity
|
||||||
|
instConstraint = 70;
|
||||||
|
|
||||||
|
initTime = System.currentTimeMillis();
|
||||||
|
minAlloc = Resource.newInstance(1024, 1);
|
||||||
|
res = new DefaultResourceCalculator();
|
||||||
|
maxAlloc = Resource.newInstance(1024 * 8, 8);
|
||||||
|
|
||||||
|
mAgent = mock(ReservationAgent.class);
|
||||||
|
ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
|
||||||
|
CapacityScheduler scheduler = testUtil.mockCapacityScheduler(totCont);
|
||||||
|
String reservationQ = testUtil.getFullReservationQueueName();
|
||||||
|
CapacitySchedulerConfiguration capConf = scheduler.getConfiguration();
|
||||||
|
capConf.setReservationWindow(reservationQ, timeWindow);
|
||||||
|
capConf.setInstantaneousMaxCapacity(reservationQ, instConstraint);
|
||||||
|
capConf.setAverageCapacity(reservationQ, avgConstraint);
|
||||||
|
CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
|
||||||
|
policy.init(reservationQ, capConf);
|
||||||
|
|
||||||
|
plan =
|
||||||
|
new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, mAgent,
|
||||||
|
scheduler.getClusterResource(), step, res, minAlloc, maxAlloc,
|
||||||
|
"dedicated", null, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
public int[] generateData(int length, int val) {
|
||||||
|
int[] data = new int[length];
|
||||||
|
for (int i = 0; i < length; i++) {
|
||||||
|
data[i] = val;
|
||||||
|
}
|
||||||
|
return data;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSimplePass() throws IOException, PlanningException {
|
||||||
|
// generate allocation that simply fit within all constraints
|
||||||
|
int[] f = generateData(3600, (int) Math.ceil(0.2 * totCont));
|
||||||
|
assertTrue(plan.toString(),
|
||||||
|
plan.addReservation(new InMemoryReservationAllocation(
|
||||||
|
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
|
||||||
|
"dedicated", initTime, initTime + f.length,
|
||||||
|
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
|
||||||
|
res, minAlloc)));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSimplePass2() throws IOException, PlanningException {
|
||||||
|
// generate allocation from single tenant that exceed avg momentarily but
|
||||||
|
// fit within
|
||||||
|
// max instantanesou
|
||||||
|
int[] f = generateData(3600, (int) Math.ceil(0.69 * totCont));
|
||||||
|
assertTrue(plan.toString(),
|
||||||
|
plan.addReservation(new InMemoryReservationAllocation(
|
||||||
|
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
|
||||||
|
"dedicated", initTime, initTime + f.length,
|
||||||
|
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
|
||||||
|
res, minAlloc)));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMultiTenantPass() throws IOException, PlanningException {
|
||||||
|
// generate allocation from multiple tenants that barely fit in tot capacity
|
||||||
|
int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
|
||||||
|
for (int i = 0; i < 4; i++) {
|
||||||
|
assertTrue(plan.toString(),
|
||||||
|
plan.addReservation(new InMemoryReservationAllocation(
|
||||||
|
ReservationSystemTestUtil.getNewReservationId(), null, "u" + i,
|
||||||
|
"dedicated", initTime, initTime + f.length,
|
||||||
|
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
|
||||||
|
res, minAlloc)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = ResourceOverCommitException.class)
|
||||||
|
public void testMultiTenantFail() throws IOException, PlanningException {
|
||||||
|
// generate allocation from multiple tenants that exceed tot capacity
|
||||||
|
int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
|
||||||
|
for (int i = 0; i < 5; i++) {
|
||||||
|
assertTrue(plan.toString(),
|
||||||
|
plan.addReservation(new InMemoryReservationAllocation(
|
||||||
|
ReservationSystemTestUtil.getNewReservationId(), null, "u" + i,
|
||||||
|
"dedicated", initTime, initTime + f.length,
|
||||||
|
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
|
||||||
|
res, minAlloc)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = PlanningQuotaException.class)
|
||||||
|
public void testInstFail() throws IOException, PlanningException {
|
||||||
|
// generate allocation that exceed the instantaneous cap single-show
|
||||||
|
int[] f = generateData(3600, (int) Math.ceil(0.71 * totCont));
|
||||||
|
assertTrue(plan.toString(),
|
||||||
|
plan.addReservation(new InMemoryReservationAllocation(
|
||||||
|
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
|
||||||
|
"dedicated", initTime, initTime + f.length,
|
||||||
|
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
|
||||||
|
res, minAlloc)));
|
||||||
|
Assert.fail("should not have accepted this");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInstFailBySum() throws IOException, PlanningException {
|
||||||
|
// generate allocation that exceed the instantaneous cap by sum
|
||||||
|
int[] f = generateData(3600, (int) Math.ceil(0.3 * totCont));
|
||||||
|
|
||||||
|
assertTrue(plan.toString(),
|
||||||
|
plan.addReservation(new InMemoryReservationAllocation(
|
||||||
|
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
|
||||||
|
"dedicated", initTime, initTime + f.length,
|
||||||
|
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
|
||||||
|
res, minAlloc)));
|
||||||
|
assertTrue(plan.toString(),
|
||||||
|
plan.addReservation(new InMemoryReservationAllocation(
|
||||||
|
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
|
||||||
|
"dedicated", initTime, initTime + f.length,
|
||||||
|
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
|
||||||
|
res, minAlloc)));
|
||||||
|
try {
|
||||||
|
assertTrue(plan.toString(),
|
||||||
|
plan.addReservation(new InMemoryReservationAllocation(
|
||||||
|
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
|
||||||
|
"dedicated", initTime, initTime + f.length,
|
||||||
|
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
|
||||||
|
res, minAlloc)));
|
||||||
|
Assert.fail();
|
||||||
|
} catch (PlanningQuotaException p) {
|
||||||
|
// expected
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = PlanningQuotaException.class)
|
||||||
|
public void testFailAvg() throws IOException, PlanningException {
|
||||||
|
// generate an allocation which violates the 25% average single-shot
|
||||||
|
Map<ReservationInterval, ReservationRequest> req =
|
||||||
|
new TreeMap<ReservationInterval, ReservationRequest>();
|
||||||
|
long win = timeWindow / 2 + 100;
|
||||||
|
int cont = (int) Math.ceil(0.5 * totCont);
|
||||||
|
req.put(new ReservationInterval(initTime, initTime + win),
|
||||||
|
ReservationRequest.newInstance(Resource.newInstance(1024, 1), cont));
|
||||||
|
|
||||||
|
assertTrue(plan.toString(),
|
||||||
|
plan.addReservation(new InMemoryReservationAllocation(
|
||||||
|
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
|
||||||
|
"dedicated", initTime, initTime + win, req, res, minAlloc)));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFailAvgBySum() throws IOException, PlanningException {
|
||||||
|
// generate an allocation which violates the 25% average by sum
|
||||||
|
Map<ReservationInterval, ReservationRequest> req =
|
||||||
|
new TreeMap<ReservationInterval, ReservationRequest>();
|
||||||
|
long win = 86400000 / 4 + 1;
|
||||||
|
int cont = (int) Math.ceil(0.5 * totCont);
|
||||||
|
req.put(new ReservationInterval(initTime, initTime + win),
|
||||||
|
ReservationRequest.newInstance(Resource.newInstance(1024, 1), cont));
|
||||||
|
assertTrue(plan.toString(),
|
||||||
|
plan.addReservation(new InMemoryReservationAllocation(
|
||||||
|
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
|
||||||
|
"dedicated", initTime, initTime + win, req, res, minAlloc)));
|
||||||
|
|
||||||
|
try {
|
||||||
|
assertTrue(plan.toString(),
|
||||||
|
plan.addReservation(new InMemoryReservationAllocation(
|
||||||
|
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
|
||||||
|
"dedicated", initTime, initTime + win, req, res, minAlloc)));
|
||||||
|
|
||||||
|
Assert.fail("should not have accepted this");
|
||||||
|
} catch (PlanningQuotaException e) {
|
||||||
|
// expected
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,144 @@
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ReservationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestNoOverCommitPolicy {
|
||||||
|
|
||||||
|
long step;
|
||||||
|
long initTime;
|
||||||
|
|
||||||
|
InMemoryPlan plan;
|
||||||
|
ReservationAgent mAgent;
|
||||||
|
Resource minAlloc;
|
||||||
|
ResourceCalculator res;
|
||||||
|
Resource maxAlloc;
|
||||||
|
|
||||||
|
int totCont = 1000000;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() throws Exception {
|
||||||
|
|
||||||
|
// 1 sec step
|
||||||
|
step = 1000L;
|
||||||
|
|
||||||
|
initTime = System.currentTimeMillis();
|
||||||
|
minAlloc = Resource.newInstance(1024, 1);
|
||||||
|
res = new DefaultResourceCalculator();
|
||||||
|
maxAlloc = Resource.newInstance(1024 * 8, 8);
|
||||||
|
|
||||||
|
mAgent = mock(ReservationAgent.class);
|
||||||
|
ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
|
||||||
|
CapacityScheduler scheduler = testUtil.mockCapacityScheduler(totCont);
|
||||||
|
String reservationQ = testUtil.getFullReservationQueueName();
|
||||||
|
CapacitySchedulerConfiguration capConf = scheduler.getConfiguration();
|
||||||
|
NoOverCommitPolicy policy = new NoOverCommitPolicy();
|
||||||
|
policy.init(reservationQ, capConf);
|
||||||
|
|
||||||
|
plan =
|
||||||
|
new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, mAgent,
|
||||||
|
scheduler.getClusterResource(), step, res, minAlloc, maxAlloc,
|
||||||
|
"dedicated", null, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
public int[] generateData(int length, int val) {
|
||||||
|
int[] data = new int[length];
|
||||||
|
for (int i = 0; i < length; i++) {
|
||||||
|
data[i] = val;
|
||||||
|
}
|
||||||
|
return data;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSingleUserEasyFitPass() throws IOException, PlanningException {
|
||||||
|
// generate allocation that easily fit within resource constraints
|
||||||
|
int[] f = generateData(3600, (int) Math.ceil(0.2 * totCont));
|
||||||
|
assertTrue(plan.toString(),
|
||||||
|
plan.addReservation(new InMemoryReservationAllocation(
|
||||||
|
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
|
||||||
|
"dedicated", initTime, initTime + f.length,
|
||||||
|
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
|
||||||
|
res, minAlloc)));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSingleUserBarelyFitPass() throws IOException,
|
||||||
|
PlanningException {
|
||||||
|
// generate allocation from single tenant that barely fit
|
||||||
|
int[] f = generateData(3600, totCont);
|
||||||
|
assertTrue(plan.toString(),
|
||||||
|
plan.addReservation(new InMemoryReservationAllocation(
|
||||||
|
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
|
||||||
|
"dedicated", initTime, initTime + f.length,
|
||||||
|
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
|
||||||
|
res, minAlloc)));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = ResourceOverCommitException.class)
|
||||||
|
public void testSingleFail() throws IOException, PlanningException {
|
||||||
|
// generate allocation from single tenant that exceed capacity
|
||||||
|
int[] f = generateData(3600, (int) (1.1 * totCont));
|
||||||
|
plan.addReservation(new InMemoryReservationAllocation(
|
||||||
|
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
|
||||||
|
"dedicated", initTime, initTime + f.length, ReservationSystemTestUtil
|
||||||
|
.generateAllocation(initTime, step, f), res, minAlloc));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = MismatchedUserException.class)
|
||||||
|
public void testUserMismatch() throws IOException, PlanningException {
|
||||||
|
// generate allocation from single tenant that exceed capacity
|
||||||
|
int[] f = generateData(3600, (int) (0.5 * totCont));
|
||||||
|
|
||||||
|
ReservationId rid = ReservationSystemTestUtil.getNewReservationId();
|
||||||
|
plan.addReservation(new InMemoryReservationAllocation(rid, null, "u1",
|
||||||
|
"dedicated", initTime, initTime + f.length, ReservationSystemTestUtil
|
||||||
|
.generateAllocation(initTime, step, f), res, minAlloc));
|
||||||
|
|
||||||
|
// trying to update a reservation with a mismatching user
|
||||||
|
plan.updateReservation(new InMemoryReservationAllocation(rid, null, "u2",
|
||||||
|
"dedicated", initTime, initTime + f.length, ReservationSystemTestUtil
|
||||||
|
.generateAllocation(initTime, step, f), res, minAlloc));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMultiTenantPass() throws IOException, PlanningException {
|
||||||
|
// generate allocation from multiple tenants that barely fit in tot capacity
|
||||||
|
int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
|
||||||
|
for (int i = 0; i < 4; i++) {
|
||||||
|
assertTrue(plan.toString(),
|
||||||
|
plan.addReservation(new InMemoryReservationAllocation(
|
||||||
|
ReservationSystemTestUtil.getNewReservationId(), null, "u" + i,
|
||||||
|
"dedicated", initTime, initTime + f.length,
|
||||||
|
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
|
||||||
|
res, minAlloc)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = ResourceOverCommitException.class)
|
||||||
|
public void testMultiTenantFail() throws IOException, PlanningException {
|
||||||
|
// generate allocation from multiple tenants that exceed tot capacity
|
||||||
|
int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
|
||||||
|
for (int i = 0; i < 5; i++) {
|
||||||
|
assertTrue(plan.toString(),
|
||||||
|
plan.addReservation(new InMemoryReservationAllocation(
|
||||||
|
ReservationSystemTestUtil.getNewReservationId(), null, "u" + i,
|
||||||
|
"dedicated", initTime, initTime + f.length,
|
||||||
|
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
|
||||||
|
res, minAlloc)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue