YARN-1711. Policy to enforce instantaneous and over-time quotas on user reservation. Contributed by Carlo Curino and Subru Krishnan.

(cherry picked from commit c4918cb4cb)
This commit is contained in:
carlo curino 2014-09-16 13:20:57 -07:00 committed by Chris Douglas
parent f66ffcf832
commit b6df0dddcd
12 changed files with 977 additions and 2 deletions

View File

@ -14,3 +14,6 @@ subru)
YARN-1710. Logic to find allocations within a Plan that satisfy
user ReservationRequest(s). (Carlo Curino and Subru Krishnan via
curino)
YARN-1711. Policy to enforce instantaneous and over-time quotas
on user reservations. (Carlo Curino and Subru Krishnan via curino)

View File

@ -0,0 +1,231 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import java.util.Date;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
* This policy enforces a time-extended notion of Capacity. In particular it
* guarantees that the allocation received in input when combined with all
* previous allocation for the user does not violate an instantaneous max limit
* on the resources received, and that for every window of time of length
* validWindow, the integral of the allocations for a user (sum of the currently
* submitted allocation and all prior allocations for the user) does not exceed
* validWindow * maxAvg.
*
* This allows flexibility, in the sense that an allocation can instantaneously
* use large portions of the available capacity, but prevents abuses by bounding
* the average use over time.
*
* By controlling maxInst, maxAvg, validWindow the administrator configuring
* this policy can obtain a behavior ranging from instantaneously enforced
* capacity (akin to existing queues), or fully flexible allocations (likely
* reserved to super-users, or trusted systems).
*/
@LimitedPrivate("yarn")
@Unstable
public class CapacityOverTimePolicy implements SharingPolicy {
private CapacitySchedulerConfiguration conf;
private long validWindow;
private float maxInst;
private float maxAvg;
// For now this is CapacityScheduler specific, but given a hierarchy in the
// configuration structure of the schedulers (e.g., SchedulerConfiguration)
// it should be easy to remove this limitation
@Override
public void init(String reservationQueuePath, Configuration conf) {
this.conf = (CapacitySchedulerConfiguration) conf;
validWindow = this.conf.getReservationWindow(reservationQueuePath);
maxInst = this.conf.getInstantaneousMaxCapacity(reservationQueuePath) / 100;
maxAvg = this.conf.getAverageCapacity(reservationQueuePath) / 100;
};
@Override
public void validate(Plan plan, ReservationAllocation reservation)
throws PlanningException {
// this is entire method invoked under a write-lock on the plan, no need
// to synchronize accesses to the plan further
// Try to verify whether there is already a reservation with this ID in
// the system (remove its contribution during validation to simulate a
// try-n-swap
// update).
ReservationAllocation oldReservation =
plan.getReservationById(reservation.getReservationId());
// sanity check that the update of a reservation is not changing username
if (oldReservation != null
&& !oldReservation.getUser().equals(reservation.getUser())) {
throw new MismatchedUserException(
"Updating an existing reservation with mismatched user:"
+ oldReservation.getUser() + " != " + reservation.getUser());
}
long startTime = reservation.getStartTime();
long endTime = reservation.getEndTime();
long step = plan.getStep();
Resource planTotalCapacity = plan.getTotalCapacity();
Resource maxAvgRes = Resources.multiply(planTotalCapacity, maxAvg);
Resource maxInsRes = Resources.multiply(planTotalCapacity, maxInst);
// define variable that will store integral of resources (need diff class to
// avoid overflow issues for long/large allocations)
IntegralResource runningTot = new IntegralResource(0L, 0L);
IntegralResource maxAllowed = new IntegralResource(maxAvgRes);
maxAllowed.multiplyBy(validWindow / step);
// check that the resources offered to the user during any window of length
// "validWindow" overlapping this allocation are within maxAllowed
// also enforce instantaneous and physical constraints during this pass
for (long t = startTime - validWindow; t < endTime + validWindow; t += step) {
Resource currExistingAllocTot = plan.getTotalCommittedResources(t);
Resource currExistingAllocForUser =
plan.getConsumptionForUser(reservation.getUser(), t);
Resource currNewAlloc = reservation.getResourcesAtTime(t);
Resource currOldAlloc = Resources.none();
if (oldReservation != null) {
currOldAlloc = oldReservation.getResourcesAtTime(t);
}
// throw exception if the cluster is overcommitted
// tot_allocated - old + new > capacity
Resource inst =
Resources.subtract(Resources.add(currExistingAllocTot, currNewAlloc),
currOldAlloc);
if (Resources.greaterThan(plan.getResourceCalculator(),
planTotalCapacity, inst, planTotalCapacity)) {
throw new ResourceOverCommitException(" Resources at time " + t
+ " would be overcommitted (" + inst + " over "
+ plan.getTotalCapacity() + ") by accepting reservation: "
+ reservation.getReservationId());
}
// throw exception if instantaneous limits are violated
// tot_alloc_to_this_user - old + new > inst_limit
if (Resources.greaterThan(plan.getResourceCalculator(),
planTotalCapacity, Resources.subtract(
Resources.add(currExistingAllocForUser, currNewAlloc),
currOldAlloc), maxInsRes)) {
throw new PlanningQuotaException("Instantaneous quota capacity "
+ maxInst + " would be passed at time " + t
+ " by accepting reservation: " + reservation.getReservationId());
}
// throw exception if the running integral of utilization over validWindow
// is violated. We perform a delta check, adding/removing instants at the
// boundary of the window from runningTot.
// runningTot = previous_runningTot + currExistingAllocForUser +
// currNewAlloc - currOldAlloc - pastNewAlloc - pastOldAlloc;
// Where:
// 1) currNewAlloc, currExistingAllocForUser represent the contribution of
// the instant in time added in this pass.
// 2) pastNewAlloc, pastOldAlloc are the contributions relative to time
// instants that are being retired from the the window
// 3) currOldAlloc is the contribution (if any) of the previous version of
// this reservation (the one we are updating)
runningTot.add(currExistingAllocForUser);
runningTot.add(currNewAlloc);
runningTot.subtract(currOldAlloc);
// expire contributions from instant in time before (t - validWindow)
if (t > startTime) {
Resource pastOldAlloc =
plan.getConsumptionForUser(reservation.getUser(), t - validWindow);
Resource pastNewAlloc = reservation.getResourcesAtTime(t - validWindow);
// runningTot = runningTot - pastExistingAlloc - pastNewAlloc;
runningTot.subtract(pastOldAlloc);
runningTot.subtract(pastNewAlloc);
}
// check integral
// runningTot > maxAvg * validWindow
// NOTE: we need to use comparator of IntegralResource directly, as
// Resource and ResourceCalculator assume "int" amount of resources,
// which is not sufficient when comparing integrals (out-of-bound)
if (maxAllowed.compareTo(runningTot) < 0) {
throw new PlanningQuotaException(
"Integral (avg over time) quota capacity " + maxAvg
+ " over a window of " + validWindow / 1000 + " seconds, "
+ " would be passed at time " + t + "(" + new Date(t)
+ ") by accepting reservation: "
+ reservation.getReservationId());
}
}
}
@Override
public long getValidWindow() {
return validWindow;
}
/**
* This class provides support for Resource-like book-keeping, based on
* long(s), as using Resource to store the "integral" of the allocation over
* time leads to integer overflows for large allocations/clusters. (Evolving
* Resource to use long is too disruptive at this point.)
*
* The comparison/multiplication behaviors of IntegralResource are consistent
* with the DefaultResourceCalculator.
*/
public class IntegralResource {
long memory;
long vcores;
public IntegralResource(Resource resource) {
this.memory = resource.getMemory();
this.vcores = resource.getVirtualCores();
}
public IntegralResource(long mem, long vcores) {
this.memory = mem;
this.vcores = vcores;
}
public void add(Resource r) {
memory += r.getMemory();
vcores += r.getVirtualCores();
}
public void subtract(Resource r) {
memory -= r.getMemory();
vcores -= r.getVirtualCores();
}
public void multiplyBy(long window) {
memory = memory * window;
vcores = vcores * window;
}
public long compareTo(IntegralResource other) {
long diff = memory - other.memory;
if (diff == 0) {
diff = vcores - other.vcores;
}
return diff;
}
@Override
public String toString() {
return "<memory:" + memory + ", vCores:" + vcores + ">";
}
}
}

