YARN-3656. LowCost: A Cost-Based Placement Agent for YARN Reservations. (Jonathan Yaniv and Ishai Menache via curino)

(cherry picked from commit 156f24ead0)
This commit is contained in:
ccurino 2015-07-25 07:39:47 -07:00
parent 621203bf44
commit 26ea045814
38 changed files with 2644 additions and 439 deletions

View File

@ -92,6 +92,9 @@ Release 2.8.0 - UNRELEASED
YARN-2019. Retrospect on decision of making RM crashed if any exception throw YARN-2019. Retrospect on decision of making RM crashed if any exception throw
in ZKRMStateStore. (Jian He via junping_du) in ZKRMStateStore. (Jian He via junping_du)
YARN-3656. LowCost: A Cost-Based Placement Agent for YARN Reservations.
(Jonathan Yaniv and Ishai Menache via curino)
YARN-644. Basic null check is not performed on passed in arguments before YARN-644. Basic null check is not performed on passed in arguments before

View File

@ -40,6 +40,8 @@
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;

View File

@ -1,390 +0,0 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import java.util.HashMap;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* This Agent employs a simple greedy placement strategy, placing the various
* stages of a {@link ReservationRequest} from the deadline moving backward
* towards the arrival. This allows jobs with earlier deadline to be scheduled
* greedily as well. Combined with an opportunistic anticipation of work if the
* cluster is not fully utilized also seems to provide good latency for
* best-effort jobs (i.e., jobs running without a reservation).
* This agent does not account for locality and only consider container
* granularity for validation purposes (i.e., you can't exceed max-container
* size).
public class GreedyReservationAgent implements ReservationAgent {
private static final Logger LOG = LoggerFactory
public boolean createReservation(ReservationId reservationId, String user,
Plan plan, ReservationDefinition contract) throws PlanningException {
return computeAllocation(reservationId, user, plan, contract, null);
public boolean updateReservation(ReservationId reservationId, String user,
Plan plan, ReservationDefinition contract) throws PlanningException {
return computeAllocation(reservationId, user, plan, contract,
public boolean deleteReservation(ReservationId reservationId, String user,
Plan plan) throws PlanningException {
return plan.deleteReservation(reservationId);
private boolean computeAllocation(ReservationId reservationId, String user,
Plan plan, ReservationDefinition contract,
ReservationAllocation oldReservation) throws PlanningException,
ContractValidationException {
LOG.info("placing the following ReservationRequest: " + contract);
Resource totalCapacity = plan.getTotalCapacity();
// Here we can addd logic to adjust the ResourceDefinition to account for
// system "imperfections" (e.g., scheduling delays for large containers).
// Align with plan step conservatively (i.e., ceil arrival, and floor
// deadline)
long earliestStart = contract.getArrival();
long step = plan.getStep();
if (earliestStart % step != 0) {
earliestStart = earliestStart + (step - (earliestStart % step));
long deadline =
contract.getDeadline() - contract.getDeadline() % plan.getStep();
// setup temporary variables to handle time-relations between stages and
// intermediate answers
long curDeadline = deadline;
long oldDeadline = -1;
Map<ReservationInterval, Resource> allocations =
new HashMap<ReservationInterval, Resource>();
RLESparseResourceAllocation tempAssigned =
new RLESparseResourceAllocation(plan.getResourceCalculator(),
List<ReservationRequest> stages = contract.getReservationRequests()
ReservationRequestInterpreter type = contract.getReservationRequests()
boolean hasGang = false;
// Iterate the stages in backward from deadline
for (ListIterator<ReservationRequest> li =
stages.listIterator(stages.size()); li.hasPrevious();) {
ReservationRequest currentReservationStage = li.previous();
// validate the RR respect basic constraints
validateInput(plan, currentReservationStage, totalCapacity);
hasGang |= currentReservationStage.getConcurrency() > 1;
// run allocation for a single stage
Map<ReservationInterval, Resource> curAlloc =
placeSingleStage(plan, tempAssigned, currentReservationStage,
earliestStart, curDeadline, oldReservation, totalCapacity);
if (curAlloc == null) {
// if we did not find an allocation for the currentReservationStage
// return null, unless the ReservationDefinition we are placing is of
// type ANY
if (type != ReservationRequestInterpreter.R_ANY) {
throw new PlanningException("The GreedyAgent"
+ " couldn't find a valid allocation for your request");
} else {
} else {
// if we did find an allocation add it to the set of allocations
// if this request is of type ANY we are done searching (greedy)
// and can return the current allocation (break-out of the search)
if (type == ReservationRequestInterpreter.R_ANY) {
// if the request is of ORDER or ORDER_NO_GAP we constraint the next
// round of allocation to precede the current allocation, by setting
// curDeadline
if (type == ReservationRequestInterpreter.R_ORDER
|| type == ReservationRequestInterpreter.R_ORDER_NO_GAP) {
curDeadline = findEarliestTime(curAlloc.keySet());
// for ORDER_NO_GAP verify that the allocation found so far has no
// gap, return null otherwise (the greedy procedure failed to find a
// no-gap
// allocation)
if (type == ReservationRequestInterpreter.R_ORDER_NO_GAP
&& oldDeadline > 0) {
if (oldDeadline - findLatestTime(curAlloc.keySet()) > plan
.getStep()) {
throw new PlanningException("The GreedyAgent"
+ " couldn't find a valid allocation for your request");
// keep the variable oldDeadline pointing to the last deadline we
// found
oldDeadline = curDeadline;
// / If we got here is because we failed to find an allocation for the
// ReservationDefinition give-up and report failure to the user
if (allocations.isEmpty()) {
throw new PlanningException("The GreedyAgent"
+ " couldn't find a valid allocation for your request");
// create reservation with above allocations if not null/empty
Resource ZERO_RES = Resource.newInstance(0, 0);
long firstStartTime = findEarliestTime(allocations.keySet());
// add zero-padding from arrival up to the first non-null allocation
// to guarantee that the reservation exists starting at arrival
if (firstStartTime > earliestStart) {
allocations.put(new ReservationInterval(earliestStart,
firstStartTime), ZERO_RES);
firstStartTime = earliestStart;
// consider to add trailing zeros at the end for simmetry
// Actually add/update the reservation in the plan.
// This is subject to validation as other agents might be placing
// in parallel and there might be sharing policies the agent is not
// aware off.
ReservationAllocation capReservation =
new InMemoryReservationAllocation(reservationId, contract, user,
plan.getQueueName(), firstStartTime,
findLatestTime(allocations.keySet()), allocations,
plan.getResourceCalculator(), plan.getMinimumAllocation(), hasGang);
if (oldReservation != null) {
return plan.updateReservation(capReservation);
} else {
return plan.addReservation(capReservation);
private void validateInput(Plan plan, ReservationRequest rr,
Resource totalCapacity) throws ContractValidationException {
if (rr.getConcurrency() < 1) {
throw new ContractValidationException("Gang Size should be >= 1");
if (rr.getNumContainers() <= 0) {
throw new ContractValidationException("Num containers should be >= 0");
// check that gangSize and numContainers are compatible
if (rr.getNumContainers() % rr.getConcurrency() != 0) {
throw new ContractValidationException(
"Parallelism must be an exact multiple of gang size");
// check that the largest container request does not exceed
// the cluster-wide limit for container sizes
if (Resources.greaterThan(plan.getResourceCalculator(), totalCapacity,
rr.getCapability(), plan.getMaximumAllocation())) {
throw new ContractValidationException("Individual"
+ " capability requests should not exceed cluster's maxAlloc");
* This method actually perform the placement of an atomic stage of the
* reservation. The key idea is to traverse the plan backward for a
* "lease-duration" worth of time, and compute what is the maximum multiple of
* our concurrency (gang) parameter we can fit. We do this and move towards
* previous instant in time until the time-window is exhausted or we placed
* all the user request.
private Map<ReservationInterval, Resource> placeSingleStage(
Plan plan, RLESparseResourceAllocation tempAssigned,
ReservationRequest rr, long earliestStart, long curDeadline,
ReservationAllocation oldResAllocation, final Resource totalCapacity) {
Map<ReservationInterval, Resource> allocationRequests =
new HashMap<ReservationInterval, Resource>();
// compute the gang as a resource and get the duration
Resource gang = Resources.multiply(rr.getCapability(), rr.getConcurrency());
long dur = rr.getDuration();
long step = plan.getStep();
// ceil the duration to the next multiple of the plan step
if (dur % step != 0) {
dur += (step - (dur % step));
// we know for sure that this division has no remainder (part of contract
// with user, validate before
int gangsToPlace = rr.getNumContainers() / rr.getConcurrency();
int maxGang = 0;
// loop trying to place until we are done, or we are considering
// an invalid range of times
while (gangsToPlace > 0 && curDeadline - dur >= earliestStart) {
// as we run along we remember how many gangs we can fit, and what
// was the most constraining moment in time (we will restart just
// after that to place the next batch)
maxGang = gangsToPlace;
long minPoint = curDeadline;
int curMaxGang = maxGang;
// start placing at deadline (excluded due to [,) interval semantics and
// move backward
for (long t = curDeadline - plan.getStep(); t >= curDeadline - dur
&& maxGang > 0; t = t - plan.getStep()) {
// As we run along we will logically remove the previous allocation for
// this reservation
// if one existed
Resource oldResCap = Resource.newInstance(0, 0);
if (oldResAllocation != null) {
oldResCap = oldResAllocation.getResourcesAtTime(t);
// compute net available resources
Resource netAvailableRes = Resources.clone(totalCapacity);
Resources.addTo(netAvailableRes, oldResCap);
// compute maximum number of gangs we could fit
curMaxGang =
(int) Math.floor(Resources.divide(plan.getResourceCalculator(),
totalCapacity, netAvailableRes, gang));
// pick the minimum between available resources in this instant, and how
// many gangs we have to place
curMaxGang = Math.min(gangsToPlace, curMaxGang);
// compare with previous max, and set it. also remember *where* we found
// the minimum (useful for next attempts)
if (curMaxGang <= maxGang) {
maxGang = curMaxGang;
minPoint = t;
// if we were able to place any gang, record this, and decrement
// gangsToPlace
if (maxGang > 0) {
gangsToPlace -= maxGang;
ReservationInterval reservationInt =
new ReservationInterval(curDeadline - dur, curDeadline);
ReservationRequest reservationRequest =
rr.getConcurrency() * maxGang, rr.getConcurrency(),
// remember occupied space (plan is read-only till we find a plausible
// allocation for the entire request). This is needed since we might be
// placing other ReservationRequest within the same
// ReservationDefinition,
// and we must avoid double-counting the available resources
final Resource reservationRes = ReservationSystemUtil.toResource(
tempAssigned.addInterval(reservationInt, reservationRes);
allocationRequests.put(reservationInt, reservationRes);
// reset our new starting point (curDeadline) to the most constraining
// point so far, we will look "left" of that to find more places where
// to schedule gangs (for sure nothing on the "right" of this point can
// fit a full gang.
curDeadline = minPoint;
// if no gangs are left to place we succeed and return the allocation
if (gangsToPlace == 0) {
return allocationRequests;
} else {
// If we are here is becasue we did not manage to satisfy this request.
// So we need to remove unwanted side-effect from tempAssigned (needed
// for ANY).
for (Map.Entry<ReservationInterval, Resource> tempAllocation :
allocationRequests.entrySet()) {
// and return null to signal failure in this allocation
return null;
// finds the leftmost point of this set of ReservationInterval
private long findEarliestTime(Set<ReservationInterval> resInt) {
long ret = Long.MAX_VALUE;
for (ReservationInterval s : resInt) {
if (s.getStartTime() < ret) {
ret = s.getStartTime();
return ret;
// finds the rightmost point of this set of ReservationIntervals
private long findLatestTime(Set<ReservationInterval> resInt) {
long ret = Long.MIN_VALUE;
for (ReservationInterval s : resInt) {
if (s.getEndTime() > ret) {
ret = s.getEndTime();
return ret;

View File

@ -33,6 +33,8 @@
import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.UTCClock; import org.apache.hadoop.yarn.util.UTCClock;
@ -41,7 +43,12 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
class InMemoryPlan implements Plan { /**
* This class represents an in memory representation of the state of our
* reservation system, and provides accelerated access to both individual
* reservations and aggregate utilization of resources over time.
public class InMemoryPlan implements Plan {
private static final Logger LOG = LoggerFactory.getLogger(InMemoryPlan.class); private static final Logger LOG = LoggerFactory.getLogger(InMemoryPlan.class);
@ -75,7 +82,7 @@ class InMemoryPlan implements Plan {
private Resource totalCapacity; private Resource totalCapacity;
InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy, public InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
ReservationAgent agent, Resource totalCapacity, long step, ReservationAgent agent, Resource totalCapacity, long step,
ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc, ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc,
String queueName, Planner replanner, boolean getMoveOnExpiry) { String queueName, Planner replanner, boolean getMoveOnExpiry) {
@ -83,7 +90,7 @@ class InMemoryPlan implements Plan {
maxAlloc, queueName, replanner, getMoveOnExpiry, new UTCClock()); maxAlloc, queueName, replanner, getMoveOnExpiry, new UTCClock());
} }
InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy, public InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy,
ReservationAgent agent, Resource totalCapacity, long step, ReservationAgent agent, Resource totalCapacity, long step,
ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc, ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc,
String queueName, Planner replanner, boolean getMoveOnExpiry, Clock clock) { String queueName, Planner replanner, boolean getMoveOnExpiry, Clock clock) {

View File

@ -29,9 +29,9 @@
/** /**
* An in memory implementation of a reservation allocation using the * An in memory implementation of a reservation allocation using the
* {@link RLESparseResourceAllocation} * {@link RLESparseResourceAllocation}
* *
*/ */
class InMemoryReservationAllocation implements ReservationAllocation { public class InMemoryReservationAllocation implements ReservationAllocation {
private final String planName; private final String planName;
private final ReservationId reservationID; private final ReservationId reservationID;
@ -45,7 +45,7 @@ class InMemoryReservationAllocation implements ReservationAllocation {
private RLESparseResourceAllocation resourcesOverTime; private RLESparseResourceAllocation resourcesOverTime;
InMemoryReservationAllocation(ReservationId reservationID, public InMemoryReservationAllocation(ReservationId reservationID,
ReservationDefinition contract, String user, String planName, ReservationDefinition contract, String user, String planName,
long startTime, long endTime, long startTime, long endTime,
Map<ReservationInterval, Resource> allocations, Map<ReservationInterval, Resource> allocations,
@ -54,7 +54,7 @@ class InMemoryReservationAllocation implements ReservationAllocation {
allocations, calculator, minAlloc, false); allocations, calculator, minAlloc, false);
} }
InMemoryReservationAllocation(ReservationId reservationID, public InMemoryReservationAllocation(ReservationId reservationID,
ReservationDefinition contract, String user, String planName, ReservationDefinition contract, String user, String planName,
long startTime, long endTime, long startTime, long endTime,
Map<ReservationInterval, Resource> allocations, Map<ReservationInterval, Resource> allocations,

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation; package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
/** /**
* A Plan represents the central data structure of a reservation system that * A Plan represents the central data structure of a reservation system that

View File

@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation; package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator;

View File

@ -1,26 +1,27 @@
/******************************************************************************* /**
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file * regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the * to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at * with the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*******************************************************************************/ */
package org.apache.hadoop.yarn.server.resourcemanager.reservation; package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import java.util.Set; import java.util.Set;
import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
/** /**
* This interface provides a read-only view on the allocations made in this * This interface provides a read-only view on the allocations made in this

View File

@ -38,7 +38,7 @@
/** /**
* This is a run length encoded sparse data structure that maintains resource * This is a run length encoded sparse data structure that maintains resource
* allocations over time * allocations over time.
*/ */
public class RLESparseResourceAllocation { public class RLESparseResourceAllocation {
@ -74,7 +74,7 @@ private boolean isSameAsNext(Long key, Resource capacity) {
/** /**
* Add a resource for the specified interval * Add a resource for the specified interval
* *
* @param reservationInterval the interval for which the resource is to be * @param reservationInterval the interval for which the resource is to be
* added * added
* @param totCap the resource to be added * @param totCap the resource to be added
@ -138,7 +138,7 @@ public boolean addInterval(ReservationInterval reservationInterval,
/** /**
* Removes a resource for the specified interval * Removes a resource for the specified interval
* *
* @param reservationInterval the interval for which the resource is to be * @param reservationInterval the interval for which the resource is to be
* removed * removed
* @param totCap the resource to be removed * @param totCap the resource to be removed
@ -189,7 +189,7 @@ public boolean removeInterval(ReservationInterval reservationInterval,
/** /**
* Returns the capacity, i.e. total resources allocated at the specified point * Returns the capacity, i.e. total resources allocated at the specified point
* of time * of time
* *
* @param tick the time (UTC in ms) at which the capacity is requested * @param tick the time (UTC in ms) at which the capacity is requested
* @return the resources allocated at the specified time * @return the resources allocated at the specified time
*/ */
@ -208,7 +208,7 @@ public Resource getCapacityAtTime(long tick) {
/** /**
* Get the timestamp of the earliest resource allocation * Get the timestamp of the earliest resource allocation
* *
* @return the timestamp of the first resource allocation * @return the timestamp of the first resource allocation
*/ */
public long getEarliestStartTime() { public long getEarliestStartTime() {
@ -226,7 +226,7 @@ public long getEarliestStartTime() {
/** /**
* Get the timestamp of the latest resource allocation * Get the timestamp of the latest resource allocation
* *
* @return the timestamp of the last resource allocation * @return the timestamp of the last resource allocation
*/ */
public long getLatestEndTime() { public long getLatestEndTime() {
@ -244,7 +244,7 @@ public long getLatestEndTime() {
/** /**
* Returns true if there are no non-zero entries * Returns true if there are no non-zero entries
* *
* @return true if there are no allocations or false otherwise * @return true if there are no allocations or false otherwise
*/ */
public boolean isEmpty() { public boolean isEmpty() {
@ -287,7 +287,7 @@ public String toString() {
/** /**
* Returns the JSON string representation of the current resources allocated * Returns the JSON string representation of the current resources allocated
* over time * over time
* *
* @return the JSON string representation of the current resources allocated * @return the JSON string representation of the current resources allocated
* over time * over time
*/ */
@ -312,4 +312,43 @@ public String toMemJSONString() {
} }
} }
* Returns the representation of the current resources allocated over time as
* an interval map.
* @return the representation of the current resources allocated over time as
* an interval map.
public Map<ReservationInterval, Resource> toIntervalMap() {
try {
Map<ReservationInterval, Resource> allocations =
new TreeMap<ReservationInterval, Resource>();
// Empty
if (isEmpty()) {
return allocations;
Map.Entry<Long, Resource> lastEntry = null;
for (Map.Entry<Long, Resource> entry : cumulativeCapacity.entrySet()) {
if (lastEntry != null) {
ReservationInterval interval =
new ReservationInterval(lastEntry.getKey(), entry.getKey());
Resource resource = lastEntry.getValue();
allocations.put(interval, resource);
lastEntry = entry;
return allocations;
} finally {
} }

View File

@ -21,6 +21,8 @@
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner;
public abstract class ReservationSchedulerConfiguration extends Configuration { public abstract class ReservationSchedulerConfiguration extends Configuration {
@ -33,11 +35,11 @@ public abstract class ReservationSchedulerConfiguration extends Configuration {
@InterfaceAudience.Private @InterfaceAudience.Private
public static final String DEFAULT_RESERVATION_AGENT_NAME = public static final String DEFAULT_RESERVATION_AGENT_NAME =
"org.apache.hadoop.yarn.server.resourcemanager.reservation.GreedyReservationAgent"; "org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.AlignedPlannerWithGreedy";
@InterfaceAudience.Private @InterfaceAudience.Private
public static final String DEFAULT_RESERVATION_PLANNER_NAME = public static final String DEFAULT_RESERVATION_PLANNER_NAME =
"org.apache.hadoop.yarn.server.resourcemanager.reservation.SimpleCapacityReplanner"; "org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.SimpleCapacityReplanner";
@InterfaceAudience.Private @InterfaceAudience.Private
public static final boolean DEFAULT_RESERVATION_MOVE_ON_EXPIRY = true; public static final boolean DEFAULT_RESERVATION_MOVE_ON_EXPIRY = true;

View File

@ -24,12 +24,13 @@
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
/** /**
* This interface is the one implemented by any system that wants to support * This interface is the one implemented by any system that wants to support

View File

@ -25,7 +25,11 @@
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
final class ReservationSystemUtil { /**
* Simple helper class for static methods used to transform across
* common formats in tests
public final class ReservationSystemUtil {
private ReservationSystemUtil() { private ReservationSystemUtil() {
// not called // not called

View File

@ -0,0 +1,123 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* A planning algorithm that first runs LowCostAligned, and if it fails runs
* Greedy.
public class AlignedPlannerWithGreedy implements ReservationAgent {
// Default smoothness factor
private static final int DEFAULT_SMOOTHNESS_FACTOR = 10;
// Log
private static final Logger LOG = LoggerFactory
// Smoothness factor
private final ReservationAgent planner;
// Constructor
public AlignedPlannerWithGreedy() {
// Constructor
public AlignedPlannerWithGreedy(int smoothnessFactor) {
// List of algorithms
List<ReservationAgent> listAlg = new LinkedList<ReservationAgent>();
// LowCostAligned planning algorithm
ReservationAgent algAligned =
new IterativePlanner(new StageEarliestStartByDemand(),
new StageAllocatorLowCostAligned(smoothnessFactor));
// Greedy planning algorithm
ReservationAgent algGreedy =
new IterativePlanner(new StageEarliestStartByJobArrival(),
new StageAllocatorGreedy());
// Set planner:
// 1. Attempt to execute algAligned
// 2. If failed, fall back to algGreedy
planner = new TryManyReservationAgents(listAlg);
public boolean createReservation(ReservationId reservationId, String user,
Plan plan, ReservationDefinition contract) throws PlanningException {
LOG.info("placing the following ReservationRequest: " + contract);
try {
boolean res =
planner.createReservation(reservationId, user, plan, contract);
if (res) {
LOG.info("OUTCOME: SUCCESS, Reservation ID: "
+ reservationId.toString() + ", Contract: " + contract.toString());
} else {
LOG.info("OUTCOME: FAILURE, Reservation ID: "
+ reservationId.toString() + ", Contract: " + contract.toString());
return res;
} catch (PlanningException e) {
LOG.info("OUTCOME: FAILURE, Reservation ID: " + reservationId.toString()
+ ", Contract: " + contract.toString());
throw e;
public boolean updateReservation(ReservationId reservationId, String user,
Plan plan, ReservationDefinition contract) throws PlanningException {
LOG.info("updating the following ReservationRequest: " + contract);
return planner.updateReservation(reservationId, user, plan, contract);
public boolean deleteReservation(ReservationId reservationId, String user,
Plan plan) throws PlanningException {
LOG.info("removing the following ReservationId: " + reservationId);
return planner.deleteReservation(reservationId, user, plan);

View File

@ -0,0 +1,97 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* This Agent employs a simple greedy placement strategy, placing the various
* stages of a {@link ReservationDefinition} from the deadline moving backward
* towards the arrival. This allows jobs with earlier deadline to be scheduled
* greedily as well. Combined with an opportunistic anticipation of work if the
* cluster is not fully utilized also seems to provide good latency for
* best-effort jobs (i.e., jobs running without a reservation).
* This agent does not account for locality and only consider container
* granularity for validation purposes (i.e., you can't exceed max-container
* size).
public class GreedyReservationAgent implements ReservationAgent {
// Log
private static final Logger LOG = LoggerFactory
// Greedy planner
private final ReservationAgent planner = new IterativePlanner(
new StageEarliestStartByJobArrival(), new StageAllocatorGreedy());
public boolean createReservation(ReservationId reservationId, String user,
Plan plan, ReservationDefinition contract) throws PlanningException {
LOG.info("placing the following ReservationRequest: " + contract);
try {
boolean res =
planner.createReservation(reservationId, user, plan, contract);
if (res) {
LOG.info("OUTCOME: SUCCESS, Reservation ID: "
+ reservationId.toString() + ", Contract: " + contract.toString());
} else {
LOG.info("OUTCOME: FAILURE, Reservation ID: "
+ reservationId.toString() + ", Contract: " + contract.toString());
return res;
} catch (PlanningException e) {
LOG.info("OUTCOME: FAILURE, Reservation ID: " + reservationId.toString()
+ ", Contract: " + contract.toString());
throw e;
public boolean updateReservation(ReservationId reservationId, String user,
Plan plan, ReservationDefinition contract) throws PlanningException {
LOG.info("updating the following ReservationRequest: " + contract);
return planner.updateReservation(reservationId, user, plan, contract);
public boolean deleteReservation(ReservationId reservationId, String user,
Plan plan) throws PlanningException {
LOG.info("removing the following ReservationId: " + reservationId);
return planner.deleteReservation(reservationId, user, plan);

View File

@ -0,0 +1,338 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
import java.util.HashMap;
import java.util.ListIterator;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException;
import org.apache.hadoop.yarn.util.resource.Resources;
* A planning algorithm consisting of two main phases. The algorithm iterates
* over the job stages in descending order. For each stage, the algorithm: 1.
* Determines an interval [stageArrivalTime, stageDeadline) in which the stage
* is allocated. 2. Computes an allocation for the stage inside the interval.
* For ANY and ALL jobs, phase 1 sets the allocation window of each stage to be
* [jobArrival, jobDeadline]. For ORDER and ORDER_NO_GAP jobs, the deadline of
* each stage is set as succcessorStartTime - the starting time of its
* succeeding stage (or jobDeadline if it is the last stage).
* The phases are set using the two functions: 1. setAlgEarliestStartTime 2.
* setAlgComputeStageAllocation
public class IterativePlanner extends PlanningAlgorithm {
// Modifications performed by the algorithm that are not been reflected in the
// actual plan while a request is still pending.
private RLESparseResourceAllocation planModifications;
// Data extracted from plan
private Map<Long, Resource> planLoads;
private Resource capacity;
private long step;
// Job parameters
private ReservationRequestInterpreter jobType;
private long jobArrival;
private long jobDeadline;
// Phase algorithms
private StageEarliestStart algStageEarliestStart = null;
private StageAllocator algStageAllocator = null;
// Constructor
public IterativePlanner(StageEarliestStart algEarliestStartTime,
StageAllocator algStageAllocator) {
public RLESparseResourceAllocation computeJobAllocation(Plan plan,
ReservationId reservationId, ReservationDefinition reservation)
throws ContractValidationException {
// Initialize
initialize(plan, reservation);
// If the job has been previously reserved, logically remove its allocation
ReservationAllocation oldReservation =
if (oldReservation != null) {
// Create the allocations data structure
RLESparseResourceAllocation allocations =
new RLESparseResourceAllocation(plan.getResourceCalculator(),
// Get a reverse iterator for the set of stages
ListIterator<ReservationRequest> li =
// Current stage
ReservationRequest currentReservationStage;
// Index, points on the current node
int index =
// Stage deadlines
long stageDeadline = stepRoundDown(reservation.getDeadline(), step);
long successorStartingTime = -1;
// Iterate the stages in reverse order
while (li.hasPrevious()) {
// Get current stage
currentReservationStage = li.previous();
index -= 1;
// Validate that the ReservationRequest respects basic constraints
validateInputStage(plan, currentReservationStage);
// Compute an adjusted earliestStart for this resource
// (we need this to provision some space for the ORDER contracts)
long stageArrivalTime = reservation.getArrival();
if (jobType == ReservationRequestInterpreter.R_ORDER
|| jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP) {
stageArrivalTime =
computeEarliestStartingTime(plan, reservation, index,
currentReservationStage, stageDeadline);
stageArrivalTime = stepRoundUp(stageArrivalTime, step);
stageArrivalTime = Math.max(stageArrivalTime, reservation.getArrival());
// Compute the allocation of a single stage
Map<ReservationInterval, Resource> curAlloc =
computeStageAllocation(plan, currentReservationStage,
stageArrivalTime, stageDeadline);
// If we did not find an allocation, return NULL
// (unless it's an ANY job, then we simply continue).
if (curAlloc == null) {
// If it's an ANY job, we can move to the next possible request
if (jobType == ReservationRequestInterpreter.R_ANY) {
// Otherwise, the job cannot be allocated
return null;
// Get the start & end time of the current allocation
Long stageStartTime = findEarliestTime(curAlloc.keySet());
Long stageEndTime = findLatestTime(curAlloc.keySet());
// If we did find an allocation for the stage, add it
for (Entry<ReservationInterval, Resource> entry : curAlloc.entrySet()) {
allocations.addInterval(entry.getKey(), entry.getValue());
// If this is an ANY clause, we have finished
if (jobType == ReservationRequestInterpreter.R_ANY) {
// If ORDER job, set the stageDeadline of the next stage to be processed
if (jobType == ReservationRequestInterpreter.R_ORDER
|| jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP) {
// Verify that there is no gap, in case the job is ORDER_NO_GAP
if (jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP
&& successorStartingTime != -1
&& successorStartingTime > stageEndTime) {
return null;
// Store the stageStartTime and set the new stageDeadline
successorStartingTime = stageStartTime;
stageDeadline = stageStartTime;
// If the allocation is empty, return an error
if (allocations.isEmpty()) {
return null;
return allocations;
protected void initialize(Plan plan, ReservationDefinition reservation) {
// Get plan step & capacity
capacity = plan.getTotalCapacity();
step = plan.getStep();
// Get job parameters (type, arrival time & deadline)
jobType = reservation.getReservationRequests().getInterpreter();
jobArrival = stepRoundUp(reservation.getArrival(), step);
jobDeadline = stepRoundDown(reservation.getDeadline(), step);
// Dirty read of plan load
planLoads = getAllLoadsInInterval(plan, jobArrival, jobDeadline);
// Initialize the plan modifications
planModifications =
new RLESparseResourceAllocation(plan.getResourceCalculator(),
private Map<Long, Resource> getAllLoadsInInterval(Plan plan, long startTime,
long endTime) {
// Create map
Map<Long, Resource> loads = new HashMap<Long, Resource>();
// Calculate the load for every time slot between [start,end)
for (long t = startTime; t < endTime; t += step) {
Resource load = plan.getTotalCommittedResources(t);
loads.put(t, load);
// Return map
return loads;
private void ignoreOldAllocation(ReservationAllocation oldReservation) {
// If there is no old reservation, return
if (oldReservation == null) {
// Subtract each allocation interval from the planModifications
for (Entry<ReservationInterval, Resource> entry : oldReservation
.getAllocationRequests().entrySet()) {
// Read the entry
ReservationInterval interval = entry.getKey();
Resource resource = entry.getValue();
// Find the actual request
Resource negativeResource = Resources.multiply(resource, -1);
// Insert it into planModifications as a 'negative' request, to
// represent available resources
planModifications.addInterval(interval, negativeResource);
private void validateInputStage(Plan plan, ReservationRequest rr)
throws ContractValidationException {
// Validate concurrency
if (rr.getConcurrency() < 1) {
throw new ContractValidationException("Gang Size should be >= 1");
// Validate number of containers
if (rr.getNumContainers() <= 0) {
throw new ContractValidationException("Num containers should be > 0");
// Check that gangSize and numContainers are compatible
if (rr.getNumContainers() % rr.getConcurrency() != 0) {
throw new ContractValidationException(
"Parallelism must be an exact multiple of gang size");
// Check that the largest container request does not exceed the cluster-wide
// limit for container sizes
if (Resources.greaterThan(plan.getResourceCalculator(), capacity,
rr.getCapability(), plan.getMaximumAllocation())) {
throw new ContractValidationException(
"Individual capability requests should not exceed cluster's " +
// Call algEarliestStartTime()
protected long computeEarliestStartingTime(Plan plan,
ReservationDefinition reservation, int index,
ReservationRequest currentReservationStage, long stageDeadline) {
return algStageEarliestStart.setEarliestStartTime(plan, reservation, index,
currentReservationStage, stageDeadline);
// Call algStageAllocator
protected Map<ReservationInterval, Resource> computeStageAllocation(
Plan plan, ReservationRequest rr, long stageArrivalTime,
long stageDeadline) {
return algStageAllocator.computeStageAllocation(plan, planLoads,
planModifications, rr, stageArrivalTime, stageDeadline);
// Set the algorithm: algStageEarliestStart
public IterativePlanner setAlgStageEarliestStart(StageEarliestStart alg) {
this.algStageEarliestStart = alg;
return this; // To allow concatenation of setAlg() functions
// Set the algorithm: algStageAllocator
public IterativePlanner setAlgStageAllocator(StageAllocator alg) {
this.algStageAllocator = alg;
return this; // To allow concatenation of setAlg() functions

View File

@ -16,11 +16,13 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.yarn.server.resourcemanager.reservation; package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
import java.util.List; import java.util.List;
import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
public interface Planner { public interface Planner {

View File

@ -0,0 +1,207 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
* An abstract class that follows the general behavior of planning algorithms.
public abstract class PlanningAlgorithm implements ReservationAgent {
* Performs the actual allocation for a ReservationDefinition within a Plan.
* @param reservationId the identifier of the reservation
* @param user the user who owns the reservation
* @param plan the Plan to which the reservation must be fitted
* @param contract encapsulates the resources required by the user for his
* session
* @param oldReservation the existing reservation (null if none)
* @return whether the allocateUser function was successful or not
* @throws PlanningException if the session cannot be fitted into the plan
* @throws ContractValidationException
protected boolean allocateUser(ReservationId reservationId, String user,
Plan plan, ReservationDefinition contract,
ReservationAllocation oldReservation) throws PlanningException,
ContractValidationException {
// Adjust the ResourceDefinition to account for system "imperfections"
// (e.g., scheduling delays for large containers).
ReservationDefinition adjustedContract = adjustContract(plan, contract);
// Compute the job allocation
RLESparseResourceAllocation allocation =
computeJobAllocation(plan, reservationId, adjustedContract);
// If no job allocation was found, fail
if (allocation == null) {
throw new PlanningException(
"The planning algorithm could not find a valid allocation"
+ " for your request");
// Translate the allocation to a map (with zero paddings)
long step = plan.getStep();
long jobArrival = stepRoundUp(adjustedContract.getArrival(), step);
long jobDeadline = stepRoundUp(adjustedContract.getDeadline(), step);
Map<ReservationInterval, Resource> mapAllocations =
allocationsToPaddedMap(allocation, jobArrival, jobDeadline);
// Create the reservation
ReservationAllocation capReservation =
new InMemoryReservationAllocation(reservationId, // ID
adjustedContract, // Contract
user, // User name
plan.getQueueName(), // Queue name
findEarliestTime(mapAllocations.keySet()), // Earliest start time
findLatestTime(mapAllocations.keySet()), // Latest end time
mapAllocations, // Allocations
plan.getResourceCalculator(), // Resource calculator
plan.getMinimumAllocation()); // Minimum allocation
// Add (or update) the reservation allocation
if (oldReservation != null) {
return plan.updateReservation(capReservation);
} else {
return plan.addReservation(capReservation);
private Map<ReservationInterval, Resource>
allocationsToPaddedMap(RLESparseResourceAllocation allocation,
long jobArrival, long jobDeadline) {
// Allocate
Map<ReservationInterval, Resource> mapAllocations =
// Zero allocation
Resource zeroResource = Resource.newInstance(0, 0);
// Pad at the beginning
long earliestStart = findEarliestTime(mapAllocations.keySet());
if (jobArrival < earliestStart) {
mapAllocations.put(new ReservationInterval(jobArrival, earliestStart),
// Pad at the beginning
long latestEnd = findLatestTime(mapAllocations.keySet());
if (latestEnd < jobDeadline) {
mapAllocations.put(new ReservationInterval(latestEnd, jobDeadline),
return mapAllocations;
public abstract RLESparseResourceAllocation computeJobAllocation(Plan plan,
ReservationId reservationId, ReservationDefinition reservation)
throws PlanningException, ContractValidationException;
public boolean createReservation(ReservationId reservationId, String user,
Plan plan, ReservationDefinition contract) throws PlanningException {
// Allocate
return allocateUser(reservationId, user, plan, contract, null);
public boolean updateReservation(ReservationId reservationId, String user,
Plan plan, ReservationDefinition contract) throws PlanningException {
// Get the old allocation
ReservationAllocation oldAlloc = plan.getReservationById(reservationId);
// Allocate (ignores the old allocation)
return allocateUser(reservationId, user, plan, contract, oldAlloc);
public boolean deleteReservation(ReservationId reservationId, String user,
Plan plan) throws PlanningException {
// Delete the existing reservation
return plan.deleteReservation(reservationId);
protected static long findEarliestTime(Set<ReservationInterval> sesInt) {
long ret = Long.MAX_VALUE;
for (ReservationInterval s : sesInt) {
if (s.getStartTime() < ret) {
ret = s.getStartTime();
return ret;
protected static long findLatestTime(Set<ReservationInterval> sesInt) {
long ret = Long.MIN_VALUE;
for (ReservationInterval s : sesInt) {
if (s.getEndTime() > ret) {
ret = s.getEndTime();
return ret;
protected static long stepRoundDown(long t, long step) {
return (t / step) * step;
protected static long stepRoundUp(long t, long step) {
return ((t + step - 1) / step) * step;
private ReservationDefinition adjustContract(Plan plan,
ReservationDefinition originalContract) {
// Place here adjustment. For example using QueueMetrics we can track
// large container delays per YARN-YARN-1990
return originalContract;

View File

@ -15,10 +15,11 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*******************************************************************************/ *******************************************************************************/
package org.apache.hadoop.yarn.server.resourcemanager.reservation; package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
/** /**

View File

@ -16,7 +16,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.yarn.server.resourcemanager.reservation; package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
@ -27,6 +27,9 @@
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.UTCClock; import org.apache.hadoop.yarn.util.UTCClock;
@ -87,8 +90,9 @@ public void plan(Plan plan, List<ReservationDefinition> contracts)
// loop on all moment in time from now to the end of the check Zone // loop on all moment in time from now to the end of the check Zone
// or the end of the planned sessions whichever comes first // or the end of the planned sessions whichever comes first
for (long t = now; (t < plan.getLastEndTime() && t < (now + lengthOfCheckZone)); t += for (long t = now;
plan.getStep()) { (t < plan.getLastEndTime() && t < (now + lengthOfCheckZone));
t += plan.getStep()) {
Resource excessCap = Resource excessCap =
Resources.subtract(plan.getTotalCommittedResources(t), totCap); Resources.subtract(plan.getTotalCommittedResources(t), totCap);
// if we are violating // if we are violating
@ -98,7 +102,8 @@ public void plan(Plan plan, List<ReservationDefinition> contracts)
new TreeSet<ReservationAllocation>(plan.getReservationsAtTime(t)); new TreeSet<ReservationAllocation>(plan.getReservationsAtTime(t));
for (Iterator<ReservationAllocation> resIter = for (Iterator<ReservationAllocation> resIter =
curReservations.iterator(); resIter.hasNext() curReservations.iterator(); resIter.hasNext()
&& Resources.greaterThan(resCalc, totCap, excessCap, ZERO_RESOURCE);) { && Resources.greaterThan(resCalc, totCap, excessCap,
ReservationAllocation reservation = resIter.next(); ReservationAllocation reservation = resIter.next();
plan.deleteReservation(reservation.getReservationId()); plan.deleteReservation(reservation.getReservationId());
excessCap = excessCap =

View File

@ -0,0 +1,55 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
* Interface for allocating a single stage in IterativePlanner.
public interface StageAllocator {
* Computes the allocation of a stage inside a defined time interval.
* @param plan the Plan to which the reservation must be fitted
* @param planLoads a 'dirty' read of the plan loads at each time
* @param planModifications the allocations performed by the planning
* algorithm which are not yet reflected by plan
* @param rr the stage
* @param stageEarliestStart the arrival time (earliest starting time) set for
* the stage by the two phase planning algorithm
* @param stageDeadline the deadline of the stage set by the two phase
* planning algorithm
* @return The computed allocation (or null if the stage could not be
* allocated)
Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
Map<Long, Resource> planLoads,
RLESparseResourceAllocation planModifications, ReservationRequest rr,
long stageEarliestStart, long stageDeadline);

View File

@ -0,0 +1,152 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
import org.apache.hadoop.yarn.util.resource.Resources;
* Computes the stage allocation according to the greedy allocation rule. The
* greedy rule repeatedly allocates requested containers at the rightmost
* (latest) free interval.
public class StageAllocatorGreedy implements StageAllocator {
public Map<ReservationInterval, Resource> computeStageAllocation(Plan plan,
Map<Long, Resource> planLoads,
RLESparseResourceAllocation planModifications, ReservationRequest rr,
long stageEarliestStart, long stageDeadline) {
Resource totalCapacity = plan.getTotalCapacity();
Map<ReservationInterval, Resource> allocationRequests =
new HashMap<ReservationInterval, Resource>();
// compute the gang as a resource and get the duration
Resource gang = Resources.multiply(rr.getCapability(), rr.getConcurrency());
long dur = rr.getDuration();
long step = plan.getStep();
// ceil the duration to the next multiple of the plan step
if (dur % step != 0) {
dur += (step - (dur % step));
// we know for sure that this division has no remainder (part of contract
// with user, validate before
int gangsToPlace = rr.getNumContainers() / rr.getConcurrency();
int maxGang = 0;
// loop trying to place until we are done, or we are considering
// an invalid range of times
while (gangsToPlace > 0 && stageDeadline - dur >= stageEarliestStart) {
// as we run along we remember how many gangs we can fit, and what
// was the most constraining moment in time (we will restart just
// after that to place the next batch)
maxGang = gangsToPlace;
long minPoint = stageDeadline;
int curMaxGang = maxGang;
// start placing at deadline (excluded due to [,) interval semantics and
// move backward
for (long t = stageDeadline - plan.getStep(); t >= stageDeadline - dur
&& maxGang > 0; t = t - plan.getStep()) {
// compute net available resources
Resource netAvailableRes = Resources.clone(totalCapacity);
// Resources.addTo(netAvailableRes, oldResCap);
// compute maximum number of gangs we could fit
curMaxGang =
(int) Math.floor(Resources.divide(plan.getResourceCalculator(),
totalCapacity, netAvailableRes, gang));
// pick the minimum between available resources in this instant, and how
// many gangs we have to place
curMaxGang = Math.min(gangsToPlace, curMaxGang);
// compare with previous max, and set it. also remember *where* we found
// the minimum (useful for next attempts)
if (curMaxGang <= maxGang) {
maxGang = curMaxGang;
minPoint = t;
// if we were able to place any gang, record this, and decrement
// gangsToPlace
if (maxGang > 0) {
gangsToPlace -= maxGang;
ReservationInterval reservationInt =
new ReservationInterval(stageDeadline - dur, stageDeadline);
Resource reservationRes =
Resources.multiply(rr.getCapability(), rr.getConcurrency()
* maxGang);
// remember occupied space (plan is read-only till we find a plausible
// allocation for the entire request). This is needed since we might be
// placing other ReservationRequest within the same
// ReservationDefinition,
// and we must avoid double-counting the available resources
planModifications.addInterval(reservationInt, reservationRes);
allocationRequests.put(reservationInt, reservationRes);
// reset our new starting point (curDeadline) to the most constraining
// point so far, we will look "left" of that to find more places where
// to schedule gangs (for sure nothing on the "right" of this point can
// fit a full gang.
stageDeadline = minPoint;
// if no gangs are left to place we succeed and return the allocation
if (gangsToPlace == 0) {
return allocationRequests;
} else {
// If we are here is becasue we did not manage to satisfy this request.
// So we need to remove unwanted side-effect from tempAssigned (needed
// for ANY).
for (Map.Entry<ReservationInterval, Resource> tempAllocation
: allocationRequests.entrySet()) {
// and return null to signal failure in this allocation
return null;

View File

@ -0,0 +1,360 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
import java.util.Comparator;
import java.util.Map;
import java.util.TreeSet;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
* A stage allocator that iteratively allocates containers in the
* {@link DurationInterval} with lowest overall cost. The algorithm only
* considers intervals of the form: [stageDeadline - (n+1)*duration,
* stageDeadline - n*duration) for an integer n. This guarantees that the
* allocations are aligned (as opposed to overlapping duration intervals).
* The smoothnessFactor parameter controls the number of containers that are
* simultaneously allocated in each iteration of the algorithm.
public class StageAllocatorLowCostAligned implements StageAllocator {
// Smoothness factor
private int smoothnessFactor = 10;
// Constructor
public StageAllocatorLowCostAligned() {
// Constructor
public StageAllocatorLowCostAligned(int smoothnessFactor) {
this.smoothnessFactor = smoothnessFactor;
// computeJobAllocation()
public Map<ReservationInterval, Resource> computeStageAllocation(
Plan plan, Map<Long, Resource> planLoads,
RLESparseResourceAllocation planModifications, ReservationRequest rr,
long stageEarliestStart, long stageDeadline) {
// Initialize
ResourceCalculator resCalc = plan.getResourceCalculator();
Resource capacity = plan.getTotalCapacity();
long step = plan.getStep();
// Create allocationRequestsearlies
RLESparseResourceAllocation allocationRequests =
new RLESparseResourceAllocation(plan.getResourceCalculator(),
// Initialize parameters
long duration = stepRoundUp(rr.getDuration(), step);
int windowSizeInDurations =
(int) ((stageDeadline - stageEarliestStart) / duration);
int totalGangs = rr.getNumContainers() / rr.getConcurrency();
int numContainersPerGang = rr.getConcurrency();
Resource gang =
Resources.multiply(rr.getCapability(), numContainersPerGang);
// Set maxGangsPerUnit
int maxGangsPerUnit =
(int) Math.max(
Math.floor(((double) totalGangs) / windowSizeInDurations), 1);
maxGangsPerUnit = Math.max(maxGangsPerUnit / smoothnessFactor, 1);
// If window size is too small, return null
if (windowSizeInDurations <= 0) {
return null;
// Initialize tree sorted by costs
TreeSet<DurationInterval> durationIntervalsSortedByCost =
new TreeSet<DurationInterval>(new Comparator<DurationInterval>() {
public int compare(DurationInterval val1, DurationInterval val2) {
int cmp = Double.compare(val1.getTotalCost(), val2.getTotalCost());
if (cmp != 0) {
return cmp;
return (-1) * Long.compare(val1.getEndTime(), val2.getEndTime());
// Add durationIntervals that end at (endTime - n*duration) for some n.
for (long intervalEnd = stageDeadline; intervalEnd >= stageEarliestStart
+ duration; intervalEnd -= duration) {
long intervalStart = intervalEnd - duration;
// Get duration interval [intervalStart,intervalEnd)
DurationInterval durationInterval =
getDurationInterval(intervalStart, intervalEnd, planLoads,
planModifications, capacity, resCalc, step);
// If the interval can fit a gang, add it to the tree
if (durationInterval.canAllocate(gang, capacity, resCalc)) {
// Allocate
int remainingGangs = totalGangs;
while (remainingGangs > 0) {
// If no durationInterval can fit a gang, break and return null
if (durationIntervalsSortedByCost.isEmpty()) {
// Get best duration interval
DurationInterval bestDurationInterval =
int numGangsToAllocate = Math.min(maxGangsPerUnit, remainingGangs);
// Add it
remainingGangs -= numGangsToAllocate;
ReservationInterval reservationInt =
new ReservationInterval(bestDurationInterval.getStartTime(),
Resource reservationRes =
Resources.multiply(rr.getCapability(), rr.getConcurrency()
* numGangsToAllocate);
planModifications.addInterval(reservationInt, reservationRes);
allocationRequests.addInterval(reservationInt, reservationRes);
// Remove from tree
// Get updated interval
DurationInterval updatedDurationInterval =
bestDurationInterval.getStartTime() + duration, planLoads,
planModifications, capacity, resCalc, step);
// Add to tree, if possible
if (updatedDurationInterval.canAllocate(gang, capacity, resCalc)) {
// Get the final allocation
Map<ReservationInterval, Resource> allocations =
// If no gangs are left to place we succeed and return the allocation
if (remainingGangs <= 0) {
return allocations;
} else {
// If we are here is because we did not manage to satisfy this request.
// We remove unwanted side-effect from planModifications (needed for ANY).
for (Map.Entry<ReservationInterval, Resource> tempAllocation
: allocations.entrySet()) {
// Return null to signal failure in this allocation
return null;
protected DurationInterval getDurationInterval(long startTime, long endTime,
Map<Long, Resource> planLoads,
RLESparseResourceAllocation planModifications, Resource capacity,
ResourceCalculator resCalc, long step) {
// Initialize the dominant loads structure
Resource dominantResources = Resource.newInstance(0, 0);
// Calculate totalCost and maxLoad
double totalCost = 0.0;
for (long t = startTime; t < endTime; t += step) {
// Get the load
Resource load = getLoadAtTime(t, planLoads, planModifications);
// Increase the total cost
totalCost += calcCostOfLoad(load, capacity, resCalc);
// Update the dominant resources
dominantResources = Resources.componentwiseMax(dominantResources, load);
// Return the corresponding durationInterval
return new DurationInterval(startTime, endTime, totalCost,
protected double calcCostOfInterval(long startTime, long endTime,
Map<Long, Resource> planLoads,
RLESparseResourceAllocation planModifications, Resource capacity,
ResourceCalculator resCalc, long step) {
// Sum costs in the interval [startTime,endTime)
double totalCost = 0.0;
for (long t = startTime; t < endTime; t += step) {
totalCost += calcCostOfTimeSlot(t, planLoads, planModifications, capacity,
// Return sum
return totalCost;
protected double calcCostOfTimeSlot(long t, Map<Long, Resource> planLoads,
RLESparseResourceAllocation planModifications, Resource capacity,
ResourceCalculator resCalc) {
// Get the current load at time t
Resource load = getLoadAtTime(t, planLoads, planModifications);
// Return cost
return calcCostOfLoad(load, capacity, resCalc);
protected Resource getLoadAtTime(long t, Map<Long, Resource> planLoads,
RLESparseResourceAllocation planModifications) {
Resource planLoad = planLoads.get(t);
planLoad = (planLoad == null) ? Resource.newInstance(0, 0) : planLoad;
return Resources.add(planLoad, planModifications.getCapacityAtTime(t));
protected double calcCostOfLoad(Resource load, Resource capacity,
ResourceCalculator resCalc) {
return resCalc.ratio(load, capacity);
protected static long stepRoundDown(long t, long step) {
return (t / step) * step;
protected static long stepRoundUp(long t, long step) {
return ((t + step - 1) / step) * step;
* An inner class that represents an interval, typically of length duration.
* The class holds the total cost of the interval and the maximal load inside
* the interval in each dimension (both calculated externally).
protected static class DurationInterval {
private long startTime;
private long endTime;
private double cost;
private Resource maxLoad;
// Constructor
public DurationInterval(long startTime, long endTime, double cost,
Resource maxLoad) {
this.startTime = startTime;
this.endTime = endTime;
this.cost = cost;
this.maxLoad = maxLoad;
// canAllocate() - boolean function, returns whether requestedResources
// can be allocated during the durationInterval without
// violating capacity constraints
public boolean canAllocate(Resource requestedResources, Resource capacity,
ResourceCalculator resCalc) {
Resource updatedMaxLoad = Resources.add(maxLoad, requestedResources);
return (resCalc.compare(capacity, updatedMaxLoad, capacity) <= 0);
// numCanFit() - returns the maximal number of requestedResources can be
// allocated during the durationInterval without violating
// capacity constraints
public int numCanFit(Resource requestedResources, Resource capacity,
ResourceCalculator resCalc) {
// Represents the largest resource demand that can be satisfied throughout
// the entire DurationInterval (i.e., during [startTime,endTime))
Resource availableResources = Resources.subtract(capacity, maxLoad);
// Maximal number of requestedResources that fit inside the interval
return (int) Math.floor(Resources.divide(resCalc, capacity,
availableResources, requestedResources));
public long getStartTime() {
return this.startTime;
public void setStartTime(long value) {
this.startTime = value;
public long getEndTime() {
return this.endTime;
public void setEndTime(long value) {
this.endTime = value;
public Resource getMaxLoad() {
return this.maxLoad;
public void setMaxLoad(Resource value) {
this.maxLoad = value;
public double getTotalCost() {
return this.cost;
public void setTotalCost(double value) {
this.cost = value;

View File

@ -0,0 +1,46 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
* Interface for setting the earliest start time of a stage in IterativePlanner.
public interface StageEarliestStart {
* Computes the earliest allowed starting time for a given stage.
* @param plan the Plan to which the reservation must be fitted
* @param reservation the job contract
* @param index the index of the stage in the job contract
* @param currentReservationStage the stage
* @param stageDeadline the deadline of the stage set by the two phase
* planning algorithm
* @return the earliest allowed starting time for the stage.
long setEarliestStartTime(Plan plan, ReservationDefinition reservation,
int index, ReservationRequest currentReservationStage,
long stageDeadline);

View File

@ -0,0 +1,106 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
import java.util.ListIterator;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
* Sets the earliest start time of a stage proportional to the job weight. The
* interval [jobArrival, stageDeadline) is divided as follows. First, each stage
* is guaranteed at least its requested duration. Then, the stage receives a
* fraction of the remaining time. The fraction is calculated as the ratio
* between the weight (total requested resources) of the stage and the total
* weight of all proceeding stages.
public class StageEarliestStartByDemand implements StageEarliestStart {
private long step;
public long setEarliestStartTime(Plan plan,
ReservationDefinition reservation, int index, ReservationRequest current,
long stageDeadline) {
step = plan.getStep();
// If this is the first stage, don't bother with the computation.
if (index < 1) {
return reservation.getArrival();
// Get iterator
ListIterator<ReservationRequest> li =
ReservationRequest rr;
// Calculate the total weight & total duration
double totalWeight = calcWeight(current);
long totalDuration = getRoundedDuration(current, plan);
while (li.hasPrevious()) {
rr = li.previous();
totalWeight += calcWeight(rr);
totalDuration += getRoundedDuration(rr, plan);
// Compute the weight of the current stage as compared to remaining ones
double ratio = calcWeight(current) / totalWeight;
// Estimate an early start time, such that:
// 1. Every stage is guaranteed to receive at least its duration
// 2. The remainder of the window is divided between stages
// proportionally to its workload (total memory consumption)
long window = stageDeadline - reservation.getArrival();
long windowRemainder = window - totalDuration;
long earlyStart =
(long) (stageDeadline - getRoundedDuration(current, plan)
- (windowRemainder * ratio));
// Realign if necessary (since we did some arithmetic)
earlyStart = stepRoundUp(earlyStart, step);
// Return
return earlyStart;
// Weight = total memory consumption of stage
protected double calcWeight(ReservationRequest stage) {
return (stage.getDuration() * stage.getCapability().getMemory())
* (stage.getNumContainers());
protected long getRoundedDuration(ReservationRequest stage, Plan plan) {
return stepRoundUp(stage.getDuration(), step);
protected static long stepRoundDown(long t, long step) {
return (t / step) * step;
protected static long stepRoundUp(long t, long step) {
return ((t + step - 1) / step) * step;

View File

@ -0,0 +1,39 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
* Sets the earliest start time of a stage as the job arrival time.
public class StageEarliestStartByJobArrival implements StageEarliestStart {
public long setEarliestStartTime(Plan plan,
ReservationDefinition reservation, int index, ReservationRequest current,
long stageDeadline) {
return reservation.getArrival();

View File

@ -0,0 +1,114 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
* A planning algorithm that invokes several other planning algorithms according
* to a given order. If one of the planners succeeds, the allocation it
* generates is returned.
public class TryManyReservationAgents implements ReservationAgent {
// Planning algorithms
private final List<ReservationAgent> algs;
// Constructor
public TryManyReservationAgents(List<ReservationAgent> algs) {
this.algs = new LinkedList<ReservationAgent>(algs);
public boolean createReservation(ReservationId reservationId, String user,
Plan plan, ReservationDefinition contract) throws PlanningException {
// Save the planning exception
PlanningException planningException = null;
// Try all of the algorithms, in order
for (ReservationAgent alg : algs) {
try {
if (alg.createReservation(reservationId, user, plan, contract)) {
return true;
} catch (PlanningException e) {
planningException = e;
// If all of the algorithms failed and one of the algorithms threw an
// exception, throw the last planning exception
if (planningException != null) {
throw planningException;
// If all of the algorithms failed, return false
return false;
public boolean updateReservation(ReservationId reservationId, String user,
Plan plan, ReservationDefinition contract) throws PlanningException {
// Save the planning exception
PlanningException planningException = null;
// Try all of the algorithms, in order
for (ReservationAgent alg : algs) {
try {
if (alg.updateReservation(reservationId, user, plan, contract)) {
return true;
} catch (PlanningException e) {
planningException = e;
// If all of the algorithms failed and one of the algorithms threw an
// exception, throw the last planning exception
if (planningException != null) {
throw planningException;
// If all of the algorithms failed, return false
return false;
public boolean deleteReservation(ReservationId reservationId, String user,
Plan plan) throws PlanningException {
return plan.deleteReservation(reservationId);

View File

@ -45,6 +45,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.AlignedPlannerWithGreedy;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
@ -89,7 +90,7 @@ public static void validateReservationQueue(
Assert.assertEquals(planQName, plan.getQueueName()); Assert.assertEquals(planQName, plan.getQueueName());
Assert.assertEquals(8192, plan.getTotalCapacity().getMemory()); Assert.assertEquals(8192, plan.getTotalCapacity().getMemory());
Assert.assertTrue( Assert.assertTrue(
plan.getReservationAgent() instanceof GreedyReservationAgent); plan.getReservationAgent() instanceof AlignedPlannerWithGreedy);
Assert.assertTrue( Assert.assertTrue(
plan.getSharingPolicy() instanceof CapacityOverTimePolicy); plan.getSharingPolicy() instanceof CapacityOverTimePolicy);
} }
@ -102,7 +103,7 @@ public static void validateNewReservationQueue(
Assert.assertEquals(newQ, newPlan.getQueueName()); Assert.assertEquals(newQ, newPlan.getQueueName());
Assert.assertEquals(1024, newPlan.getTotalCapacity().getMemory()); Assert.assertEquals(1024, newPlan.getTotalCapacity().getMemory());
Assert Assert
.assertTrue(newPlan.getReservationAgent() instanceof GreedyReservationAgent); .assertTrue(newPlan.getReservationAgent() instanceof AlignedPlannerWithGreedy);
Assert Assert
.assertTrue(newPlan.getSharingPolicy() instanceof CapacityOverTimePolicy); .assertTrue(newPlan.getSharingPolicy() instanceof CapacityOverTimePolicy);
} }

View File

@ -29,7 +29,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator;

View File

@ -35,6 +35,7 @@
import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;

View File

@ -20,7 +20,6 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;

View File

@ -37,6 +37,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;

View File

@ -34,6 +34,8 @@
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;

View File

@ -27,6 +27,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator;

View File

@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the * to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at * with the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
@ -164,6 +164,53 @@ public void testZeroAlloaction() {
Assert.assertTrue(rleSparseVector.isEmpty()); Assert.assertTrue(rleSparseVector.isEmpty());
} }
public void testToIntervalMap() {
ResourceCalculator resCalc = new DefaultResourceCalculator();
Resource minAlloc = Resource.newInstance(1, 1);
RLESparseResourceAllocation rleSparseVector =
new RLESparseResourceAllocation(resCalc, minAlloc);
Map<ReservationInterval, Resource> mapAllocations;
// Check empty
mapAllocations = rleSparseVector.toIntervalMap();
// Check full
int[] alloc = { 0, 5, 10, 10, 5, 0, 5, 0 };
int start = 100;
Set<Entry<ReservationInterval, Resource>> inputs =
generateAllocation(start, alloc, false).entrySet();
for (Entry<ReservationInterval, Resource> ip : inputs) {
rleSparseVector.addInterval(ip.getKey(), ip.getValue());
mapAllocations = rleSparseVector.toIntervalMap();
Assert.assertTrue(mapAllocations.size() == 5);
for (Entry<ReservationInterval, Resource> entry : mapAllocations
.entrySet()) {
ReservationInterval interval = entry.getKey();
Resource resource = entry.getValue();
if (interval.getStartTime() == 101L) {
Assert.assertTrue(interval.getEndTime() == 102L);
Assert.assertEquals(resource, Resource.newInstance(5 * 1024, 5));
} else if (interval.getStartTime() == 102L) {
Assert.assertTrue(interval.getEndTime() == 104L);
Assert.assertEquals(resource, Resource.newInstance(10 * 1024, 10));
} else if (interval.getStartTime() == 104L) {
Assert.assertTrue(interval.getEndTime() == 105L);
Assert.assertEquals(resource, Resource.newInstance(5 * 1024, 5));
} else if (interval.getStartTime() == 105L) {
Assert.assertTrue(interval.getEndTime() == 106L);
Assert.assertEquals(resource, Resource.newInstance(0 * 1024, 0));
} else if (interval.getStartTime() == 106L) {
Assert.assertTrue(interval.getEndTime() == 107L);
Assert.assertEquals(resource, Resource.newInstance(5 * 1024, 5));
} else {
private Map<ReservationInterval, Resource> generateAllocation( private Map<ReservationInterval, Resource> generateAllocation(
int startTime, int[] alloc, boolean isStep) { int startTime, int[] alloc, boolean isStep) {
Map<ReservationInterval, Resource> req = Map<ReservationInterval, Resource> req =

View File

@ -24,6 +24,7 @@
import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;

View File

@ -0,0 +1,820 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.CapacityOverTimePolicy;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryPlan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.AlignedPlannerWithGreedy;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Before;
import org.junit.Test;
import org.mortbay.log.Log;
public class TestAlignedPlanner {
ReservationAgent agent;
InMemoryPlan plan;
Resource minAlloc = Resource.newInstance(1024, 1);
ResourceCalculator res = new DefaultResourceCalculator();
Resource maxAlloc = Resource.newInstance(1024 * 8, 8);
Random rand = new Random();
long step;
public void testSingleReservationAccept() throws PlanningException {
// Prepare basic plan
int numJobsInScenario = initializeScenario1();
// Create reservation
ReservationDefinition rr1 =
5 * step, // Job arrival time
20 * step, // Job deadline
new ReservationRequest[] { ReservationRequest.newInstance(
Resource.newInstance(2048, 2), // Capability
10, // Num containers
5, // Concurrency
10 * step) }, // Duration
ReservationRequestInterpreter.R_ORDER, "u1");
// Add reservation
ReservationId reservationID =
agent.createReservation(reservationID, "u1", plan, rr1);
// CHECK: allocation was accepted
assertTrue("Agent-based allocation failed", reservationID != null);
assertTrue("Agent-based allocation failed", plan.getAllReservations()
.size() == numJobsInScenario + 1);
// Get reservation
ReservationAllocation alloc1 = plan.getReservationById(reservationID);
// Verify allocation
check(alloc1, 10 * step, 20 * step, 10, 2048, 2));
public void testOrderNoGapImpossible() throws PlanningException {
// Prepare basic plan
int numJobsInScenario = initializeScenario2();
// Create reservation
ReservationDefinition rr1 =
10L, // Job arrival time
15 * step, // Job deadline
new ReservationRequest[] {
Resource.newInstance(1024, 1), // Capability
20, // Num containers
20, // Concurrency
step), // Duration
Resource.newInstance(1024, 1), // Capability
20, // Num containers
20, // Concurrency
step) }, // Duration
ReservationRequestInterpreter.R_ORDER_NO_GAP, "u1");
// Add reservation
try {
ReservationId reservationID =
agent.createReservation(reservationID, "u1", plan, rr1);
} catch (PlanningException e) {
// Expected failure
// CHECK: allocation was not accepted
assertTrue("Agent-based allocation should have failed", plan
.getAllReservations().size() == numJobsInScenario);
public void testOrderNoGapImpossible2() throws PlanningException {
// Prepare basic plan
int numJobsInScenario = initializeScenario2();
// Create reservation
ReservationDefinition rr1 =
10 * step, // Job arrival time
13 * step, // Job deadline
new ReservationRequest[] {
Resource.newInstance(1024, 1), // Capability
20, // Num containers
20, // Concurrency
step), // Duration
Resource.newInstance(1024, 1), // Capability
10, // Num containers
10, // Concurrency
step) }, // Duration
ReservationRequestInterpreter.R_ORDER_NO_GAP, "u1");
// Add reservation
try {
ReservationId reservationID =
agent.createReservation(reservationID, "u1", plan, rr1);
} catch (PlanningException e) {
// Expected failure
// CHECK: allocation was not accepted
assertTrue("Agent-based allocation should have failed", plan
.getAllReservations().size() == numJobsInScenario);
public void testOrderImpossible() throws PlanningException {
// Prepare basic plan
int numJobsInScenario = initializeScenario2();
// Create reservation
ReservationDefinition rr1 =
10 * step, // Job arrival time
15 * step, // Job deadline
new ReservationRequest[] {
Resource.newInstance(1024, 1), // Capability
20, // Num containers
20, // Concurrency
2 * step), // Duration
Resource.newInstance(1024, 1), // Capability
20, // Num containers
20, // Concurrency
step) }, // Duration
ReservationRequestInterpreter.R_ORDER, "u1");
// Add reservation
try {
ReservationId reservationID =
agent.createReservation(reservationID, "u1", plan, rr1);
} catch (PlanningException e) {
// Expected failure
// CHECK: allocation was not accepted
assertTrue("Agent-based allocation should have failed", plan
.getAllReservations().size() == numJobsInScenario);
public void testAnyImpossible() throws PlanningException {
// Prepare basic plan
int numJobsInScenario = initializeScenario2();
// Create reservation
ReservationDefinition rr1 =
10 * step, // Job arrival time
15 * step, // Job deadline
new ReservationRequest[] {
Resource.newInstance(1024, 1), // Capability
20, // Num containers
20, // Concurrency
3 * step), // Duration
Resource.newInstance(1024, 1), // Capability
20, // Num containers
20, // Concurrency
2 * step) }, // Duration
ReservationRequestInterpreter.R_ANY, "u1");
// Add reservation
try {
ReservationId reservationID =
agent.createReservation(reservationID, "u1", plan, rr1);
} catch (PlanningException e) {
// Expected failure
// CHECK: allocation was not accepted
assertTrue("Agent-based allocation should have failed", plan
.getAllReservations().size() == numJobsInScenario);
public void testAnyAccept() throws PlanningException {
// Prepare basic plan
int numJobsInScenario = initializeScenario2();
// Create reservation
ReservationDefinition rr1 =
10 * step, // Job arrival time
15 * step, // Job deadline
new ReservationRequest[] {
Resource.newInstance(1024, 1), // Capability
20, // Num containers
20, // Concurrency
step), // Duration
Resource.newInstance(1024, 1), // Capability
20, // Num containers
20, // Concurrency
2 * step) }, // Duration
ReservationRequestInterpreter.R_ANY, "u1");
// Add reservation
ReservationId reservationID =
agent.createReservation(reservationID, "u1", plan, rr1);
// CHECK: allocation was accepted
assertTrue("Agent-based allocation failed", reservationID != null);
assertTrue("Agent-based allocation failed", plan.getAllReservations()
.size() == numJobsInScenario + 1);
// Get reservation
ReservationAllocation alloc1 = plan.getReservationById(reservationID);
// Verify allocation
check(alloc1, 14 * step, 15 * step, 20, 1024, 1));
public void testAllAccept() throws PlanningException {
// Prepare basic plan
int numJobsInScenario = initializeScenario2();
// Create reservation
ReservationDefinition rr1 =
10 * step, // Job arrival time
15 * step, // Job deadline
new ReservationRequest[] {
Resource.newInstance(1024, 1), // Capability
20, // Num containers
20, // Concurrency
step), // Duration
Resource.newInstance(1024, 1), // Capability
20, // Num containers
20, // Concurrency
step) }, // Duration
ReservationRequestInterpreter.R_ALL, "u1");
// Add reservation
ReservationId reservationID =
agent.createReservation(reservationID, "u1", plan, rr1);
// CHECK: allocation was accepted
assertTrue("Agent-based allocation failed", reservationID != null);
assertTrue("Agent-based allocation failed", plan.getAllReservations()
.size() == numJobsInScenario + 1);
// Get reservation
ReservationAllocation alloc1 = plan.getReservationById(reservationID);
// Verify allocation
check(alloc1, 10 * step, 11 * step, 20, 1024, 1));
check(alloc1, 14 * step, 15 * step, 20, 1024, 1));
public void testAllImpossible() throws PlanningException {
// Prepare basic plan
int numJobsInScenario = initializeScenario2();
// Create reservation
ReservationDefinition rr1 =
10 * step, // Job arrival time
15 * step, // Job deadline
new ReservationRequest[] {
Resource.newInstance(1024, 1), // Capability
20, // Num containers
20, // Concurrency
step), // Duration
Resource.newInstance(1024, 1), // Capability
20, // Num containers
20, // Concurrency
2 * step) }, // Duration
ReservationRequestInterpreter.R_ALL, "u1");
// Add reservation
try {
ReservationId reservationID =
agent.createReservation(reservationID, "u1", plan, rr1);
} catch (PlanningException e) {
// Expected failure
// CHECK: allocation was not accepted
assertTrue("Agent-based allocation should have failed", plan
.getAllReservations().size() == numJobsInScenario);
public void testUpdate() throws PlanningException {
// Create flexible reservation
ReservationDefinition rrFlex =
10 * step, // Job arrival time
14 * step, // Job deadline
new ReservationRequest[] { ReservationRequest.newInstance(
Resource.newInstance(1024, 1), // Capability
100, // Num containers
1, // Concurrency
2 * step) }, // Duration
ReservationRequestInterpreter.R_ALL, "u1");
// Create blocking reservation
ReservationDefinition rrBlock =
10 * step, // Job arrival time
11 * step, // Job deadline
new ReservationRequest[] { ReservationRequest.newInstance(
Resource.newInstance(1024, 1), // Capability
100, // Num containers
100, // Concurrency
step) }, // Duration
ReservationRequestInterpreter.R_ALL, "u1");
// Create reservation IDs
ReservationId flexReservationID =
ReservationId blockReservationID =
// Add block, add flex, remove block, update flex
agent.createReservation(blockReservationID, "uBlock", plan, rrBlock);
agent.createReservation(flexReservationID, "uFlex", plan, rrFlex);
agent.deleteReservation(blockReservationID, "uBlock", plan);
agent.updateReservation(flexReservationID, "uFlex", plan, rrFlex);
// CHECK: allocation was accepted
assertTrue("Agent-based allocation failed", flexReservationID != null);
assertTrue("Agent-based allocation failed", plan.getAllReservations()
.size() == 1);
// Get reservation
ReservationAllocation alloc1 = plan.getReservationById(flexReservationID);
// Verify allocation
check(alloc1, 10 * step, 14 * step, 50, 1024, 1));
public void testImpossibleDuration() throws PlanningException {
// Create reservation
ReservationDefinition rr1 =
10 * step, // Job arrival time
15 * step, // Job deadline
new ReservationRequest[] { ReservationRequest.newInstance(
Resource.newInstance(1024, 1), // Capability
20, // Num containers
20, // Concurrency
10 * step) }, // Duration
ReservationRequestInterpreter.R_ALL, "u1");
// Add reservation
try {
ReservationId reservationID =
agent.createReservation(reservationID, "u1", plan, rr1);
} catch (PlanningException e) {
// Expected failure
// CHECK: allocation was not accepted
assertTrue("Agent-based allocation should have failed", plan
.getAllReservations().size() == 0);
public void testLoadedDurationIntervals() throws PlanningException {
int numJobsInScenario = initializeScenario3();
// Create reservation
ReservationDefinition rr1 =
10 * step, // Job arrival time
13 * step, // Job deadline
new ReservationRequest[] { ReservationRequest.newInstance(
Resource.newInstance(1024, 1), // Capability
80, // Num containers
10, // Concurrency
step) }, // Duration
ReservationRequestInterpreter.R_ALL, "u1");
// Add reservation
ReservationId reservationID =
agent.createReservation(reservationID, "u1", plan, rr1);
// CHECK: allocation was accepted
assertTrue("Agent-based allocation failed", reservationID != null);
assertTrue("Agent-based allocation failed", plan.getAllReservations()
.size() == numJobsInScenario + 1);
// Get reservation
ReservationAllocation alloc1 = plan.getReservationById(reservationID);
// Verify allocation
check(alloc1, 10 * step, 11 * step, 20, 1024, 1));
check(alloc1, 11 * step, 12 * step, 20, 1024, 1));
check(alloc1, 12 * step, 13 * step, 40, 1024, 1));
public void testCostFunction() throws PlanningException {
// Create large memory reservation
ReservationDefinition rr7Mem1Core =
10 * step, // Job arrival time
11 * step, // Job deadline
new ReservationRequest[] { ReservationRequest.newInstance(
Resource.newInstance(7 * 1024, 1),// Capability
1, // Num containers
1, // Concurrency
step) }, // Duration
ReservationRequestInterpreter.R_ALL, "u1");
// Create reservation
ReservationDefinition rr6Mem6Cores =
10 * step, // Job arrival time
11 * step, // Job deadline
new ReservationRequest[] { ReservationRequest.newInstance(
Resource.newInstance(6 * 1024, 6),// Capability
1, // Num containers
1, // Concurrency
step) }, // Duration
ReservationRequestInterpreter.R_ALL, "u2");
// Create reservation
ReservationDefinition rr =
10 * step, // Job arrival time
12 * step, // Job deadline
new ReservationRequest[] { ReservationRequest.newInstance(
Resource.newInstance(1024, 1), // Capability
1, // Num containers
1, // Concurrency
step) }, // Duration
ReservationRequestInterpreter.R_ALL, "u3");
// Create reservation IDs
ReservationId reservationID1 =
ReservationId reservationID2 =
ReservationId reservationID3 =
// Add all
agent.createReservation(reservationID1, "u1", plan, rr7Mem1Core);
agent.createReservation(reservationID2, "u2", plan, rr6Mem6Cores);
agent.createReservation(reservationID3, "u3", plan, rr);
// Get reservation
ReservationAllocation alloc3 = plan.getReservationById(reservationID3);
check(alloc3, 10 * step, 11 * step, 0, 1024, 1));
check(alloc3, 11 * step, 12 * step, 1, 1024, 1));
public void testFromCluster() throws PlanningException {
// int numJobsInScenario = initializeScenario3();
List<ReservationDefinition> list = new ArrayList<ReservationDefinition>();
// Create reservation
1425716392178L, // Job arrival time
1425722262791L, // Job deadline
new ReservationRequest[] { ReservationRequest.newInstance(
Resource.newInstance(1024, 1), // Capability
7, // Num containers
1, // Concurrency
587000) }, // Duration
ReservationRequestInterpreter.R_ALL, "u1"));
1425716406178L, // Job arrival time
1425721255841L, // Job deadline
new ReservationRequest[] { ReservationRequest.newInstance(
Resource.newInstance(1024, 1), // Capability
6, // Num containers
1, // Concurrency
485000) }, // Duration
ReservationRequestInterpreter.R_ALL, "u2"));
1425716399178L, // Job arrival time
1425723780138L, // Job deadline
new ReservationRequest[] { ReservationRequest.newInstance(
Resource.newInstance(1024, 1), // Capability
6, // Num containers
1, // Concurrency
738000) }, // Duration
ReservationRequestInterpreter.R_ALL, "u3"));
1425716437178L, // Job arrival time
1425722968378L, // Job deadline
new ReservationRequest[] { ReservationRequest.newInstance(
Resource.newInstance(1024, 1), // Capability
7, // Num containers
1, // Concurrency
653000) }, // Duration
ReservationRequestInterpreter.R_ALL, "u4"));
1425716406178L, // Job arrival time
1425721926090L, // Job deadline
new ReservationRequest[] { ReservationRequest.newInstance(
Resource.newInstance(1024, 1), // Capability
6, // Num containers
1, // Concurrency
552000) }, // Duration
ReservationRequestInterpreter.R_ALL, "u5"));
1425716379178L, // Job arrival time
1425722238553L, // Job deadline
new ReservationRequest[] { ReservationRequest.newInstance(
Resource.newInstance(1024, 1), // Capability
6, // Num containers
1, // Concurrency
586000) }, // Duration
ReservationRequestInterpreter.R_ALL, "u6"));
1425716407178L, // Job arrival time
1425722908317L, // Job deadline
new ReservationRequest[] { ReservationRequest.newInstance(
Resource.newInstance(1024, 1), // Capability
7, // Num containers
1, // Concurrency
650000) }, // Duration
ReservationRequestInterpreter.R_ALL, "u7"));
1425716452178L, // Job arrival time
1425722841562L, // Job deadline
new ReservationRequest[] { ReservationRequest.newInstance(
Resource.newInstance(1024, 1), // Capability
6, // Num containers
1, // Concurrency
639000) }, // Duration
ReservationRequestInterpreter.R_ALL, "u8"));
1425716384178L, // Job arrival time
1425721766129L, // Job deadline
new ReservationRequest[] { ReservationRequest.newInstance(
Resource.newInstance(1024, 1), // Capability
7, // Num containers
1, // Concurrency
538000) }, // Duration
ReservationRequestInterpreter.R_ALL, "u9"));
1425716437178L, // Job arrival time
1425722507886L, // Job deadline
new ReservationRequest[] { ReservationRequest.newInstance(
Resource.newInstance(1024, 1), // Capability
5, // Num containers
1, // Concurrency
607000) }, // Duration
ReservationRequestInterpreter.R_ALL, "u10"));
// Add reservation
int i = 1;
for (ReservationDefinition rr : list) {
ReservationId reservationID =
agent.createReservation(reservationID, "u" + Integer.toString(i), plan,
// CHECK: allocation was accepted
assertTrue("Agent-based allocation failed", plan.getAllReservations()
.size() == list.size());
public void setup() throws Exception {
// Initialize random seed
long seed = rand.nextLong();
Log.info("Running with seed: " + seed);
// Set cluster parameters
long timeWindow = 1000000L;
int capacityMem = 100 * 1024;
int capacityCores = 100;
step = 60000L;
Resource clusterCapacity = Resource.newInstance(capacityMem, capacityCores);
// Set configuration
ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
String reservationQ = testUtil.getFullReservationQueueName();
float instConstraint = 100;
float avgConstraint = 100;
ReservationSchedulerConfiguration conf =
ReservationSystemTestUtil.createConf(reservationQ, timeWindow,
instConstraint, avgConstraint);
CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
policy.init(reservationQ, conf);
QueueMetrics queueMetrics = mock(QueueMetrics.class);
// Set planning agent
agent = new AlignedPlannerWithGreedy();
// Create Plan
plan =
new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step,
res, minAlloc, maxAlloc, "dedicated", null, true);
private int initializeScenario1() throws PlanningException {
// insert in the reservation a couple of controlled reservations, to create
// conditions for assignment that are non-empty
addFixedAllocation(0L, step, new int[] { 10, 10, 20, 20, 20, 10, 10 });
System.out.println("--------BEFORE AGENT----------");
return 1;
private int initializeScenario2() throws PlanningException {
// insert in the reservation a couple of controlled reservations, to create
// conditions for assignment that are non-empty
addFixedAllocation(11 * step, step, new int[] { 90, 90, 90 });
System.out.println("--------BEFORE AGENT----------");
return 1;
private int initializeScenario3() throws PlanningException {
// insert in the reservation a couple of controlled reservations, to create
// conditions for assignment that are non-empty
addFixedAllocation(10 * step, step, new int[] { 70, 80, 60 });
System.out.println("--------BEFORE AGENT----------");
return 1;
private void addFixedAllocation(long start, long step, int[] f)
throws PlanningException {
plan.addReservation(new InMemoryReservationAllocation(
ReservationSystemTestUtil.getNewReservationId(), null,
"user_fixed", "dedicated", start, start + f.length * step,
ReservationSystemTestUtil.generateAllocation(start, step, f), res,
private ReservationDefinition createReservationDefinition(long arrival,
long deadline, ReservationRequest[] reservationRequests,
ReservationRequestInterpreter rType, String username) {
return ReservationDefinition.newInstance(arrival, deadline,
rType), username);
private boolean check(ReservationAllocation alloc, long start, long end,
int containers, int mem, int cores) {
Resource expectedResources =
Resource.newInstance(mem * containers, cores * containers);
// Verify that all allocations in [start,end) equal containers * (mem,cores)
for (long i = start; i < end; i++) {
if (!Resources.equals(alloc.getResourcesAtTime(i), expectedResources)) {
return false;
return true;

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*******************************************************************************/ *******************************************************************************/
package org.apache.hadoop.yarn.server.resourcemanager.reservation; package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@ -37,6 +37,13 @@
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.CapacityOverTimePolicy;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryPlan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*******************************************************************************/ *******************************************************************************/
package org.apache.hadoop.yarn.server.resourcemanager.reservation; package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
@ -30,6 +30,14 @@
import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest; import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryPlan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.NoOverCommitPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemUtil;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.SharingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.Clock;