YARN-1711. Policy to enforce instantaneous and over-time quotas on user reservation. Contributed by Carlo Curino and Subru Krishnan.

(cherry picked from commit c4918cb4cb)
(cherry picked from commit b6df0dddcd)
(cherry picked from commit 6bfdaf06c4)
This commit is contained in:
carlo curino 2014-09-16 13:20:57 -07:00 committed by Chris Douglas
parent 4ee027b9d6
commit 56972123ff
12 changed files with 977 additions and 2 deletions

View File

@ -14,3 +14,6 @@ subru)
YARN-1710. Logic to find allocations within a Plan that satisfy
user ReservationRequest(s). (Carlo Curino and Subru Krishnan via
curino)
YARN-1711. Policy to enforce instantaneous and over-time quotas
on user reservations. (Carlo Curino and Subru Krishnan via curino)

View File

@ -0,0 +1,231 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import java.util.Date;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
* This policy enforces a time-extended notion of Capacity. In particular it
* guarantees that the allocation received in input when combined with all
* previous allocation for the user does not violate an instantaneous max limit
* on the resources received, and that for every window of time of length
* validWindow, the integral of the allocations for a user (sum of the currently
* submitted allocation and all prior allocations for the user) does not exceed
* validWindow * maxAvg.
*
* This allows flexibility, in the sense that an allocation can instantaneously
* use large portions of the available capacity, but prevents abuses by bounding
* the average use over time.
*
* By controlling maxInst, maxAvg, validWindow the administrator configuring
* this policy can obtain a behavior ranging from instantaneously enforced
* capacity (akin to existing queues), or fully flexible allocations (likely
* reserved to super-users, or trusted systems).
*/
@LimitedPrivate("yarn")
@Unstable
public class CapacityOverTimePolicy implements SharingPolicy {
private CapacitySchedulerConfiguration conf;
private long validWindow;
private float maxInst;
private float maxAvg;
// For now this is CapacityScheduler specific, but given a hierarchy in the
// configuration structure of the schedulers (e.g., SchedulerConfiguration)
// it should be easy to remove this limitation
@Override
public void init(String reservationQueuePath, Configuration conf) {
this.conf = (CapacitySchedulerConfiguration) conf;
validWindow = this.conf.getReservationWindow(reservationQueuePath);
maxInst = this.conf.getInstantaneousMaxCapacity(reservationQueuePath) / 100;
maxAvg = this.conf.getAverageCapacity(reservationQueuePath) / 100;
};
@Override
public void validate(Plan plan, ReservationAllocation reservation)
throws PlanningException {
// this is entire method invoked under a write-lock on the plan, no need
// to synchronize accesses to the plan further
// Try to verify whether there is already a reservation with this ID in
// the system (remove its contribution during validation to simulate a
// try-n-swap
// update).
ReservationAllocation oldReservation =
plan.getReservationById(reservation.getReservationId());
// sanity check that the update of a reservation is not changing username
if (oldReservation != null
&& !oldReservation.getUser().equals(reservation.getUser())) {
throw new MismatchedUserException(
"Updating an existing reservation with mismatched user:"
+ oldReservation.getUser() + " != " + reservation.getUser());
}
long startTime = reservation.getStartTime();
long endTime = reservation.getEndTime();
long step = plan.getStep();
Resource planTotalCapacity = plan.getTotalCapacity();
Resource maxAvgRes = Resources.multiply(planTotalCapacity, maxAvg);
Resource maxInsRes = Resources.multiply(planTotalCapacity, maxInst);
// define variable that will store integral of resources (need diff class to
// avoid overflow issues for long/large allocations)
IntegralResource runningTot = new IntegralResource(0L, 0L);
IntegralResource maxAllowed = new IntegralResource(maxAvgRes);
maxAllowed.multiplyBy(validWindow / step);
// check that the resources offered to the user during any window of length
// "validWindow" overlapping this allocation are within maxAllowed
// also enforce instantaneous and physical constraints during this pass
for (long t = startTime - validWindow; t < endTime + validWindow; t += step) {
Resource currExistingAllocTot = plan.getTotalCommittedResources(t);
Resource currExistingAllocForUser =
plan.getConsumptionForUser(reservation.getUser(), t);
Resource currNewAlloc = reservation.getResourcesAtTime(t);
Resource currOldAlloc = Resources.none();
if (oldReservation != null) {
currOldAlloc = oldReservation.getResourcesAtTime(t);
}
// throw exception if the cluster is overcommitted
// tot_allocated - old + new > capacity
Resource inst =
Resources.subtract(Resources.add(currExistingAllocTot, currNewAlloc),
currOldAlloc);
if (Resources.greaterThan(plan.getResourceCalculator(),
planTotalCapacity, inst, planTotalCapacity)) {
throw new ResourceOverCommitException(" Resources at time " + t
+ " would be overcommitted (" + inst + " over "
+ plan.getTotalCapacity() + ") by accepting reservation: "
+ reservation.getReservationId());
}
// throw exception if instantaneous limits are violated
// tot_alloc_to_this_user - old + new > inst_limit
if (Resources.greaterThan(plan.getResourceCalculator(),
planTotalCapacity, Resources.subtract(
Resources.add(currExistingAllocForUser, currNewAlloc),
currOldAlloc), maxInsRes)) {
throw new PlanningQuotaException("Instantaneous quota capacity "
+ maxInst + " would be passed at time " + t
+ " by accepting reservation: " + reservation.getReservationId());
}
// throw exception if the running integral of utilization over validWindow
// is violated. We perform a delta check, adding/removing instants at the
// boundary of the window from runningTot.
// runningTot = previous_runningTot + currExistingAllocForUser +
// currNewAlloc - currOldAlloc - pastNewAlloc - pastOldAlloc;
// Where:
// 1) currNewAlloc, currExistingAllocForUser represent the contribution of
// the instant in time added in this pass.
// 2) pastNewAlloc, pastOldAlloc are the contributions relative to time
// instants that are being retired from the the window
// 3) currOldAlloc is the contribution (if any) of the previous version of
// this reservation (the one we are updating)
runningTot.add(currExistingAllocForUser);
runningTot.add(currNewAlloc);
runningTot.subtract(currOldAlloc);
// expire contributions from instant in time before (t - validWindow)
if (t > startTime) {
Resource pastOldAlloc =
plan.getConsumptionForUser(reservation.getUser(), t - validWindow);
Resource pastNewAlloc = reservation.getResourcesAtTime(t - validWindow);
// runningTot = runningTot - pastExistingAlloc - pastNewAlloc;
runningTot.subtract(pastOldAlloc);
runningTot.subtract(pastNewAlloc);
}
// check integral
// runningTot > maxAvg * validWindow
// NOTE: we need to use comparator of IntegralResource directly, as
// Resource and ResourceCalculator assume "int" amount of resources,
// which is not sufficient when comparing integrals (out-of-bound)
if (maxAllowed.compareTo(runningTot) < 0) {
throw new PlanningQuotaException(
"Integral (avg over time) quota capacity " + maxAvg
+ " over a window of " + validWindow / 1000 + " seconds, "
+ " would be passed at time " + t + "(" + new Date(t)
+ ") by accepting reservation: "
+ reservation.getReservationId());
}
}
}
@Override
public long getValidWindow() {
return validWindow;
}
/**
* This class provides support for Resource-like book-keeping, based on
* long(s), as using Resource to store the "integral" of the allocation over
* time leads to integer overflows for large allocations/clusters. (Evolving
* Resource to use long is too disruptive at this point.)
*
* The comparison/multiplication behaviors of IntegralResource are consistent
* with the DefaultResourceCalculator.
*/
public class IntegralResource {
long memory;
long vcores;
public IntegralResource(Resource resource) {
this.memory = resource.getMemory();
this.vcores = resource.getVirtualCores();
}
public IntegralResource(long mem, long vcores) {
this.memory = mem;
this.vcores = vcores;
}
public void add(Resource r) {
memory += r.getMemory();
vcores += r.getVirtualCores();
}
public void subtract(Resource r) {
memory -= r.getMemory();
vcores -= r.getVirtualCores();
}
public void multiplyBy(long window) {
memory = memory * window;
vcores = vcores * window;
}
public long compareTo(IntegralResource other) {
long diff = memory - other.memory;
if (diff == 0) {
diff = vcores - other.vcores;
}
return diff;
}
@Override
public String toString() {
return "<memory:" + memory + ", vCores:" + vcores + ">";
}
}
}