View File

@ -0,0 +1,74 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
* This policy enforce a simple physical cluster capacity constraints, by
* validating that the allocation proposed fits in the current plan. This
* validation is compatible with "updates" and in verifying the capacity
* constraints it conceptually remove the prior version of the reservation.
*/
@LimitedPrivate("yarn")
@Unstable
public class NoOverCommitPolicy implements SharingPolicy {
@Override
public void validate(Plan plan, ReservationAllocation reservation)
throws PlanningException {
ReservationAllocation oldReservation =
plan.getReservationById(reservation.getReservationId());
// check updates are using same name
if (oldReservation != null
&& !oldReservation.getUser().equals(reservation.getUser())) {
throw new MismatchedUserException(
"Updating an existing reservation with mismatching user:"
+ oldReservation.getUser() + " != " + reservation.getUser());
}
long startTime = reservation.getStartTime();
long endTime = reservation.getEndTime();
long step = plan.getStep();
// for every instant in time, check we are respecting cluster capacity
for (long t = startTime; t < endTime; t += step) {
Resource currExistingAllocTot = plan.getTotalCommittedResources(t);
Resource currNewAlloc = reservation.getResourcesAtTime(t);
Resource currOldAlloc = Resource.newInstance(0, 0);
if (oldReservation != null) {
oldReservation.getResourcesAtTime(t);
}
// check the cluster is never over committed
// currExistingAllocTot + currNewAlloc - currOldAlloc >
// capPlan.getTotalCapacity()
if (Resources.greaterThan(plan.getResourceCalculator(), plan
.getTotalCapacity(), Resources.subtract(
Resources.add(currExistingAllocTot, currNewAlloc), currOldAlloc),
plan.getTotalCapacity())) {
throw new ResourceOverCommitException("Resources at time " + t
+ " would be overcommitted by " + "accepting reservation: "
+ reservation.getReservationId());
}
}
}
@Override
public long getValidWindow() {
// this policy has no "memory" so the valid window is set to zero
return 0;
}
@Override
public void init(String inventoryQueuePath, Configuration conf) {
// nothing to do for this policy
}
}

View File

@ -0,0 +1,49 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
/**
* This is the interface for policy that validate new
* {@link ReservationAllocation}s for allocations being added to a {@link Plan}.
* Individual policies will be enforcing different invariants.
*/
@LimitedPrivate("yarn")
@Unstable
public interface SharingPolicy {
/**
* Initialize this policy
*
* @param inventoryQueuePath the name of the queue for this plan
* @param conf the system configuration
*/
public void init(String inventoryQueuePath, Configuration conf);
/**
* This method runs the policy validation logic, and return true/false on
* whether the {@link ReservationAllocation} is acceptable according to this
* sharing policy.
*
* @param plan the {@link Plan} we validate against
* @param newAllocation the allocation proposed to be added to the
* {@link Plan}
* @throws PlanningException if the policy is respected if we add this
* {@link ReservationAllocation} to the {@link Plan}
*/
public void validate(Plan plan, ReservationAllocation newAllocation)
throws PlanningException;
/**
* Returns the time range before and after the current reservation considered
* by this policy. In particular, this informs the archival process for the
* {@link Plan}, i.e., reservations regarding times before (now - validWindow)
* can be deleted.
*
* @return validWindow the window of validity considered by the policy.
*/
public long getValidWindow();
}

View File

@ -1,5 +1,13 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
/**
* This exception is thrown if the request made is not syntactically valid.
*/
@Public
@Unstable
public class ContractValidationException extends PlanningException {
private static final long serialVersionUID = 1L;
@ -8,5 +16,4 @@ public class ContractValidationException extends PlanningException {
super(message);
}
}