View File

@ -0,0 +1,74 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
* This policy enforce a simple physical cluster capacity constraints, by
* validating that the allocation proposed fits in the current plan. This
* validation is compatible with "updates" and in verifying the capacity
* constraints it conceptually remove the prior version of the reservation.
*/
@LimitedPrivate("yarn")
@Unstable
public class NoOverCommitPolicy implements SharingPolicy {
@Override
public void validate(Plan plan, ReservationAllocation reservation)
throws PlanningException {
ReservationAllocation oldReservation =
plan.getReservationById(reservation.getReservationId());
// check updates are using same name
if (oldReservation != null
&& !oldReservation.getUser().equals(reservation.getUser())) {
throw new MismatchedUserException(
"Updating an existing reservation with mismatching user:"
+ oldReservation.getUser() + " != " + reservation.getUser());
}
long startTime = reservation.getStartTime();
long endTime = reservation.getEndTime();
long step = plan.getStep();
// for every instant in time, check we are respecting cluster capacity
for (long t = startTime; t < endTime; t += step) {
Resource currExistingAllocTot = plan.getTotalCommittedResources(t);
Resource currNewAlloc = reservation.getResourcesAtTime(t);
Resource currOldAlloc = Resource.newInstance(0, 0);
if (oldReservation != null) {
oldReservation.getResourcesAtTime(t);
}
// check the cluster is never over committed
// currExistingAllocTot + currNewAlloc - currOldAlloc >
// capPlan.getTotalCapacity()
if (Resources.greaterThan(plan.getResourceCalculator(), plan
.getTotalCapacity(), Resources.subtract(
Resources.add(currExistingAllocTot, currNewAlloc), currOldAlloc),
plan.getTotalCapacity())) {
throw new ResourceOverCommitException("Resources at time " + t
+ " would be overcommitted by " + "accepting reservation: "
+ reservation.getReservationId());
}
}
}
@Override
public long getValidWindow() {
// this policy has no "memory" so the valid window is set to zero
return 0;
}
@Override
public void init(String inventoryQueuePath, Configuration conf) {
// nothing to do for this policy
}
}