View File

@ -0,0 +1,28 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
/**
* Exception thrown when an update to an existing reservation is performed
* by a user that is not the reservation owner.
*/
@Public
@Unstable
public class MismatchedUserException extends PlanningException {
private static final long serialVersionUID = 8313222590561668413L;
public MismatchedUserException(String message) {
super(message);
}
public MismatchedUserException(Throwable cause) {
super(cause);
}
public MismatchedUserException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -2,10 +2,17 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
/**
* Exception thrown by the admission control subsystem when there is a problem
* in trying to find an allocation for a user {@link ReservationSubmissionRequest}.
* in trying to find an allocation for a user
* {@link ReservationSubmissionRequest}.
*/
@Public
@Unstable
public class PlanningException extends Exception {
private static final long serialVersionUID = -684069387367879218L;

View File

@ -0,0 +1,28 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
/**
* This exception is thrown if the user quota is exceed while accepting or
* updating a reservation.
*/
@Public
@Unstable
public class PlanningQuotaException extends PlanningException {
private static final long serialVersionUID = 8206629288380246166L;
public PlanningQuotaException(String message) {
super(message);
}
public PlanningQuotaException(Throwable cause) {
super(cause);
}
public PlanningQuotaException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -0,0 +1,28 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
/**
* This exception indicate that the reservation that has been attempted, would
* exceed the physical resources available in the {@link Plan} at the moment.
*/
@Public
@Unstable
public class ResourceOverCommitException extends PlanningException {
private static final long serialVersionUID = 7070699407526521032L;
public ResourceOverCommitException(String message) {
super(message);
}
public ResourceOverCommitException(Throwable cause) {
super(cause);
}
public ResourceOverCommitException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -190,6 +190,63 @@ public class CapacitySchedulerConfiguration extends Configuration {
}
}
@Private
public static final String AVERAGE_CAPACITY = "average-capacity";
@Private
public static final String IS_RESERVABLE = "reservable";
@Private
public static final String RESERVATION_WINDOW = "reservation-window";
@Private
public static final String INSTANTANEOUS_MAX_CAPACITY =
"instantaneous-max-capacity";
@Private
public static final long DEFAULT_RESERVATION_WINDOW = 0L;
@Private
public static final String RESERVATION_ADMISSION_POLICY =
"reservation-policy";
@Private
public static final String RESERVATION_AGENT_NAME = "reservation-agent";
@Private
public static final String RESERVATION_SHOW_RESERVATION_AS_QUEUE =
"show-reservations-as-queues";
@Private
public static final String DEFAULT_RESERVATION_ADMISSION_POLICY =
"org.apache.hadoop.yarn.server.resourcemanager.reservation.CapacityOverTimePolicy";
@Private
public static final String DEFAULT_RESERVATION_AGENT_NAME =
"org.apache.hadoop.yarn.server.resourcemanager.reservation.GreedyReservationAgent";
@Private
public static final String RESERVATION_PLANNER_NAME = "reservation-planner";
@Private
public static final String DEFAULT_RESERVATION_PLANNER_NAME =
"org.apache.hadoop.yarn.server.resourcemanager.reservation.SimpleCapacityReplanner";
@Private
public static final String RESERVATION_MOVE_ON_EXPIRY =
"reservation-move-on-expiry";
@Private
public static final boolean DEFAULT_RESERVATION_MOVE_ON_EXPIRY = true;
@Private
public static final String RESERVATION_ENFORCEMENT_WINDOW =
"reservation-enforcement-window";
// default to 1h lookahead enforcement
@Private
public static final long DEFAULT_RESERVATION_ENFORCEMENT_WINDOW = 3600000;
public CapacitySchedulerConfiguration() {
this(new Configuration());
}
@ -511,4 +568,101 @@ public class CapacitySchedulerConfiguration extends Configuration {
return mappings;
}
public boolean isReservable(String queue) {
boolean isReservable =
getBoolean(getQueuePrefix(queue) + IS_RESERVABLE, false);
return isReservable;
}
public void setReservable(String queue, boolean isReservable) {
setBoolean(getQueuePrefix(queue) + IS_RESERVABLE, isReservable);
LOG.debug("here setReservableQueue: queuePrefix=" + getQueuePrefix(queue)
+ ", isReservableQueue=" + isReservable(queue));
}
public long getReservationWindow(String queue) {
long reservationWindow =
getLong(getQueuePrefix(queue) + RESERVATION_WINDOW,
DEFAULT_RESERVATION_WINDOW);
return reservationWindow;
}
public float getAverageCapacity(String queue) {
float avgCapacity =
getFloat(getQueuePrefix(queue) + AVERAGE_CAPACITY,
MAXIMUM_CAPACITY_VALUE);
return avgCapacity;
}
public float getInstantaneousMaxCapacity(String queue) {
float instMaxCapacity =
getFloat(getQueuePrefix(queue) + INSTANTANEOUS_MAX_CAPACITY,
MAXIMUM_CAPACITY_VALUE);
return instMaxCapacity;
}
public void setInstantaneousMaxCapacity(String queue, float instMaxCapacity) {
setFloat(getQueuePrefix(queue) + INSTANTANEOUS_MAX_CAPACITY,
instMaxCapacity);
}
public void setReservationWindow(String queue, long reservationWindow) {
setLong(getQueuePrefix(queue) + RESERVATION_WINDOW, reservationWindow);
}
public void setAverageCapacity(String queue, float avgCapacity) {
setFloat(getQueuePrefix(queue) + AVERAGE_CAPACITY, avgCapacity);
}
public String getReservationAdmissionPolicy(String queue) {
String reservationPolicy =
get(getQueuePrefix(queue) + RESERVATION_ADMISSION_POLICY,
DEFAULT_RESERVATION_ADMISSION_POLICY);
return reservationPolicy;
}
public void setReservationAdmissionPolicy(String queue,
String reservationPolicy) {
set(getQueuePrefix(queue) + RESERVATION_ADMISSION_POLICY, reservationPolicy);
}
public String getReservationAgent(String queue) {
String reservationAgent =
get(getQueuePrefix(queue) + RESERVATION_AGENT_NAME,
DEFAULT_RESERVATION_AGENT_NAME);
return reservationAgent;
}
public void setReservationAgent(String queue, String reservationPolicy) {
set(getQueuePrefix(queue) + RESERVATION_AGENT_NAME, reservationPolicy);
}
public boolean getShowReservationAsQueues(String queuePath) {
boolean showReservationAsQueues =
getBoolean(getQueuePrefix(queuePath)
+ RESERVATION_SHOW_RESERVATION_AS_QUEUE, false);
return showReservationAsQueues;
}
public String getReplanner(String queue) {
String replanner =
get(getQueuePrefix(queue) + RESERVATION_PLANNER_NAME,
DEFAULT_RESERVATION_PLANNER_NAME);
return replanner;
}
public boolean getMoveOnExpiry(String queue) {
boolean killOnExpiry =
getBoolean(getQueuePrefix(queue) + RESERVATION_MOVE_ON_EXPIRY,
DEFAULT_RESERVATION_MOVE_ON_EXPIRY);
return killOnExpiry;
}
public long getEnforcementWindow(String queue) {
long enforcementWindow =
getLong(getQueuePrefix(queue) + RESERVATION_ENFORCEMENT_WINDOW,
DEFAULT_RESERVATION_ENFORCEMENT_WINDOW);
return enforcementWindow;
}
}

View File

@ -0,0 +1,222 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TestCapacityOverTimePolicy {
long timeWindow;
long step;
float avgConstraint;
float instConstraint;
long initTime;
InMemoryPlan plan;
ReservationAgent mAgent;
Resource minAlloc;
ResourceCalculator res;
Resource maxAlloc;
int totCont = 1000000;
@Before
public void setup() throws Exception {
// 24h window
timeWindow = 86400000L;
// 1 sec step
step = 1000L;
// 25% avg cap on capacity
avgConstraint = 25;
// 70% instantaneous cap on capacity
instConstraint = 70;
initTime = System.currentTimeMillis();
minAlloc = Resource.newInstance(1024, 1);
res = new DefaultResourceCalculator();
maxAlloc = Resource.newInstance(1024 * 8, 8);
mAgent = mock(ReservationAgent.class);
ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
CapacityScheduler scheduler = testUtil.mockCapacityScheduler(totCont);
String reservationQ = testUtil.getFullReservationQueueName();
CapacitySchedulerConfiguration capConf = scheduler.getConfiguration();
capConf.setReservationWindow(reservationQ, timeWindow);
capConf.setInstantaneousMaxCapacity(reservationQ, instConstraint);
capConf.setAverageCapacity(reservationQ, avgConstraint);
CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
policy.init(reservationQ, capConf);
plan =
new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, mAgent,
scheduler.getClusterResource(), step, res, minAlloc, maxAlloc,
"dedicated", null, true);
}
public int[] generateData(int length, int val) {
int[] data = new int[length];
for (int i = 0; i < length; i++) {
data[i] = val;
}
return data;
}
@Test
public void testSimplePass() throws IOException, PlanningException {
// generate allocation that simply fit within all constraints
int[] f = generateData(3600, (int) Math.ceil(0.2 * totCont));
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc)));
}
@Test
public void testSimplePass2() throws IOException, PlanningException {
// generate allocation from single tenant that exceed avg momentarily but
// fit within
// max instantanesou
int[] f = generateData(3600, (int) Math.ceil(0.69 * totCont));
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc)));
}
@Test
public void testMultiTenantPass() throws IOException, PlanningException {
// generate allocation from multiple tenants that barely fit in tot capacity
int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
for (int i = 0; i < 4; i++) {
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), null, "u" + i,
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc)));
}
}
@Test(expected = ResourceOverCommitException.class)
public void testMultiTenantFail() throws IOException, PlanningException {
// generate allocation from multiple tenants that exceed tot capacity
int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
for (int i = 0; i < 5; i++) {
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), null, "u" + i,
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc)));
}
}
@Test(expected = PlanningQuotaException.class)
public void testInstFail() throws IOException, PlanningException {
// generate allocation that exceed the instantaneous cap single-show
int[] f = generateData(3600, (int) Math.ceil(0.71 * totCont));
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc)));
Assert.fail("should not have accepted this");
}
@Test
public void testInstFailBySum() throws IOException, PlanningException {
// generate allocation that exceed the instantaneous cap by sum
int[] f = generateData(3600, (int) Math.ceil(0.3 * totCont));
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc)));
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc)));
try {
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc)));
Assert.fail();
} catch (PlanningQuotaException p) {
// expected
}
}
@Test(expected = PlanningQuotaException.class)
public void testFailAvg() throws IOException, PlanningException {
// generate an allocation which violates the 25% average single-shot
Map<ReservationInterval, ReservationRequest> req =
new TreeMap<ReservationInterval, ReservationRequest>();
long win = timeWindow / 2 + 100;
int cont = (int) Math.ceil(0.5 * totCont);
req.put(new ReservationInterval(initTime, initTime + win),
ReservationRequest.newInstance(Resource.newInstance(1024, 1), cont));
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
"dedicated", initTime, initTime + win, req, res, minAlloc)));
}
@Test
public void testFailAvgBySum() throws IOException, PlanningException {
// generate an allocation which violates the 25% average by sum
Map<ReservationInterval, ReservationRequest> req =
new TreeMap<ReservationInterval, ReservationRequest>();
long win = 86400000 / 4 + 1;
int cont = (int) Math.ceil(0.5 * totCont);
req.put(new ReservationInterval(initTime, initTime + win),
ReservationRequest.newInstance(Resource.newInstance(1024, 1), cont));
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
"dedicated", initTime, initTime + win, req, res, minAlloc)));
try {
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
"dedicated", initTime, initTime + win, req, res, minAlloc)));
Assert.fail("should not have accepted this");
} catch (PlanningQuotaException e) {
// expected
}
}
}