View File

@ -0,0 +1,49 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
/**
* This is the interface for policy that validate new
* {@link ReservationAllocation}s for allocations being added to a {@link Plan}.
* Individual policies will be enforcing different invariants.
*/
@LimitedPrivate("yarn")
@Unstable
public interface SharingPolicy {
/**
* Initialize this policy
*
* @param inventoryQueuePath the name of the queue for this plan
* @param conf the system configuration
*/
public void init(String inventoryQueuePath, Configuration conf);
/**
* This method runs the policy validation logic, and return true/false on
* whether the {@link ReservationAllocation} is acceptable according to this
* sharing policy.
*
* @param plan the {@link Plan} we validate against
* @param newAllocation the allocation proposed to be added to the
* {@link Plan}
* @throws PlanningException if the policy is respected if we add this
* {@link ReservationAllocation} to the {@link Plan}
*/
public void validate(Plan plan, ReservationAllocation newAllocation)
throws PlanningException;
/**
* Returns the time range before and after the current reservation considered
* by this policy. In particular, this informs the archival process for the
* {@link Plan}, i.e., reservations regarding times before (now - validWindow)
* can be deleted.
*
* @return validWindow the window of validity considered by the policy.
*/
public long getValidWindow();
}

View File

@ -1,5 +1,13 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
/**
* This exception is thrown if the request made is not syntactically valid.
*/
@Public
@Unstable
public class ContractValidationException extends PlanningException {
private static final long serialVersionUID = 1L;
@ -8,5 +16,4 @@ public class ContractValidationException extends PlanningException {
super(message);
}
}

View File

@ -0,0 +1,28 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
/**
* Exception thrown when an update to an existing reservation is performed
* by a user that is not the reservation owner.
*/
@Public
@Unstable
public class MismatchedUserException extends PlanningException {
private static final long serialVersionUID = 8313222590561668413L;
public MismatchedUserException(String message) {
super(message);
}
public MismatchedUserException(Throwable cause) {
super(cause);
}
public MismatchedUserException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -2,10 +2,17 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
/**
* Exception thrown by the admission control subsystem when there is a problem
* in trying to find an allocation for a user {@link ReservationSubmissionRequest}.
* in trying to find an allocation for a user
* {@link ReservationSubmissionRequest}.
*/
@Public
@Unstable
public class PlanningException extends Exception {
private static final long serialVersionUID = -684069387367879218L;

View File

@ -0,0 +1,28 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
/**
* This exception is thrown if the user quota is exceed while accepting or
* updating a reservation.
*/
@Public
@Unstable
public class PlanningQuotaException extends PlanningException {
private static final long serialVersionUID = 8206629288380246166L;
public PlanningQuotaException(String message) {
super(message);
}
public PlanningQuotaException(Throwable cause) {
super(cause);
}
public PlanningQuotaException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -0,0 +1,28 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
/**
* This exception indicate that the reservation that has been attempted, would
* exceed the physical resources available in the {@link Plan} at the moment.
*/
@Public
@Unstable
public class ResourceOverCommitException extends PlanningException {
private static final long serialVersionUID = 7070699407526521032L;
public ResourceOverCommitException(String message) {
super(message);
}
public ResourceOverCommitException(Throwable cause) {
super(cause);
}
public ResourceOverCommitException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -190,6 +190,63 @@ public class CapacitySchedulerConfiguration extends Configuration {
}
}
@Private
public static final String AVERAGE_CAPACITY = "average-capacity";
@Private
public static final String IS_RESERVABLE = "reservable";
@Private
public static final String RESERVATION_WINDOW = "reservation-window";
@Private
public static final String INSTANTANEOUS_MAX_CAPACITY =
"instantaneous-max-capacity";
@Private
public static final long DEFAULT_RESERVATION_WINDOW = 0L;
@Private
public static final String RESERVATION_ADMISSION_POLICY =
"reservation-policy";
@Private
public static final String RESERVATION_AGENT_NAME = "reservation-agent";
@Private
public static final String RESERVATION_SHOW_RESERVATION_AS_QUEUE =
"show-reservations-as-queues";
@Private
public static final String DEFAULT_RESERVATION_ADMISSION_POLICY =
"org.apache.hadoop.yarn.server.resourcemanager.reservation.CapacityOverTimePolicy";
@Private
public static final String DEFAULT_RESERVATION_AGENT_NAME =
"org.apache.hadoop.yarn.server.resourcemanager.reservation.GreedyReservationAgent";
@Private
public static final String RESERVATION_PLANNER_NAME = "reservation-planner";
@Private
public static final String DEFAULT_RESERVATION_PLANNER_NAME =
"org.apache.hadoop.yarn.server.resourcemanager.reservation.SimpleCapacityReplanner";
@Private
public static final String RESERVATION_MOVE_ON_EXPIRY =
"reservation-move-on-expiry";
@Private
public static final boolean DEFAULT_RESERVATION_MOVE_ON_EXPIRY = true;
@Private
public static final String RESERVATION_ENFORCEMENT_WINDOW =
"reservation-enforcement-window";
// default to 1h lookahead enforcement
@Private
public static final long DEFAULT_RESERVATION_ENFORCEMENT_WINDOW = 3600000;
public CapacitySchedulerConfiguration() {
this(new Configuration());
}
@ -511,4 +568,101 @@ public class CapacitySchedulerConfiguration extends Configuration {
return mappings;
}
public boolean isReservable(String queue) {
boolean isReservable =
getBoolean(getQueuePrefix(queue) + IS_RESERVABLE, false);
return isReservable;
}
public void setReservable(String queue, boolean isReservable) {
setBoolean(getQueuePrefix(queue) + IS_RESERVABLE, isReservable);
LOG.debug("here setReservableQueue: queuePrefix=" + getQueuePrefix(queue)
+ ", isReservableQueue=" + isReservable(queue));
}
public long getReservationWindow(String queue) {
long reservationWindow =
getLong(getQueuePrefix(queue) + RESERVATION_WINDOW,
DEFAULT_RESERVATION_WINDOW);
return reservationWindow;
}
public float getAverageCapacity(String queue) {
float avgCapacity =
getFloat(getQueuePrefix(queue) + AVERAGE_CAPACITY,
MAXIMUM_CAPACITY_VALUE);
return avgCapacity;
}
public float getInstantaneousMaxCapacity(String queue) {
float instMaxCapacity =
getFloat(getQueuePrefix(queue) + INSTANTANEOUS_MAX_CAPACITY,
MAXIMUM_CAPACITY_VALUE);
return instMaxCapacity;
}
public void setInstantaneousMaxCapacity(String queue, float instMaxCapacity) {
setFloat(getQueuePrefix(queue) + INSTANTANEOUS_MAX_CAPACITY,
instMaxCapacity);
}
public void setReservationWindow(String queue, long reservationWindow) {
setLong(getQueuePrefix(queue) + RESERVATION_WINDOW, reservationWindow);
}
public void setAverageCapacity(String queue, float avgCapacity) {
setFloat(getQueuePrefix(queue) + AVERAGE_CAPACITY, avgCapacity);
}
public String getReservationAdmissionPolicy(String queue) {
String reservationPolicy =
get(getQueuePrefix(queue) + RESERVATION_ADMISSION_POLICY,
DEFAULT_RESERVATION_ADMISSION_POLICY);
return reservationPolicy;
}
public void setReservationAdmissionPolicy(String queue,
String reservationPolicy) {
set(getQueuePrefix(queue) + RESERVATION_ADMISSION_POLICY, reservationPolicy);
}
public String getReservationAgent(String queue) {
String reservationAgent =
get(getQueuePrefix(queue) + RESERVATION_AGENT_NAME,
DEFAULT_RESERVATION_AGENT_NAME);
return reservationAgent;
}
public void setReservationAgent(String queue, String reservationPolicy) {
set(getQueuePrefix(queue) + RESERVATION_AGENT_NAME, reservationPolicy);
}
public boolean getShowReservationAsQueues(String queuePath) {
boolean showReservationAsQueues =
getBoolean(getQueuePrefix(queuePath)
+ RESERVATION_SHOW_RESERVATION_AS_QUEUE, false);
return showReservationAsQueues;
}
public String getReplanner(String queue) {
String replanner =
get(getQueuePrefix(queue) + RESERVATION_PLANNER_NAME,
DEFAULT_RESERVATION_PLANNER_NAME);
return replanner;
}
public boolean getMoveOnExpiry(String queue) {
boolean killOnExpiry =
getBoolean(getQueuePrefix(queue) + RESERVATION_MOVE_ON_EXPIRY,
DEFAULT_RESERVATION_MOVE_ON_EXPIRY);
return killOnExpiry;
}
public long getEnforcementWindow(String queue) {
long enforcementWindow =
getLong(getQueuePrefix(queue) + RESERVATION_ENFORCEMENT_WINDOW,
DEFAULT_RESERVATION_ENFORCEMENT_WINDOW);
return enforcementWindow;
}
}

View File

@ -0,0 +1,222 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TestCapacityOverTimePolicy {
long timeWindow;
long step;
float avgConstraint;
float instConstraint;
long initTime;
InMemoryPlan plan;
ReservationAgent mAgent;
Resource minAlloc;
ResourceCalculator res;
Resource maxAlloc;
int totCont = 1000000;
@Before
public void setup() throws Exception {
// 24h window
timeWindow = 86400000L;
// 1 sec step
step = 1000L;
// 25% avg cap on capacity
avgConstraint = 25;
// 70% instantaneous cap on capacity
instConstraint = 70;
initTime = System.currentTimeMillis();
minAlloc = Resource.newInstance(1024, 1);
res = new DefaultResourceCalculator();
maxAlloc = Resource.newInstance(1024 * 8, 8);
mAgent = mock(ReservationAgent.class);
ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
CapacityScheduler scheduler = testUtil.mockCapacityScheduler(totCont);
String reservationQ = testUtil.getFullReservationQueueName();
CapacitySchedulerConfiguration capConf = scheduler.getConfiguration();
capConf.setReservationWindow(reservationQ, timeWindow);
capConf.setInstantaneousMaxCapacity(reservationQ, instConstraint);
capConf.setAverageCapacity(reservationQ, avgConstraint);
CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
policy.init(reservationQ, capConf);
plan =
new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, mAgent,
scheduler.getClusterResource(), step, res, minAlloc, maxAlloc,
"dedicated", null, true);
}
public int[] generateData(int length, int val) {
int[] data = new int[length];
for (int i = 0; i < length; i++) {
data[i] = val;
}
return data;
}
@Test
public void testSimplePass() throws IOException, PlanningException {
// generate allocation that simply fit within all constraints
int[] f = generateData(3600, (int) Math.ceil(0.2 * totCont));
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc)));
}
@Test
public void testSimplePass2() throws IOException, PlanningException {
// generate allocation from single tenant that exceed avg momentarily but
// fit within
// max instantanesou
int[] f = generateData(3600, (int) Math.ceil(0.69 * totCont));
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc)));
}
@Test
public void testMultiTenantPass() throws IOException, PlanningException {
// generate allocation from multiple tenants that barely fit in tot capacity
int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
for (int i = 0; i < 4; i++) {
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), null, "u" + i,
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc)));
}
}
@Test(expected = ResourceOverCommitException.class)
public void testMultiTenantFail() throws IOException, PlanningException {
// generate allocation from multiple tenants that exceed tot capacity
int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
for (int i = 0; i < 5; i++) {
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), null, "u" + i,
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc)));
}
}
@Test(expected = PlanningQuotaException.class)
public void testInstFail() throws IOException, PlanningException {
// generate allocation that exceed the instantaneous cap single-show
int[] f = generateData(3600, (int) Math.ceil(0.71 * totCont));
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc)));
Assert.fail("should not have accepted this");
}
@Test
public void testInstFailBySum() throws IOException, PlanningException {
// generate allocation that exceed the instantaneous cap by sum
int[] f = generateData(3600, (int) Math.ceil(0.3 * totCont));
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc)));
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc)));
try {
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc)));
Assert.fail();
} catch (PlanningQuotaException p) {
// expected
}
}
@Test(expected = PlanningQuotaException.class)
public void testFailAvg() throws IOException, PlanningException {
// generate an allocation which violates the 25% average single-shot
Map<ReservationInterval, ReservationRequest> req =
new TreeMap<ReservationInterval, ReservationRequest>();
long win = timeWindow / 2 + 100;
int cont = (int) Math.ceil(0.5 * totCont);
req.put(new ReservationInterval(initTime, initTime + win),
ReservationRequest.newInstance(Resource.newInstance(1024, 1), cont));
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
"dedicated", initTime, initTime + win, req, res, minAlloc)));
}
@Test
public void testFailAvgBySum() throws IOException, PlanningException {
// generate an allocation which violates the 25% average by sum
Map<ReservationInterval, ReservationRequest> req =
new TreeMap<ReservationInterval, ReservationRequest>();
long win = 86400000 / 4 + 1;
int cont = (int) Math.ceil(0.5 * totCont);
req.put(new ReservationInterval(initTime, initTime + win),
ReservationRequest.newInstance(Resource.newInstance(1024, 1), cont));
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
"dedicated", initTime, initTime + win, req, res, minAlloc)));
try {
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
"dedicated", initTime, initTime + win, req, res, minAlloc)));
Assert.fail("should not have accepted this");
} catch (PlanningQuotaException e) {
// expected
}
}
}