View File

@ -0,0 +1,144 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import java.io.IOException;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.junit.Before;
import org.junit.Test;
public class TestNoOverCommitPolicy {
long step;
long initTime;
InMemoryPlan plan;
ReservationAgent mAgent;
Resource minAlloc;
ResourceCalculator res;
Resource maxAlloc;
int totCont = 1000000;
@Before
public void setup() throws Exception {
// 1 sec step
step = 1000L;
initTime = System.currentTimeMillis();
minAlloc = Resource.newInstance(1024, 1);
res = new DefaultResourceCalculator();
maxAlloc = Resource.newInstance(1024 * 8, 8);
mAgent = mock(ReservationAgent.class);
ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
CapacityScheduler scheduler = testUtil.mockCapacityScheduler(totCont);
String reservationQ = testUtil.getFullReservationQueueName();
CapacitySchedulerConfiguration capConf = scheduler.getConfiguration();
NoOverCommitPolicy policy = new NoOverCommitPolicy();
policy.init(reservationQ, capConf);
plan =
new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, mAgent,
scheduler.getClusterResource(), step, res, minAlloc, maxAlloc,
"dedicated", null, true);
}
public int[] generateData(int length, int val) {
int[] data = new int[length];
for (int i = 0; i < length; i++) {
data[i] = val;
}
return data;
}
@Test
public void testSingleUserEasyFitPass() throws IOException, PlanningException {
// generate allocation that easily fit within resource constraints
int[] f = generateData(3600, (int) Math.ceil(0.2 * totCont));
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc)));
}
@Test
public void testSingleUserBarelyFitPass() throws IOException,
PlanningException {
// generate allocation from single tenant that barely fit
int[] f = generateData(3600, totCont);
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc)));
}
@Test(expected = ResourceOverCommitException.class)
public void testSingleFail() throws IOException, PlanningException {
// generate allocation from single tenant that exceed capacity
int[] f = generateData(3600, (int) (1.1 * totCont));
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
"dedicated", initTime, initTime + f.length, ReservationSystemTestUtil
.generateAllocation(initTime, step, f), res, minAlloc));
}
@Test(expected = MismatchedUserException.class)
public void testUserMismatch() throws IOException, PlanningException {
// generate allocation from single tenant that exceed capacity
int[] f = generateData(3600, (int) (0.5 * totCont));
ReservationId rid = ReservationSystemTestUtil.getNewReservationId();
plan.addReservation(new InMemoryReservationAllocation(rid, null, "u1",
"dedicated", initTime, initTime + f.length, ReservationSystemTestUtil
.generateAllocation(initTime, step, f), res, minAlloc));
// trying to update a reservation with a mismatching user
plan.updateReservation(new InMemoryReservationAllocation(rid, null, "u2",
"dedicated", initTime, initTime + f.length, ReservationSystemTestUtil
.generateAllocation(initTime, step, f), res, minAlloc));
}
@Test
public void testMultiTenantPass() throws IOException, PlanningException {
// generate allocation from multiple tenants that barely fit in tot capacity
int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
for (int i = 0; i < 4; i++) {
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), null, "u" + i,
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc)));
}
}
@Test(expected = ResourceOverCommitException.class)
public void testMultiTenantFail() throws IOException, PlanningException {
// generate allocation from multiple tenants that exceed tot capacity
int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
for (int i = 0; i < 5; i++) {
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), null, "u" + i,
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc)));
}
}
}