View File

@ -0,0 +1,144 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import java.io.IOException;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.junit.Before;
import org.junit.Test;
public class TestNoOverCommitPolicy {
long step;
long initTime;
InMemoryPlan plan;
ReservationAgent mAgent;
Resource minAlloc;
ResourceCalculator res;
Resource maxAlloc;
int totCont = 1000000;
@Before
public void setup() throws Exception {
// 1 sec step
step = 1000L;
initTime = System.currentTimeMillis();
minAlloc = Resource.newInstance(1024, 1);
res = new DefaultResourceCalculator();
maxAlloc = Resource.newInstance(1024 * 8, 8);
mAgent = mock(ReservationAgent.class);
ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
CapacityScheduler scheduler = testUtil.mockCapacityScheduler(totCont);
String reservationQ = testUtil.getFullReservationQueueName();
CapacitySchedulerConfiguration capConf = scheduler.getConfiguration();
NoOverCommitPolicy policy = new NoOverCommitPolicy();
policy.init(reservationQ, capConf);
plan =
new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, mAgent,
scheduler.getClusterResource(), step, res, minAlloc, maxAlloc,
"dedicated", null, true);
}
public int[] generateData(int length, int val) {
int[] data = new int[length];
for (int i = 0; i < length; i++) {
data[i] = val;
}
return data;
}
@Test
public void testSingleUserEasyFitPass() throws IOException, PlanningException {
// generate allocation that easily fit within resource constraints
int[] f = generateData(3600, (int) Math.ceil(0.2 * totCont));
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc)));
}
@Test
public void testSingleUserBarelyFitPass() throws IOException,
PlanningException {
// generate allocation from single tenant that barely fit
int[] f = generateData(3600, totCont);
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc)));
}
@Test(expected = ResourceOverCommitException.class)
public void testSingleFail() throws IOException, PlanningException {
// generate allocation from single tenant that exceed capacity
int[] f = generateData(3600, (int) (1.1 * totCont));
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), null, "u1",
"dedicated", initTime, initTime + f.length, ReservationSystemTestUtil
.generateAllocation(initTime, step, f), res, minAlloc));
}
@Test(expected = MismatchedUserException.class)
public void testUserMismatch() throws IOException, PlanningException {
// generate allocation from single tenant that exceed capacity
int[] f = generateData(3600, (int) (0.5 * totCont));
ReservationId rid = ReservationSystemTestUtil.getNewReservationId();
plan.addReservation(new InMemoryReservationAllocation(rid, null, "u1",
"dedicated", initTime, initTime + f.length, ReservationSystemTestUtil
.generateAllocation(initTime, step, f), res, minAlloc));
// trying to update a reservation with a mismatching user
plan.updateReservation(new InMemoryReservationAllocation(rid, null, "u2",
"dedicated", initTime, initTime + f.length, ReservationSystemTestUtil
.generateAllocation(initTime, step, f), res, minAlloc));
}
@Test
public void testMultiTenantPass() throws IOException, PlanningException {
// generate allocation from multiple tenants that barely fit in tot capacity
int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
for (int i = 0; i < 4; i++) {
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), null, "u" + i,
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc)));
}
}
@Test(expected = ResourceOverCommitException.class)
public void testMultiTenantFail() throws IOException, PlanningException {
// generate allocation from multiple tenants that exceed tot capacity
int[] f = generateData(3600, (int) Math.ceil(0.25 * totCont));
for (int i = 0; i < 5; i++) {
assertTrue(plan.toString(),
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), null, "u" + i,
"dedicated", initTime, initTime + f.length,
ReservationSystemTestUtil.generateAllocation(initTime, step, f),
res, minAlloc)));
}
}
}